Skip to content

Commit

Permalink
Support multiple shuffle partitions (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Jan 29, 2023
1 parent a0b124c commit fd7e65b
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 75 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "5238e8
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "5238e8c97f998b4d2cb9fab85fb182f325a1a7fb" }
datafusion-python = { git = "https://github.com/apache/arrow-datafusion-python", rev = "0a6743c2a7f27015890c5546efda2d4e85d45a97" }
futures = "0.3"
glob = "0.3"
log = "0.4"
prost = "0.11"
prost-types = "0.11"
pyo3 = { version = "~0.17.3", features = ["extension-module", "abi3", "abi3-py37"] }
Expand Down
8 changes: 8 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Notes

## Shuffle

- Each query stage has input and output partition count
- There is one task per input stage
- Each task can produce multiple output partitions
- Example: 4 input partitions, 4 output partitions (with different partitioning scheme) results in 16 shuffle files (4 x 4)
4 changes: 3 additions & 1 deletion raysql/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ def __init__(self):

def execute_query_partition(self, plan_bytes, part):
plan = self.ctx.deserialize_execution_plan(plan_bytes)
print("Executing partition #{}:\n{}".format(part, plan.display_indent()))

# debug logging
# print("Executing partition #{}:\n{}".format(part, plan.display_indent()))

# This is delegating to DataFusion for execution, but this would be a good place
# to plug in other execution engines by translating the plan into another engine's plan
Expand Down
5 changes: 3 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use datafusion_proto::bytes::{
};
use datafusion_python::physical_plan::PyExecutionPlan;
use futures::StreamExt;
use log::debug;
use pyo3::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -54,15 +55,15 @@ impl PyContext {
}

pub fn plan(&self, sql: &str, py: Python) -> PyResult<PyExecutionGraph> {
println!("Planning {}", sql);
debug!("Planning {}", sql);
let df = wait_for_future(py, self.ctx.sql(sql))?;
let plan = wait_for_future(py, df.create_physical_plan())?;

let graph = make_execution_graph(plan.clone())?;

// debug logging
for stage in graph.query_stages.values() {
println!(
debug!(
"Query stage #{}:\n{}",
stage.id,
displayable(stage.plan.as_ref()).indent()
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
extern crate core;

use pyo3::prelude::*;

mod proto;
Expand Down
20 changes: 16 additions & 4 deletions src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,31 @@ fn generate_query_stages(
// introduces unnecessary shuffle overhead
Ok(plan.children()[0].clone())
}
&Partitioning::Hash(_, _) => {
&Partitioning::Hash(ref part_expr, part_count) => {
// create a shuffle query stage for this repartition
let stage_id = graph.next_id();
// we remove the RepartitionExec because it isn't designed for our
// distributed use case and the ShuffleWriterExec contains the repartitioning
// logic
let shuffle_writer_input = plan.children()[0].clone();
let shuffle_writer = ShuffleWriterExec::new(stage_id, shuffle_writer_input);
let shuffle_writer = ShuffleWriterExec::new(
stage_id,
shuffle_writer_input,
part_expr.to_vec(),
part_count,
);
let stage_id = graph.add_query_stage(stage_id, Arc::new(shuffle_writer));
// replace the plan with a shuffle reader
Ok(Arc::new(ShuffleReaderExec::new(stage_id, plan.schema())))
Ok(Arc::new(ShuffleReaderExec::new(
stage_id,
plan.schema(),
repart.partitioning().clone(),
)))
}
&Partitioning::UnknownPartitioning(_) => {
// remove UnknownPartitioning repartitions
Ok(plan.children()[0].clone())
}
&Partitioning::UnknownPartitioning(_) => todo!(),
}
} else {
Ok(plan)
Expand Down
28 changes: 12 additions & 16 deletions src/proto/generated/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@ pub struct ShuffleReaderExecNode {
/// stage to read from
#[prost(uint32, tag = "1")]
pub stage_id: u32,
/// partition to read - this is the *output* partition of the shuffle stage
#[prost(uint32, tag = "2")]
pub partition: u32,
/// schema of the shuffle stage
#[prost(message, optional, tag = "3")]
#[prost(message, optional, tag = "2")]
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
/// this must match the output partitions of the writer we are reading from
#[prost(uint32, tag = "4")]
pub num_output_partitions: u32,
/// this must match the output partitioning of the writer we are reading from
#[prost(message, optional, tag = "3")]
pub partitioning: ::core::option::Option<
::datafusion_proto::protobuf::PhysicalHashRepartition,
>,
/// directory for shuffle files
#[prost(string, tag = "5")]
#[prost(string, tag = "4")]
pub shuffle_dir: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand All @@ -43,15 +42,12 @@ pub struct ShuffleWriterExecNode {
/// plan to execute
#[prost(message, optional, tag = "2")]
pub plan: ::core::option::Option<::datafusion_proto::protobuf::PhysicalPlanNode>,
/// how to partition - can be empty
#[prost(message, repeated, tag = "3")]
pub partition_expr: ::prost::alloc::vec::Vec<
::datafusion_proto::protobuf::PhysicalExprNode,
/// output partitioning schema
#[prost(message, optional, tag = "3")]
pub partitioning: ::core::option::Option<
::datafusion_proto::protobuf::PhysicalHashRepartition,
>,
/// number of output partitions
#[prost(uint32, tag = "4")]
pub num_output_partitions: u32,
/// directory for shuffle files
#[prost(string, tag = "5")]
#[prost(string, tag = "4")]
pub shuffle_dir: ::prost::alloc::string::String,
}
18 changes: 7 additions & 11 deletions src/proto/raysql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,21 @@ message RaySqlExecNode {
message ShuffleReaderExecNode {
// stage to read from
uint32 stage_id = 1;
// partition to read - this is the *output* partition of the shuffle stage
uint32 partition = 2;
// schema of the shuffle stage
datafusion.Schema schema = 3;
// this must match the output partitions of the writer we are reading from
uint32 num_output_partitions = 4;
datafusion.Schema schema = 2;
// this must match the output partitioning of the writer we are reading from
datafusion.PhysicalHashRepartition partitioning = 3;
// directory for shuffle files
string shuffle_dir = 5;
string shuffle_dir = 4;
}

message ShuffleWriterExecNode {
// stage that is writing the shuffle files
uint32 stage_id = 1;
// plan to execute
datafusion.PhysicalPlanNode plan = 2;
// how to partition - can be empty
repeated datafusion.PhysicalExprNode partition_expr = 3;
// number of output partitions
uint32 num_output_partitions = 4;
// output partitioning schema
datafusion.PhysicalHashRepartition partitioning = 3;
// directory for shuffle files
string shuffle_dir = 5;
string shuffle_dir = 4;
}
67 changes: 48 additions & 19 deletions src/shuffle/codec.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
// use datafusion_proto::physical_plan::from_proto::*;
// use datafusion_proto::physical_plan::to_proto::*;
use crate::protobuf::ray_sql_exec_node::PlanType;
use crate::protobuf::{RaySqlExecNode, ShuffleReaderExecNode, ShuffleWriterExecNode};
use crate::shuffle::{ShuffleReaderExec, ShuffleWriterExec};
use datafusion::common::DataFusionError;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::{DataFusionError, Result};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::protobuf;
use datafusion_proto::protobuf::PhysicalPlanNode;
use datafusion_proto::protobuf::{PhysicalHashRepartition, PhysicalPlanNode};
use prost::Message;
use std::collections::HashSet;
use std::sync::Arc;
Expand All @@ -24,30 +24,46 @@ impl PhysicalExtensionCodec for ShuffleCodec {
&self,
buf: &[u8],
_inputs: &[Arc<dyn ExecutionPlan>],
_registry: &dyn FunctionRegistry,
registry: &dyn FunctionRegistry,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
// decode bytes to protobuf struct
let node = RaySqlExecNode::decode(buf)
.map_err(|e| DataFusionError::Internal(format!("failed to decode plan: {e:?}")))?;
match node.plan_type {
Some(PlanType::ShuffleReader(reader)) => {
let schema = reader.schema.as_ref().unwrap();
let schema: SchemaRef = Arc::new(schema.try_into().unwrap());
let hash_part = parse_protobuf_hash_partitioning(
reader.partitioning.as_ref(),
registry,
&schema,
)?;
Ok(Arc::new(ShuffleReaderExec::new(
0, //TODO
Arc::new(schema.try_into().unwrap()),
reader.stage_id as usize,
schema,
hash_part.unwrap(),
)))
}
Some(PlanType::ShuffleWriter(writer)) => {
let function_registry = RaySqlFunctionRegistry {};
let plan = writer.plan.unwrap().try_into_physical_plan(
&function_registry,
registry,
&RuntimeEnv::default(),
self,
)?;
Ok(Arc::new(ShuffleWriterExec::new(
writer.stage_id as usize,
plan,
)))
let hash_part = parse_protobuf_hash_partitioning(
writer.partitioning.as_ref(),
registry,
plan.schema().as_ref(),
)?;
match hash_part {
Some(Partitioning::Hash(expr, count)) => Ok(Arc::new(ShuffleWriterExec::new(
writer.stage_id as usize,
plan,
expr,
count as usize,
))),
_ => todo!(),
}
}
_ => unreachable!(),
}
Expand All @@ -60,21 +76,21 @@ impl PhysicalExtensionCodec for ShuffleCodec {
) -> Result<(), DataFusionError> {
let plan = if let Some(reader) = node.as_any().downcast_ref::<ShuffleReaderExec>() {
let schema: protobuf::Schema = reader.schema().try_into().unwrap();
let partitioning = encode_partitioning_scheme(&reader.output_partitioning())?;
let reader = ShuffleReaderExecNode {
stage_id: reader.stage_id as u32,
partition: 0,
schema: Some(schema),
num_output_partitions: 1,
partitioning: Some(partitioning),
shuffle_dir: "/tmp/raysql".to_string(), // TODO remove hard-coded path
};
PlanType::ShuffleReader(reader)
} else if let Some(writer) = node.as_any().downcast_ref::<ShuffleWriterExec>() {
let plan = PhysicalPlanNode::try_from_physical_plan(writer.plan.clone(), self)?;
let partitioning = encode_partitioning_scheme(&writer.output_partitioning())?;
let writer = ShuffleWriterExecNode {
stage_id: 0,
stage_id: writer.stage_id as u32,
plan: Some(plan),
partition_expr: vec![],
num_output_partitions: 1,
partitioning: Some(partitioning),
shuffle_dir: "/tmp/raysql".to_string(), // TODO remove hard-coded path
};
PlanType::ShuffleWriter(writer)
Expand All @@ -86,6 +102,19 @@ impl PhysicalExtensionCodec for ShuffleCodec {
}
}

fn encode_partitioning_scheme(partitioning: &Partitioning) -> Result<PhysicalHashRepartition> {
match partitioning {
Partitioning::Hash(expr, partition_count) => Ok(protobuf::PhysicalHashRepartition {
hash_expr: expr
.iter()
.map(|expr| expr.clone().try_into())
.collect::<Result<Vec<_>, DataFusionError>>()?,
partition_count: *partition_count as u64,
}),
_ => todo!("unsupported shuffle partitioning scheme"),
}
}

struct RaySqlFunctionRegistry {}

impl FunctionRegistry for RaySqlFunctionRegistry {
Expand Down
32 changes: 25 additions & 7 deletions src/shuffle/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Statistics;
use datafusion::execution::context::TaskContext;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::union::CombinedRecordBatchStream;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
};
use futures::Stream;
use glob::glob;
use log::debug;
use std::any::Any;
use std::fmt::Formatter;
use std::fs::File;
Expand All @@ -21,11 +24,17 @@ pub struct ShuffleReaderExec {
pub stage_id: usize,
/// The output schema of the query stage being read from
schema: SchemaRef,
/// Output partitioning
partitioning: Partitioning,
}

impl ShuffleReaderExec {
pub fn new(stage_id: usize, schema: SchemaRef) -> Self {
Self { stage_id, schema }
pub fn new(stage_id: usize, schema: SchemaRef, partitioning: Partitioning) -> Self {
Self {
stage_id,
schema,
partitioning,
}
}
}

Expand All @@ -39,10 +48,11 @@ impl ExecutionPlan for ShuffleReaderExec {
}

fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
self.partitioning.clone()
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
// TODO could be implemented in some cases
None
}

Expand All @@ -63,10 +73,18 @@ impl ExecutionPlan for ShuffleReaderExec {
_context: Arc<TaskContext>,
) -> datafusion::common::Result<SendableRecordBatchStream> {
// TODO remove hard-coded path
let file = format!("/tmp/raysql/stage_{}_part_{partition}.arrow", self.stage_id);
println!("Shuffle reader reading from {file}");
let reader = FileReader::try_new(File::open(&file)?, None)?;
Ok(Box::pin(LocalShuffleStream::new(reader)))
let pattern = format!("/tmp/raysql/shuffle_{}_*_{partition}.arrow", self.stage_id);
let mut streams: Vec<SendableRecordBatchStream> = vec![];
for entry in glob(&pattern).expect("Failed to read glob pattern") {
let file = entry.unwrap();
debug!("Shuffle reader reading from {}", file.display());
let reader = FileReader::try_new(File::open(&file)?, None)?;
streams.push(Box::pin(LocalShuffleStream::new(reader)));
}
Ok(Box::pin(CombinedRecordBatchStream::new(
self.schema.clone(),
streams,
)))
}

fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
Expand Down
Loading

0 comments on commit fd7e65b

Please sign in to comment.