Skip to content

Commit

Permalink
Update DataFusion version to 28.0.0 (#41)
Browse files Browse the repository at this point in the history
* Update DataFusion version

* update example
  • Loading branch information
andygrove committed Nov 19, 2023
1 parent b473f53 commit 5ee6137
Show file tree
Hide file tree
Showing 12 changed files with 926 additions and 772 deletions.
1,490 changes: 776 additions & 714 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ rust-version = "1.62"
build = "build.rs"

[dependencies]
datafusion = { version = "20.0.0", features = ["pyarrow", "avro"] }
datafusion-proto = "20.0.0"
datafusion-python = "20.0.0"
datafusion = { version = "28.0.0", features = ["pyarrow", "avro"] }
datafusion-proto = "28.0.0"
datafusion-python = "28.0.0"
futures = "0.3"
glob = "0.3"
log = "0.4"
prost = "0.11"
prost-types = "0.11"
pyo3 = { version = "0.18.1", features = ["extension-module", "abi3", "abi3-py37"] }
pyo3 = { version = "0.19", features = ["extension-module", "abi3", "abi3-py37"] }
tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync"] }
uuid = "1.2"

Expand Down
29 changes: 16 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,28 @@ This is a research project to evaluate performing distributed SQL queries from P
Run the following example live in your browser using a Google Colab [notebook](https://colab.research.google.com/drive/1tmSX0Lu6UFh58_-DBUVoyYx6BoXHOszP?usp=sharing).

```python
import os
import pandas as pd
import ray
from raysql.context import RaySqlContext
from raysql.worker import Worker

# Start our cluster
ray.init()
from raysql import RaySqlContext

# create some remote Workers
workers = [Worker.remote() for i in range(2)]
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

# create a remote context and register a table
ctx = RaySqlContext.remote(workers)
ray.get(ctx.register_csv.remote('tips', 'tips.csv', True))
# Start a local cluster
ray.init(resources={"worker": 1})

# Parquet is also supported
# ctx.register_parquet('tips', 'tips.parquet')
# Create a context and register a table
ctx = RaySqlContext(2, use_ray_shuffle=True)
# Register either a CSV or Parquet file
# ctx.register_csv("tips", f"{SCRIPT_DIR}/tips.csv", True)
ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet")

result_set = ray.get(ctx.sql.remote('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker'))
print(result_set)
result_set = ctx.sql(
"select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker"
)
for record_batch in result_set:
print(record_batch.to_pandas())
```

## Status
Expand Down
8 changes: 4 additions & 4 deletions examples/tips.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import os

import pandas as pd
import ray

from raysql import RaySqlContext, ResultSet
from raysql import RaySqlContext

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

Expand All @@ -18,5 +18,5 @@
result_set = ctx.sql(
"select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker"
)
print("Result:")
print(ResultSet(result_set))
for record_batch in result_set:
print(record_batch.to_pandas())
5 changes: 3 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::planner::{make_execution_graph, PyExecutionGraph};
use crate::shuffle::{RayShuffleReaderExec, ShuffleCodec};
use crate::utils::wait_for_future;
use datafusion::arrow::pyarrow::PyArrowConvert;
use datafusion::arrow::pyarrow::FromPyArrow;
use datafusion::arrow::pyarrow::ToPyArrow;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::config::Extensions;
use datafusion::error::{DataFusionError, Result};
Expand Down Expand Up @@ -106,7 +107,7 @@ impl PyContext {
println!(
"Query stage #{}:\n{}",
stage.id,
displayable(stage.plan.as_ref()).indent()
displayable(stage.plan.as_ref()).indent(false)
);
}

Expand Down
10 changes: 5 additions & 5 deletions src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn generate_query_stages(
.iter()
.map(|x| generate_query_stages(x.clone(), graph, use_ray_shuffle))
.collect::<Result<Vec<_>>>()?;
let plan = with_new_children_if_necessary(plan, new_children)?;
let plan = with_new_children_if_necessary(plan, new_children)?.into();

debug!("plan = {}", displayable(plan.as_ref()).one_line());
debug!("output_part = {:?}", plan.output_partitioning());
Expand All @@ -154,7 +154,7 @@ fn generate_query_stages(
let partitioning_scheme = coalesce_input.output_partitioning();
let new_input =
create_shuffle_exchange(coalesce_input, graph, partitioning_scheme, use_ray_shuffle)?;
with_new_children_if_necessary(plan, vec![new_input])
with_new_children_if_necessary(plan, vec![new_input]).map(|p| p.into())
} else if plan
.as_any()
.downcast_ref::<SortPreservingMergeExec>()
Expand All @@ -168,7 +168,7 @@ fn generate_query_stages(
partitioning_scheme,
use_ray_shuffle,
)?;
with_new_children_if_necessary(plan, vec![new_input])
with_new_children_if_necessary(plan, vec![new_input]).map(|p| p.into())
} else {
Ok(plan)
}?;
Expand Down Expand Up @@ -397,7 +397,7 @@ mod test {
let plan = df.create_physical_plan().await?;
output.push_str(&format!(
"DataFusion Physical Plan\n========================\n\n{}\n",
displayable(plan.as_ref()).indent()
displayable(plan.as_ref()).indent(false)
));

output.push_str("RaySQL Plan\n===========\n\n");
Expand All @@ -408,7 +408,7 @@ mod test {
"Query Stage #{id} ({} -> {}):\n{}\n",
query_stage.get_input_partition_count(),
query_stage.get_output_partition_count(),
displayable(query_stage.plan.as_ref()).indent()
displayable(query_stage.plan.as_ref()).indent(false)
));
}
let expected_file = format!("testdata/expected-plans/q{n}.txt");
Expand Down
6 changes: 5 additions & 1 deletion src/shuffle/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::{DataFusionError, Result};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use datafusion_proto::physical_plan::AsExecutionPlan;
Expand Down Expand Up @@ -187,4 +187,8 @@ impl FunctionRegistry for RaySqlFunctionRegistry {
fn udaf(&self, name: &str) -> datafusion::common::Result<Arc<AggregateUDF>> {
Err(DataFusionError::Plan(format!("Invalid UDAF: {name}")))
}

fn udwf(&self, name: &str) -> datafusion::common::Result<Arc<WindowUDF>> {
Err(DataFusionError::Plan(format!("Invalid UDAWF: {name}")))
}
}
73 changes: 73 additions & 0 deletions src/shuffle/ray_shuffle/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,78 @@
mod reader;
mod writer;

use arrow::record_batch::RecordBatch;
use datafusion::arrow;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Result;
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::Stream;
pub use reader::RayShuffleReaderExec;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::macros::support::thread_rng_n;
pub use writer::RayShuffleWriterExec;

/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one
pub struct CombinedRecordBatchStream {
/// Schema wrapped by Arc
schema: SchemaRef,
/// Stream entries
entries: Vec<SendableRecordBatchStream>,
}

impl CombinedRecordBatchStream {
/// Create an CombinedRecordBatchStream
pub fn new(schema: SchemaRef, entries: Vec<SendableRecordBatchStream>) -> Self {
Self { schema, entries }
}
}

impl RecordBatchStream for CombinedRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

impl Stream for CombinedRecordBatchStream {
type Item = Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use Poll::*;

let start = thread_rng_n(self.entries.len() as u32) as usize;
let mut idx = start;

for _ in 0..self.entries.len() {
let stream = self.entries.get_mut(idx).unwrap();

match Pin::new(stream).poll_next(cx) {
Ready(Some(val)) => return Ready(Some(val)),
Ready(None) => {
// Remove the entry
self.entries.swap_remove(idx);

// Check if this was the last entry, if so the cursor needs
// to wrap
if idx == self.entries.len() {
idx = 0;
} else if idx < start && start <= self.entries.len() {
// The stream being swapped into the current index has
// already been polled, so skip it.
idx = idx.wrapping_add(1) % self.entries.len();
}
}
Pending => {
idx = idx.wrapping_add(1) % self.entries.len();
}
}
}

// If the map is empty, then the stream is complete.
if self.entries.is_empty() {
Ready(None)
} else {
Pending
}
}
}
15 changes: 9 additions & 6 deletions src/shuffle/ray_shuffle/reader.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::shuffle::ray_shuffle::CombinedRecordBatchStream;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Statistics;
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::physical_expr::expressions::UnKnownColumn;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::union::CombinedRecordBatchStream;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
};
use futures::Stream;
use std::any::Any;
Expand Down Expand Up @@ -121,17 +122,19 @@ impl ExecutionPlan for RayShuffleReaderExec {
)))
}

fn statistics(&self) -> Statistics {
Statistics::default()
}
}

impl DisplayAs for RayShuffleReaderExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"RayShuffleReaderExec(stage_id={}, input_partitioning={:?})",
self.stage_id, self.partitioning
)
}

fn statistics(&self) -> Statistics {
Statistics::default()
}
}

struct InMemoryShuffleStream {
Expand Down
19 changes: 11 additions & 8 deletions src/shuffle/ray_shuffle/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder}
use datafusion::physical_plan::repartition::BatchPartitioner;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
};
use futures::StreamExt;
use futures::TryStreamExt;
Expand Down Expand Up @@ -86,13 +86,6 @@ impl ExecutionPlan for RayShuffleWriterExec {
unimplemented!()
}

fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"RayShuffleWriterExec(stage_id={}, output_partitioning={:?})",
self.stage_id, self.partitioning
)
}
fn execute(
&self,
input_partition: usize,
Expand Down Expand Up @@ -178,6 +171,16 @@ impl ExecutionPlan for RayShuffleWriterExec {
}
}

impl DisplayAs for RayShuffleWriterExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"RayShuffleWriterExec(stage_id={}, output_partitioning={:?})",
self.stage_id, self.partitioning
)
}
}

struct InMemoryWriter {
/// batches buffer
batches: Vec<RecordBatch>,
Expand Down
15 changes: 9 additions & 6 deletions src/shuffle/reader.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::shuffle::ray_shuffle::CombinedRecordBatchStream;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::ipc::reader::FileReader;
use datafusion::arrow::record_batch::RecordBatch;
Expand All @@ -6,9 +7,9 @@ use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::physical_expr::expressions::UnKnownColumn;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::union::CombinedRecordBatchStream;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
};
use futures::Stream;
use glob::glob;
Expand Down Expand Up @@ -124,17 +125,19 @@ impl ExecutionPlan for ShuffleReaderExec {
)))
}

fn statistics(&self) -> Statistics {
Statistics::default()
}
}

impl DisplayAs for ShuffleReaderExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"ShuffleReaderExec(stage_id={}, input_partitioning={:?})",
self.stage_id, self.partitioning
)
}

fn statistics(&self) -> Statistics {
Statistics::default()
}
}

struct LocalShuffleStream {
Expand Down
Loading

0 comments on commit 5ee6137

Please sign in to comment.