Documentation & bug fixes (#24)
andygrove committed Feb 5, 2023
1 parent 1621999 commit 7eb0483
Showing 36 changed files with 852 additions and 500 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -4,7 +4,7 @@ description = "RaySQL: DataFusion on Ray"
homepage = "https://github.com/andygrove/ray-sql"
repository = "https://github.com/andygrove/ray-sql"
authors = ["Andy Grove <andygrove73@gmail.com>"]
version = "0.3.0"
version = "0.4.0"
edition = "2021"
readme = "README.md"
license = "Apache-2.0"
23 changes: 14 additions & 9 deletions README.md
@@ -5,13 +5,14 @@ This is a personal research project to evaluate performing distributed SQL queri

## Goals

- Demonstrate how easily new systems can be built on top of DataFusion
- Drive requirements for DataFusion's Python bindings
- Create content for an interesting blog post or conference talk
- Demonstrate how easily new systems can be built on top of DataFusion. See the [design documentation](./docs/README.md)
to understand how RaySQL works.
- Drive requirements for DataFusion's [Python bindings](https://github.com/apache/arrow-datafusion-python).
- Create content for an interesting blog post or conference talk.

## Non Goals

- Build and support a production system
- Build and support a production system.

## Example

@@ -28,10 +29,14 @@ ray.init()
# create some remote Workers
workers = [Worker.remote() for i in range(2)]

# create context and plan a query
ctx = RaySqlContext(workers)
ctx.register_csv('tips', 'tips.csv', True)
result_set = ctx.sql('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker')
# create a remote context and register a table
ctx = RaySqlContext.remote(workers)
ray.get(ctx.register_csv.remote('tips', 'tips.csv', True))

# Parquet is also supported
# ctx.register_parquet('tips', 'tips.parquet')

result_set = ray.get(ctx.sql.remote('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker'))
print(result_set)
```
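The example reflects the new actor-based API: the context is created with `RaySqlContext.remote(workers)` and its methods are invoked via Ray's `.remote(...)`/`ray.get` pattern. Presumably the commented-out Parquet registration would go through the same pattern; a hedged sketch (assumes `register_parquet` is exposed on the remote context just like `register_csv`):

```python
# Hypothetical: mirrors the register_csv call above; assumes the remote
# context exposes register_parquet(table_name, path).
ray.get(ctx.register_parquet.remote('tips', 'tips.parquet'))
```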

@@ -58,7 +63,7 @@ Note that query 15 is excluded from both results since RaySQL does not support D

### Overall Time

RaySQL is ~30% faster overall for this scale factor and environment.
RaySQL is ~65% faster overall for this scale factor and environment.

![SQLBench-H Total](./docs/sqlbench-h-total.png)

264 changes: 258 additions & 6 deletions docs/README.md

Large diffs are not rendered by default.

Binary file modified docs/sqlbench-h-per-query.png
Binary file modified docs/sqlbench-h-total.png
1 change: 0 additions & 1 deletion examples/tips.py
@@ -17,4 +17,3 @@

result_set = ray.get(ctx.sql.remote('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker'))
print(result_set)
# print(ray.get(result_set))
12 changes: 6 additions & 6 deletions raysql/context.py
@@ -15,18 +15,18 @@ def execute_query_stage(query_stages, stage_id, workers):

# if the query stage has a single output partition then we need to execute for the output
# partition, otherwise we need to execute in parallel for each input partition
if stage.get_output_partition_count == 1:
partition_count = 1
else:
partition_count = stage.get_input_partition_count()
concurrency = stage.get_input_partition_count()
if stage.get_output_partition_count() == 1:
# reduce stage
concurrency = 1

print("Scheduling query stage #{} with {} input partitions and {} output partitions".format(stage.id(), partition_count, stage.get_output_partition_count()))
print("Scheduling query stage #{} with {} input partitions and {} output partitions".format(stage.id(), stage.get_input_partition_count(), stage.get_output_partition_count()))

plan_bytes = ray.put(serialize_execution_plan(stage.get_execution_plan()))

# round-robin allocation across workers
futures = []
for part in range(partition_count):
for part in range(concurrency):
worker_index = part % len(workers)
futures.append(workers[worker_index].execute_query_partition.remote(plan_bytes, part))

18 changes: 12 additions & 6 deletions src/context.rs
@@ -5,6 +5,8 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;
@@ -28,17 +30,22 @@ pub struct PyContext {
#[pymethods]
impl PyContext {
#[new]
pub fn new(target_partitions: usize) -> Self {
pub fn new(target_partitions: usize) -> Result<Self> {
let config = SessionConfig::default()
.with_target_partitions(target_partitions)
.with_batch_size(16*1024)
.with_batch_size(16 * 1024)
.with_repartition_aggregations(true)
.with_repartition_windows(true)
.with_repartition_joins(true)
.with_parquet_pruning(true);
Self {
ctx: SessionContext::with_config(config),
}

let mem_pool_size = 1024 * 1024 * 1024;
let runtime_config = datafusion::execution::runtime_env::RuntimeConfig::new()
.with_memory_pool(Arc::new(FairSpillPool::new(mem_pool_size)))
.with_disk_manager(DiskManagerConfig::new_specified(vec!["/tmp".into()]));
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let ctx = SessionContext::with_config_rt(config, runtime);
Ok(Self { ctx })
}

pub fn register_csv(
@@ -78,7 +85,6 @@ impl PyContext {
Ok(PyExecutionGraph::new(graph))
}


/// Execute a partition of a query plan. This will typically execute a shuffle write and write the results to disk
pub fn execute_partition(&self, plan: PyExecutionPlan, part: usize) -> PyResultSet {
let batches = self
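The new constructor also caps DataFusion's execution memory with a 1 GiB `FairSpillPool` and points the disk manager at `/tmp`, so memory-hungry operators spill to disk instead of failing. A self-contained sketch of the same wiring (pool size and spill path are the values from this commit; the target-partition count is illustrative):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::*;

fn make_context() -> Result<SessionContext> {
    let config = SessionConfig::default().with_target_partitions(4);
    let runtime_config = RuntimeConfig::new()
        // cap execution memory at 1 GiB; operators spill beyond this
        .with_memory_pool(Arc::new(FairSpillPool::new(1024 * 1024 * 1024)))
        // spill files are written under /tmp
        .with_disk_manager(DiskManagerConfig::new_specified(vec!["/tmp".into()]));
    let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
    Ok(SessionContext::with_config_rt(config, runtime))
}
```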
4 changes: 2 additions & 2 deletions src/lib.rs
@@ -3,14 +3,14 @@ extern crate core;
use pyo3::prelude::*;

mod proto;
use crate::context::{deserialize_execution_plan, serialize_execution_plan};
pub use proto::generated::protobuf;
use crate::context::{serialize_execution_plan, deserialize_execution_plan};

pub mod context;
pub mod planner;
pub mod query_stage;
pub mod shuffle;
pub mod utils;
pub mod query_stage;

/// A Python module implemented in Rust.
#[pymodule]
58 changes: 42 additions & 16 deletions src/planner.rs
@@ -1,11 +1,11 @@
use crate::query_stage::QueryStage;
use crate::query_stage::PyQueryStage;
use crate::query_stage::QueryStage;
use crate::shuffle::{ShuffleReaderExec, ShuffleWriterExec};
use datafusion::error::Result;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::{displayable, Partitioning};
use datafusion::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use log::debug;
use pyo3::prelude::*;
@@ -27,13 +27,14 @@ impl PyExecutionGraph {

#[pymethods]
impl PyExecutionGraph {

/// Get a list of stages sorted by id
pub fn get_query_stages(&self) -> Vec<PyQueryStage> {
let mut stages = vec![];
let max_id = self.graph.get_final_query_stage().id;
for id in 0..=max_id {
stages.push(PyQueryStage::from_rust(self.graph.query_stages.get(&id).unwrap().clone()));
stages.push(PyQueryStage::from_rust(
self.graph.query_stages.get(&id).unwrap().clone(),
));
}
stages
}
@@ -95,8 +96,6 @@ impl ExecutionGraph {
}
}



pub fn make_execution_graph(plan: Arc<dyn ExecutionPlan>) -> Result<ExecutionGraph> {
let mut graph = ExecutionGraph::new();
let root = generate_query_stages(plan, &mut graph)?;
Expand All @@ -118,39 +117,58 @@ fn generate_query_stages(
.collect::<Result<Vec<_>>>()?;
let plan = with_new_children_if_necessary(plan, new_children)?;

if let Some(repart) = plan.as_any().downcast_ref::<RepartitionExec>() {
debug!("plan = {}", displayable(plan.as_ref()).one_line());
debug!("output_part = {:?}", plan.output_partitioning());

let new_plan = if let Some(repart) = plan.as_any().downcast_ref::<RepartitionExec>() {
match repart.partitioning() {
&Partitioning::UnknownPartitioning(_) | &Partitioning::RoundRobinBatch(_) => {
// just remove these
Ok(repart.children()[0].clone())
}
partitioning_scheme => {
create_shuffle_exchange(plan.as_ref(), graph, partitioning_scheme.clone())
}
partitioning_scheme => create_shuffle_exchange(
plan.children()[0].clone(),
graph,
partitioning_scheme.clone(),
),
}
} else if plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
.is_some()
{
create_shuffle_exchange(plan.as_ref(), graph, Partitioning::UnknownPartitioning(1))
let coalesce_input = plan.children()[0].clone();
let partitioning_scheme = coalesce_input.output_partitioning();
let new_input = create_shuffle_exchange(coalesce_input, graph, partitioning_scheme)?;
with_new_children_if_necessary(plan, vec![new_input])
} else if plan
.as_any()
.downcast_ref::<SortPreservingMergeExec>()
.is_some()
{
create_shuffle_exchange(plan.as_ref(), graph, Partitioning::UnknownPartitioning(1))
let partitioned_sort_plan = plan.children()[0].clone();
let partitioning_scheme = partitioned_sort_plan.output_partitioning();
let new_input = create_shuffle_exchange(partitioned_sort_plan, graph, partitioning_scheme)?;
with_new_children_if_necessary(plan, vec![new_input])
} else {
Ok(plan)
}
}?;

debug!("new_plan = {}", displayable(new_plan.as_ref()).one_line());
debug!(
"new_output_part = {:?}\n\n-------------------------\n\n",
new_plan.output_partitioning()
);

Ok(new_plan)
}

/// Create a shuffle exchange.
///
/// The plan is wrapped in a ShuffleWriterExec and added as a new query stage in the execution graph
/// and a ShuffleReaderExec is returned to replace the plan.
fn create_shuffle_exchange(
plan: &dyn ExecutionPlan,
plan: Arc<dyn ExecutionPlan>,
graph: &mut ExecutionGraph,
partitioning_scheme: Partitioning,
) -> Result<Arc<dyn ExecutionPlan>> {
@@ -160,13 +178,19 @@ fn create_shuffle_exchange(
// create temp dir for stage shuffle files
let temp_dir = create_temp_dir(stage_id)?;

let shuffle_writer_input = plan.children()[0].clone();
let shuffle_writer_input = plan.clone();
let shuffle_writer = ShuffleWriterExec::new(
stage_id,
shuffle_writer_input,
partitioning_scheme.clone(),
&temp_dir,
);

debug!(
"Created shuffle writer with output partitioning {:?}",
shuffle_writer.output_partitioning()
);

let stage_id = graph.add_query_stage(stage_id, Arc::new(shuffle_writer));
// replace the plan with a shuffle reader
Ok(Arc::new(ShuffleReaderExec::new(
@@ -345,7 +369,9 @@ mod test {
for id in 0..=graph.get_final_query_stage().id {
let query_stage = graph.query_stages.get(&id).unwrap();
output.push_str(&format!(
"Query Stage #{id}:\n{}\n",
"Query Stage #{id} ({} -> {}):\n{}\n",
query_stage.get_input_partition_count(),
query_stage.get_output_partition_count(),
displayable(query_stage.plan.as_ref()).indent()
));
}
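To visualize the planner change: a hash `RepartitionExec` is replaced by a `ShuffleReaderExec`, and its input is wrapped in a `ShuffleWriterExec` that becomes a new query stage (`CoalescePartitionsExec` and `SortPreservingMergeExec` now keep their node but get a shuffle exchange inserted below them). An illustrative sketch for a two-phase aggregate; the operator layout is representative, not taken from an actual plan:

```
Before rewrite:                  After rewrite:

AggregateExec (final)            Query stage 1:
  RepartitionExec (Hash)           AggregateExec (final)
    AggregateExec (partial)          ShuffleReaderExec (stage 0)
      ParquetExec
                                 Query stage 0:
                                   ShuffleWriterExec (Hash)
                                     AggregateExec (partial)
                                       ParquetExec
```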
27 changes: 9 additions & 18 deletions src/query_stage.rs
@@ -1,11 +1,11 @@
use crate::shuffle::{ShuffleCodec, ShuffleReaderExec};
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::bytes::physical_plan_from_bytes_with_extension_codec;
use datafusion_python::physical_plan::PyExecutionPlan;
use pyo3::prelude::*;
use std::sync::Arc;
use datafusion::prelude::SessionContext;
use datafusion::error::Result;
use datafusion_proto::bytes::physical_plan_from_bytes_with_extension_codec;

#[pyclass(name = "QueryStage", module = "raysql", subclass)]
pub struct PyQueryStage {
@@ -26,10 +26,7 @@ impl PyQueryStage {
let codec = ShuffleCodec {};
let plan = physical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?;
Ok(PyQueryStage {
stage: Arc::new(QueryStage {
id,
plan
})
stage: Arc::new(QueryStage { id, plan }),
})
}

@@ -74,7 +71,11 @@ impl QueryStage {
/// Get the input partition count. This is the same as the number of concurrent tasks
/// when we schedule this query stage for execution
pub fn get_input_partition_count(&self) -> usize {
collect_input_partition_count(self.plan.as_ref())
self.plan.children()[0].output_partitioning().partition_count()
}

pub fn get_output_partition_count(&self) -> usize {
self.plan.output_partitioning().partition_count()
}
}

@@ -87,13 +88,3 @@ fn collect_child_stage_ids(plan: &dyn ExecutionPlan, ids: &mut Vec<usize>) {
}
}
}

fn collect_input_partition_count(plan: &dyn ExecutionPlan) -> usize {
if plan.children().is_empty() {
plan.output_partitioning().partition_count()
} else {
// invariants:
// - all inputs must have the same partition count
collect_input_partition_count(plan.children()[0].as_ref())
}
}
4 changes: 2 additions & 2 deletions src/shuffle/codec.rs
@@ -109,9 +109,9 @@ fn encode_partitioning_scheme(partitioning: &Partitioning) -> Result<PhysicalHas
.collect::<Result<Vec<_>, DataFusionError>>()?,
partition_count: *partition_count as u64,
}),
Partitioning::UnknownPartitioning(1) => Ok(protobuf::PhysicalHashRepartition {
Partitioning::UnknownPartitioning(n) => Ok(protobuf::PhysicalHashRepartition {
hash_expr: vec![],
partition_count: 1,
partition_count: *n as u64,
}),
other => Err(DataFusionError::Plan(format!(
"Unsupported shuffle partitioning scheme: {other:?}"
