Skip to content

Commit

Permalink
Upgrade to DataFusion 20 (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Mar 24, 2023
1 parent 9fcf28e commit 67309d5
Show file tree
Hide file tree
Showing 27 changed files with 1,748 additions and 1,461 deletions.
287 changes: 175 additions & 112 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,23 @@ description = "RaySQL: DataFusion on Ray"
homepage = "https://github.com/andygrove/ray-sql"
repository = "https://github.com/andygrove/ray-sql"
authors = ["Andy Grove <andygrove73@gmail.com>"]
version = "0.4.0"
version = "0.5.0"
edition = "2021"
readme = "README.md"
license = "Apache-2.0"
rust-version = "1.62"
build = "build.rs"

[dependencies]
datafusion = { version = "17.0.0", features = ["pyarrow", "avro"] }
datafusion-proto = "17.0.0"
datafusion-python = { git = "https://github.com/apache/arrow-datafusion-python", rev = "8975d4ec981aeae948250f0cb4d08c131f4f2a28" }
datafusion = { version = "20.0.0", features = ["pyarrow", "avro"] }
datafusion-proto = "20.0.0"
datafusion-python = "20.0.0"
futures = "0.3"
glob = "0.3"
log = "0.4"
prost = "0.11"
prost-types = "0.11"
pyo3 = { version = "~0.17.3", features = ["extension-module", "abi3", "abi3-py37"] }
pyo3 = { version = "0.18.1", features = ["extension-module", "abi3", "abi3-py37"] }
tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync"] }
uuid = "1.2"

Expand Down
6 changes: 4 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use datafusion::arrow::ipc::reader::StreamReader;
use datafusion::arrow::ipc::writer::StreamWriter;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::config::Extensions;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::execution::disk_manager::DiskManagerConfig;
Expand Down Expand Up @@ -189,14 +190,15 @@ fn _execute_partition(
part: usize,
inputs: PyObject,
) -> Result<Vec<RecordBatch>> {
let ctx = Arc::new(TaskContext::new(
let ctx = Arc::new(TaskContext::try_new(
"task_id".to_string(),
"session_id".to_string(),
HashMap::new(),
HashMap::new(),
HashMap::new(),
Arc::new(RuntimeEnv::default()),
));
Extensions::default()
)?);
Python::with_gil(|py| {
_set_inputs_for_ray_shuffle_reader(plan.plan.clone(), part, &inputs, py)
})?;
Expand Down
4 changes: 2 additions & 2 deletions src/shuffle/ray_shuffle/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ impl InMemoryShuffleStream {
}

impl Stream for InMemoryShuffleStream {
type Item = datafusion::arrow::error::Result<RecordBatch>;
type Item = datafusion::error::Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(batch) = self.reader.next() {
return Poll::Ready(Some(batch));
return Poll::Ready(Some(batch.map_err(|e| e.into())));
}
Poll::Ready(None)
}
Expand Down
6 changes: 3 additions & 3 deletions src/shuffle/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl ShuffleReaderExec {
.collect(),
n,
)
},
}
_ => partitioning,
};

Expand Down Expand Up @@ -148,11 +148,11 @@ impl LocalShuffleStream {
}

impl Stream for LocalShuffleStream {
type Item = datafusion::arrow::error::Result<RecordBatch>;
type Item = datafusion::error::Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(batch) = self.reader.next() {
return Poll::Ready(Some(batch));
return Poll::Ready(Some(batch.map_err(|e| e.into())));
}
Poll::Ready(None)
}
Expand Down
2 changes: 1 addition & 1 deletion src/shuffle/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl ShuffleWriterExec {
.collect(),
n,
)
},
}
_ => partitioning,
};

Expand Down
24 changes: 12 additions & 12 deletions testdata/expected-plans/q1.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST
DataFusion Physical Plan
========================

SortExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
CoalescePartitionsExec
SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, SUM(lineitem.l_extendedprice)@3 as sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, AVG(lineitem.l_quantity)@6 as avg_qty, AVG(lineitem.l_extendedprice)@7 as avg_price, AVG(lineitem.l_discount)@8 as avg_disc, COUNT(UInt8(1))@9 as count_order]
AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]
CoalesceBatchesExec: target_batch_size=8192
Expand All @@ -21,7 +21,7 @@ SortExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
ProjectionExec: expr=[CAST(l_extendedprice@1 AS Decimal128(35, 4)) * CAST(Some(100),23,2 - CAST(l_discount@2 AS Decimal128(23, 2)) AS Decimal128(35, 4)) as CAST(lineitem.l_extendedprice AS Decimal128(35, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(35, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(35, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2))CAST(lineitem.l_discount AS Decimal128(23, 2))lineitem.l_discountDecimal128(Some(100),23,2)CAST(lineitem.l_extendedprice AS Decimal128(35, 4))lineitem.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: l_shipdate@6 <= 10493
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-3.parquet]]}, predicate=l_shipdate <= Date32("10493"), pruning_predicate=l_shipdate_min@0 <= 10493, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate]
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-3.parquet]]}, predicate=l_shipdate@10 <= 10493, pruning_predicate=l_shipdate_min@0 <= 10493, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate]

RaySQL Plan
===========
Expand All @@ -32,17 +32,17 @@ ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "l_return
ProjectionExec: expr=[CAST(l_extendedprice@1 AS Decimal128(35, 4)) * CAST(Some(100),23,2 - CAST(l_discount@2 AS Decimal128(23, 2)) AS Decimal128(35, 4)) as CAST(lineitem.l_extendedprice AS Decimal128(35, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(35, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(35, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2))CAST(lineitem.l_discount AS Decimal128(23, 2))lineitem.l_discountDecimal128(Some(100),23,2)CAST(lineitem.l_extendedprice AS Decimal128(35, 4))lineitem.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: l_shipdate@6 <= 10493
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-3.parquet]]}, predicate=l_shipdate <= Date32("10493"), pruning_predicate=l_shipdate_min@0 <= 10493, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate]
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-3.parquet]]}, predicate=l_shipdate@10 <= 10493, pruning_predicate=l_shipdate_min@0 <= 10493, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate]

Query Stage #1 (4 -> 4):
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 4))
ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, SUM(lineitem.l_extendedprice)@3 as sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, AVG(lineitem.l_quantity)@6 as avg_qty, AVG(lineitem.l_extendedprice)@7 as avg_price, AVG(lineitem.l_discount)@8 as avg_disc, COUNT(UInt8(1))@9 as count_order]
AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 4))
SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, SUM(lineitem.l_extendedprice)@3 as sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, AVG(lineitem.l_quantity)@6 as avg_qty, AVG(lineitem.l_extendedprice)@7 as avg_price, AVG(lineitem.l_discount)@8 as avg_disc, COUNT(UInt8(1))@9 as count_order]
AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 4))

Query Stage #2 (1 -> 1):
SortExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
CoalescePartitionsExec
ShuffleReaderExec(stage_id=1, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 4))
Query Stage #2 (4 -> 1):
SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
ShuffleReaderExec(stage_id=1, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 4))

Loading

0 comments on commit 67309d5

Please sign in to comment.