
Use PyArrow for zero-copy interaction with the Ray Object Store #36

Merged — 8 commits merged into datafusion-contrib:main on Apr 5, 2023

Conversation

franklsf95 (Contributor) commented Mar 31, 2023

Ray Shuffle is currently 2x slower than disk-based shuffle. My theory is that there is too much serde and memcpy overhead; there really shouldn't be any, because the Arrow in-memory format is natively supported by the Ray object store. This PR addresses that.

In this PR:

  • Removed PyResultSet and PyRecordBatch. Instead, use the native pyarrow.RecordBatch and pyarrow.ResultSet. These are picklable, so we no longer have to convert them to and from bytes (see the sketch after this list).
  • I realized schedule_execution really doesn't have to be a remote task, since with Ray shuffle we schedule all tasks at the beginning of execution. It is now a recursive function call, which also saves the serde cost of serializing execution plans.
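
The first bullet relies on the fact that pyarrow objects pickle through Arrow's own buffers, which Ray can place in its shared-memory object store. Below is a minimal, hypothetical sketch (not code from this PR) of a pyarrow.RecordBatch round-tripping through the Ray object store; the schema, data, and the num_rows task are made up for illustration:

```python
import pyarrow as pa
import ray

ray.init()

# Build a small Arrow record batch; the column names and data are illustrative only.
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
    names=["id", "value"],
)

# ray.put pickles the batch (pickle protocol 5 with out-of-band Arrow buffers)
# into the object store; no manual to/from-bytes conversion is needed.
ref = ray.put(batch)

@ray.remote
def num_rows(b: pa.RecordBatch) -> int:
    # The task receives a record batch backed by the stored buffers.
    return b.num_rows

assert ray.get(num_rows.remote(ref)) == 3
```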

Using PyArrow, Ray Shuffle is now slightly faster than disk-based shuffle on a single node. (See before/after comparison. Plot titles are wrong; this is on a single node).

[before and after benchmark plots]

I also tested against SparkSQL on a 4-node cluster and RaySQL is 2.5x faster.

[4-node benchmark plot]

andygrove merged commit f985808 into datafusion-contrib:main on Apr 5, 2023