Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Ray object store for shuffle exchange #28

Merged
merged 21 commits into from
Mar 12, 2023

Conversation

franklsf95
Copy link
Contributor

@franklsf95 franklsf95 commented Mar 6, 2023

  • Add a flag for the Python RaySQL context so specify whether to use Ray for shuffle
  • Make ResultSet serializable across Python workers
  • Pass children ResultSets to RayShuffleReaderExec
  • Make RayShuffleReaderExec read from the input Python objects
  • Make ResultSet picklable
raysql/context.py Show resolved Hide resolved
@@ -67,15 +73,15 @@ impl PyContext {
}

pub fn plan(&self, sql: &str, py: Python) -> PyResult<PyExecutionGraph> {
debug!("Planning {}", sql);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a dumb question, but how do I make debug!() to print stuff? I tried export RUST_LOG=debug but still can't see any debug output.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add a dependency on a logger implementation (such as https://docs.rs/env_logger/latest/env_logger/) and initialize the logger somewhere

src/context.rs Show resolved Hide resolved
src/context.rs Outdated Show resolved Hide resolved
@@ -82,6 +84,8 @@ impl QueryStage {
fn collect_child_stage_ids(plan: &dyn ExecutionPlan, ids: &mut Vec<usize>) {
if let Some(shuffle_reader) = plan.as_any().downcast_ref::<ShuffleReaderExec>() {
ids.push(shuffle_reader.stage_id);
} else if let Some(shuffle_reader) = plan.as_any().downcast_ref::<RayShuffleReaderExec>() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way to do this in one statement?

Comment on lines 35 to 36
# TODO(@lsf): can there ever be more than 1 child future?
inputs = child_futures[0] if len(child_futures) > 0 else []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there can be multiple inputs for joins, unions, and some other operators

@andygrove andygrove merged commit 78fb1b0 into datafusion-contrib:main Mar 12, 2023
@franklsf95 franklsf95 changed the title [WIP] Use Ray object store for shuffle exchange Mar 22, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
2 participants