Skip to content

Commit

Permalink
Bug fix (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Jan 29, 2023
1 parent ab23f22 commit a0b124c
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 4 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ ctx.sql('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by s

## Status

- Proof-of-concept. Not producing correct results yet.
- Seems to be working, but only tested with a few queries so far!

## Features

Expand Down
Binary file added examples/tips.parquet
Binary file not shown.
7 changes: 5 additions & 2 deletions examples/tips.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
ctx = RaySqlContext(workers)
ctx.register_csv('tips', 'tips.csv', True)

ctx.sql('select day, sum(total_bill) from tips group by day')
# Parquet is also supported
# ctx.register_parquet('tips', 'tips.parquet')

#ctx.sql('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker')
ctx.sql('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker')

# ctx.sql('select day, sum(total_bill) from tips group by day')
6 changes: 5 additions & 1 deletion src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,11 @@ fn generate_query_stages(
&Partitioning::Hash(_, _) => {
// create a shuffle query stage for this repartition
let stage_id = graph.next_id();
let shuffle_writer = ShuffleWriterExec::new(stage_id, plan.clone());
// we remove the RepartitionExec because it isn't designed for our
// distributed use case and the ShuffleWriterExec contains the repartitioning
// logic
let shuffle_writer_input = plan.children()[0].clone();
let shuffle_writer = ShuffleWriterExec::new(stage_id, shuffle_writer_input);
let stage_id = graph.add_query_stage(stage_id, Arc::new(shuffle_writer));
// replace the plan with a shuffle reader
Ok(Arc::new(ShuffleReaderExec::new(stage_id, plan.schema())))
Expand Down

0 comments on commit a0b124c

Please sign in to comment.