More shuffle fixes (#11)
andygrove committed Jan 29, 2023
1 parent fd7e65b commit 2af0f50
Showing 9 changed files with 127 additions and 77 deletions.
21 changes: 19 additions & 2 deletions README.md
@@ -26,7 +26,7 @@ ctx.sql('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by s

## Status

- Seems to be working, but only tested with a few queries so far!
- Partially working. Can run a few TPC-H queries.

## Features

@@ -35,9 +35,17 @@ ctx.sql('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by s

## Limitations

- Simplistic shuffle mechanism that produces lots of files
- Requires a shared file system currently

## Performance

This chart compares the performance of RaySQL with other open-source distributed SQL frameworks.

Only a few queries work, and performance does not look very promising so far, but this may just be because of the naïve
distributed planner introducing unnecessary shuffles.

![SQLBench-H Performance Chart](https://sqlbenchmarks.io/sqlbench-h/results/env/workstation/sf10/distributed/sqlbench-h-workstation-10-distributed-perquery.png)

## Building

```bash
@@ -59,6 +67,15 @@ maturin develop
python -m pytest
```

## Benchmarking

Create a release build when running benchmarks, then use pip to install the wheel.

```bash
maturin build --release
pip install ./target/wheels/raysql-0.1.0-cp37-abi3-manylinux_2_31_x86_64.whl --force-reinstall
```

## How to update dependencies

To change test dependencies, change the `requirements.in` and run
9 changes: 7 additions & 2 deletions raysql/context.py
@@ -4,7 +4,7 @@
class RaySqlContext:

def __init__(self, workers):
self.ctx = Context()
self.ctx = Context(len(workers))
self.workers = workers

def register_csv(self, table_name, path, has_header):
@@ -29,7 +29,12 @@ def execute_query_stage(self, graph, stage):
child_stage = graph.get_query_stage(child_id)
self.execute_query_stage(graph, child_stage)

partition_count = stage.get_input_partition_count()
# todo what is correct logic here?
if stage.get_output_partition_count() == 1:
partition_count = 1
else:
partition_count = stage.get_input_partition_count()

print("Scheduling query stage #{} with {} input partitions and {} output partitions".format(stage.id(), partition_count, stage.get_output_partition_count()))

# serialize the plan
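
The tail of `execute_query_stage` is collapsed above; conceptually, the remaining step serializes the stage plan and fans the partitions out to the Ray workers. A minimal sketch of that idea, assuming a simple round-robin assignment (only `execute_query_partition` comes from `raysql/worker.py`; the helper name and assignment strategy are assumptions, not code from this commit):

```python
import ray

def run_stage_partitions(workers, plan_bytes, partition_count):
    # Assign each input partition of the stage to a worker round-robin and
    # block until every partition has completed, so that parent stages only
    # run once this stage's shuffle output has been written.
    futures = []
    for part in range(partition_count):
        worker = workers[part % len(workers)]
        futures.append(worker.execute_query_partition.remote(plan_bytes, part))
    return ray.get(futures)
```
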
2 changes: 1 addition & 1 deletion raysql/worker.py
@@ -4,7 +4,7 @@
@ray.remote
class Worker:
def __init__(self):
self.ctx = Context()
self.ctx = Context(1)

def execute_query_partition(self, plan_bytes, part):
plan = self.ctx.deserialize_execution_plan(plan_bytes)
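
Putting the two Python pieces together, an end-to-end run might look like the sketch below (illustrative only: the worker count, import paths, CSV path, and query text are assumptions rather than code from this commit):

```python
import ray
from raysql.context import RaySqlContext
from raysql.worker import Worker

ray.init()

# Start a few Ray worker actors and hand them to the context.
workers = [Worker.remote() for _ in range(4)]
ctx = RaySqlContext(workers)

# Register a table and run a query; the path and SQL are placeholders.
ctx.register_csv('tips', 'tips.csv', True)
result = ctx.sql('select sex, smoker, avg(tip/total_bill) as tip_pct '
                 'from tips group by sex, smoker')
print(result)
```
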
35 changes: 23 additions & 12 deletions src/context.rs
@@ -1,7 +1,7 @@
use crate::planner::{make_execution_graph, PyExecutionGraph};
use crate::shuffle::ShuffleCodec;
use crate::utils::wait_for_future;
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::array::UInt32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::pretty_format_batches;
@@ -29,8 +29,8 @@ pub struct PyContext {
#[pymethods]
impl PyContext {
#[new]
pub fn new() -> Self {
let config = SessionConfig::default().with_target_partitions(4);
pub fn new(target_partitions: usize) -> Self {
let config = SessionConfig::default().with_target_partitions(target_partitions);
Self {
ctx: SessionContext::with_config(config),
}
@@ -63,7 +63,7 @@ impl PyContext {

// debug logging
for stage in graph.query_stages.values() {
debug!(
println!(
"Query stage #{}:\n{}",
stage.id,
displayable(stage.plan.as_ref()).indent()
@@ -102,21 +102,32 @@ impl PyContext {
let fut = rt.spawn(async move {
let mut stream = plan.plan.execute(part, ctx)?;
let mut results = vec![];
let mut row_count = 0_u32;
while let Some(result) = stream.next().await {
let input_batch = result?;
row_count += input_batch.num_rows() as u32;
results.push(input_batch);
}

println!("Results:\n{}", pretty_format_batches(&results)?);

// TODO remove this dummy batch
// create a dummy batch to return - later this could be metadata about the
// shuffle partitions that were written out
let schema = Arc::new(Schema::new(vec![Field::new("foo", DataType::Int32, true)]));
let array = Int32Array::from(vec![42]);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)])?;

// return as a stream
// return a result set with metadata about this executed partition
let schema = Arc::new(Schema::new(vec![
Field::new("partition_index", DataType::UInt32, true),
Field::new("partition_batches", DataType::UInt32, true),
Field::new("partition_rows", DataType::UInt32, true),
]));
let part_index = UInt32Array::from(vec![part as u32]);
let part_batches = UInt32Array::from(vec![results.len() as u32]);
let part_rows = UInt32Array::from(vec![row_count]);
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(part_index),
Arc::new(part_batches),
Arc::new(part_rows),
],
)?;
MemoryStream::try_new(vec![batch], schema, None)
});

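
For reference, the per-partition metadata batch built above has a fixed three-column shape. The snippet below reconstructs that layout in Python with pyarrow purely to make it concrete; the values are made up:

```python
import pyarrow as pa

# One row per executed partition: which partition ran, how many record
# batches it produced, and how many rows those batches contained.
schema = pa.schema([
    ("partition_index", pa.uint32()),
    ("partition_batches", pa.uint32()),
    ("partition_rows", pa.uint32()),
])
batch = pa.RecordBatch.from_arrays(
    [
        pa.array([0], type=pa.uint32()),
        pa.array([3], type=pa.uint32()),
        pa.array([1042], type=pa.uint32()),
    ],
    schema=schema,
)
print(batch)
```
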
52 changes: 35 additions & 17 deletions src/planner.rs
@@ -1,5 +1,6 @@
use crate::shuffle::{ShuffleReaderExec, ShuffleWriterExec};
use datafusion::error::Result;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
@@ -50,6 +51,12 @@ pub struct ExecutionGraph {
id_generator: AtomicUsize,
}

impl Default for ExecutionGraph {
fn default() -> Self {
Self::new()
}
}

impl ExecutionGraph {
pub fn new() -> Self {
Self {
@@ -68,7 +75,7 @@ impl ExecutionGraph {
// the final query stage is always the last to be created and
// therefore has the highest id
let mut max_id = 0;
for (k, _) in &self.query_stages {
for k in self.query_stages.keys() {
if *k > max_id {
max_id = *k;
}
@@ -166,6 +173,7 @@ pub fn make_execution_graph(plan: Arc<dyn ExecutionPlan>) -> Result<ExecutionGra
Ok(graph)
}

/// Replace RepartitionExec and CoalescePartitionsExec with ShuffleWriterExec/ShuffleReaderExec
fn generate_query_stages(
plan: Arc<dyn ExecutionPlan>,
graph: &mut ExecutionGraph,
@@ -180,38 +188,48 @@

if let Some(repart) = plan.as_any().downcast_ref::<RepartitionExec>() {
match repart.partitioning() {
&Partitioning::RoundRobinBatch(_) => {
// DataFusion adds round-robin partitioning to increase parallelism
// but that doesn't make so much sense for distributed because it
// introduces unnecessary shuffle overhead
Ok(plan.children()[0].clone())
&Partitioning::UnknownPartitioning(_) => {
// just remove this
Ok(repart.children()[0].clone())
}
&Partitioning::Hash(ref part_expr, part_count) => {
partitioning_scheme => {
// create a shuffle query stage for this repartition
let stage_id = graph.next_id();
// we remove the RepartitionExec because it isn't designed for our
// distributed use case and the ShuffleWriterExec contains the repartitioning
// logic
let shuffle_writer_input = plan.children()[0].clone();
let shuffle_writer = ShuffleWriterExec::new(
stage_id,
shuffle_writer_input,
part_expr.to_vec(),
part_count,
partitioning_scheme.clone(),
);
let stage_id = graph.add_query_stage(stage_id, Arc::new(shuffle_writer));
// replace the plan with a shuffle reader
Ok(Arc::new(ShuffleReaderExec::new(
stage_id,
plan.schema(),
repart.partitioning().clone(),
partitioning_scheme.clone(),
)))
}
&Partitioning::UnknownPartitioning(_) => {
// remove UnknownPartitioning repartitions
Ok(plan.children()[0].clone())
}
}
} else if plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
.is_some()
{
// introduce shuffle to produce one output partition
let stage_id = graph.next_id();
let shuffle_writer_input = plan.children()[0].clone();
let shuffle_writer = ShuffleWriterExec::new(
stage_id,
shuffle_writer_input,
Partitioning::UnknownPartitioning(1),
);
let stage_id = graph.add_query_stage(stage_id, Arc::new(shuffle_writer));
// replace the plan with a shuffle reader
Ok(Arc::new(ShuffleReaderExec::new(
stage_id,
plan.schema(),
Partitioning::UnknownPartitioning(1),
)))
} else {
Ok(plan)
}
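
The planner keeps the same overall shape after this change: walk the physical plan and cut it into query stages at every `RepartitionExec`/`CoalescePartitionsExec` boundary, replacing the cut point with a shuffle reader. The toy Python model below illustrates only that idea; it is not the actual Rust planner, and the node names are invented:

```python
from dataclasses import dataclass, field
from itertools import count
from typing import Iterator, List

@dataclass
class PlanNode:
    name: str
    children: List["PlanNode"] = field(default_factory=list)

@dataclass
class QueryStage:
    stage_id: int
    plan: PlanNode

def split_into_stages(plan: PlanNode, stages: List[QueryStage],
                      ids: Iterator[int]) -> PlanNode:
    # Rewrite children first so nested repartitions become their own stages.
    plan.children = [split_into_stages(c, stages, ids) for c in plan.children]
    if plan.name in ("Repartition", "CoalescePartitions"):
        stage_id = next(ids)
        # The subtree feeding the repartition becomes a new query stage whose
        # output is produced by a shuffle writer...
        stages.append(QueryStage(stage_id, PlanNode("ShuffleWriter", plan.children)))
        # ...and the repartition itself is replaced by a reader of that stage's
        # shuffled output.
        return PlanNode(f"ShuffleReader(stage={stage_id})")
    return plan

# Example: scan -> repartition -> aggregate splits into two stages.
stages: List[QueryStage] = []
root = PlanNode("Aggregate", [PlanNode("Repartition", [PlanNode("Scan")])])
root = split_into_stages(root, stages, count())
```
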
8 changes: 2 additions & 6 deletions src/proto/generated/protobuf.rs
@@ -26,9 +26,7 @@ pub struct ShuffleReaderExecNode {
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
/// this must match the output partitioning of the writer we are reading from
#[prost(message, optional, tag = "3")]
pub partitioning: ::core::option::Option<
::datafusion_proto::protobuf::PhysicalHashRepartition,
>,
pub partitioning: ::core::option::Option<::datafusion_proto::protobuf::PhysicalHashRepartition>,
/// directory for shuffle files
#[prost(string, tag = "4")]
pub shuffle_dir: ::prost::alloc::string::String,
@@ -44,9 +42,7 @@ pub struct ShuffleWriterExecNode {
pub plan: ::core::option::Option<::datafusion_proto::protobuf::PhysicalPlanNode>,
/// output partitioning schema
#[prost(message, optional, tag = "3")]
pub partitioning: ::core::option::Option<
::datafusion_proto::protobuf::PhysicalHashRepartition,
>,
pub partitioning: ::core::option::Option<::datafusion_proto::protobuf::PhysicalHashRepartition>,
/// directory for shuffle files
#[prost(string, tag = "4")]
pub shuffle_dir: ::prost::alloc::string::String,
26 changes: 14 additions & 12 deletions src/shuffle/codec.rs
@@ -55,15 +55,11 @@ impl PhysicalExtensionCodec for ShuffleCodec {
registry,
plan.schema().as_ref(),
)?;
match hash_part {
Some(Partitioning::Hash(expr, count)) => Ok(Arc::new(ShuffleWriterExec::new(
writer.stage_id as usize,
plan,
expr,
count as usize,
))),
_ => todo!(),
}
Ok(Arc::new(ShuffleWriterExec::new(
writer.stage_id as usize,
plan,
hash_part.unwrap(),
)))
}
_ => unreachable!(),
}
@@ -111,7 +107,13 @@ fn encode_partitioning_scheme(partitioning: &Partitioning) -> Result<PhysicalHas
.collect::<Result<Vec<_>, DataFusionError>>()?,
partition_count: *partition_count as u64,
}),
_ => todo!("unsupported shuffle partitioning scheme"),
Partitioning::UnknownPartitioning(1) => Ok(protobuf::PhysicalHashRepartition {
hash_expr: vec![],
partition_count: 1,
}),
other => Err(DataFusionError::Plan(format!(
"Unsupported shuffle partitioning scheme: {other:?}"
))),
}
}

@@ -123,10 +125,10 @@ impl FunctionRegistry for RaySqlFunctionRegistry {
}

fn udf(&self, name: &str) -> datafusion::common::Result<Arc<ScalarUDF>> {
Err(DataFusionError::Plan(format!("Invalid UDF: {}", name)))
Err(DataFusionError::Plan(format!("Invalid UDF: {name}")))
}

fn udaf(&self, name: &str) -> datafusion::common::Result<Arc<AggregateUDF>> {
Err(DataFusionError::Plan(format!("Invalid UDAF: {}", name)))
Err(DataFusionError::Plan(format!("Invalid UDAF: {name}")))
}
}
2 changes: 1 addition & 1 deletion src/shuffle/reader.rs
@@ -64,7 +64,7 @@ impl ExecutionPlan for ShuffleReaderExec {
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
Ok(self.clone())
Ok(self)
}

fn execute(