Remove hard-coded temp dir (#14)
andygrove committed Jan 29, 2023
1 parent 5fc4345 commit f8ff000
Showing 7 changed files with 72 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -21,6 +21,7 @@ log = "0.4"
 prost = "0.11"
 prost-types = "0.11"
 pyo3 = { version = "~0.17.3", features = ["extension-module", "abi3", "abi3-py37"] }
+tempfile = "3"
 tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 
 [build-dependencies]
2 changes: 1 addition & 1 deletion raysql/worker.py
@@ -10,7 +10,7 @@ def execute_query_partition(self, plan_bytes, part):
         plan = self.ctx.deserialize_execution_plan(plan_bytes)
 
         # debug logging
-        # print("Executing partition #{}:\n{}".format(part, plan.display_indent()))
+        print("Executing partition #{}:\n{}".format(part, plan.display_indent()))
 
         # This is delegating to DataFusion for execution, but this would be a good place
         # to plug in other execution engines by translating the plan into another engine's plan
21 changes: 21 additions & 0 deletions src/planner.rs
@@ -9,6 +9,7 @@ use pyo3::prelude::*;
 use std::collections::HashMap;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use tempfile::tempdir;
 
 #[pyclass(name = "ExecutionGraph", module = "raysql", subclass)]
 pub struct PyExecutionGraph {
@@ -195,18 +196,24 @@ fn generate_query_stages(
         partitioning_scheme => {
             // create a shuffle query stage for this repartition
             let stage_id = graph.next_id();
+
+            // create temp dir for stage shuffle files
+            let temp_dir = create_temp_dir(stage_id)?;
+
             let shuffle_writer_input = plan.children()[0].clone();
             let shuffle_writer = ShuffleWriterExec::new(
                 stage_id,
                 shuffle_writer_input,
                 partitioning_scheme.clone(),
+                &temp_dir,
             );
             let stage_id = graph.add_query_stage(stage_id, Arc::new(shuffle_writer));
             // replace the plan with a shuffle reader
             Ok(Arc::new(ShuffleReaderExec::new(
                 stage_id,
                 plan.schema(),
                 partitioning_scheme.clone(),
+                &temp_dir,
             )))
         }
     }
@@ -217,20 +224,34 @@ fn generate_query_stages(
     {
         // introduce shuffle to produce one output partition
         let stage_id = graph.next_id();
+
+        // create temp dir for stage shuffle files
+        let temp_dir = create_temp_dir(stage_id)?;
+
         let shuffle_writer_input = plan.children()[0].clone();
         let shuffle_writer = ShuffleWriterExec::new(
             stage_id,
             shuffle_writer_input,
             Partitioning::UnknownPartitioning(1),
+            &temp_dir,
         );
         let stage_id = graph.add_query_stage(stage_id, Arc::new(shuffle_writer));
         // replace the plan with a shuffle reader
         Ok(Arc::new(ShuffleReaderExec::new(
             stage_id,
             plan.schema(),
             Partitioning::UnknownPartitioning(1),
+            &temp_dir,
        )))
    } else {
        Ok(plan)
    }
 }
+
+fn create_temp_dir(stage_id: usize) -> Result<String> {
+    let dir = tempdir()?;
+    let temp_dir = dir.path().join(format!("stage-{}", stage_id));
+    let temp_dir = format!("{}", temp_dir.display());
+    println!("Creating temp shuffle dir: {}", temp_dir);
+    Ok(temp_dir)
+}
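
Aside: the new create_temp_dir helper leans on the tempfile crate added to Cargo.toml above. A minimal standalone sketch of that crate's semantics (an illustration, not part of this commit): tempfile::tempdir() returns a TempDir guard that deletes its directory when dropped, and path().join(...) only builds a path, so a caller that wants the stage subdirectory on disk still has to create it explicitly.

use tempfile::tempdir;

fn main() -> std::io::Result<()> {
    let dir = tempdir()?; // creates something like /tmp/.tmpAbC123
    let stage_path = dir.path().join("stage-1"); // builds the path only
    std::fs::create_dir_all(&stage_path)?; // the subdirectory must be created explicitly
    println!("shuffle dir: {}", stage_path.display());
    Ok(())
    // `dir` is dropped here and the whole temp tree is removed
}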
6 changes: 4 additions & 2 deletions src/shuffle/codec.rs
@@ -42,6 +42,7 @@ impl PhysicalExtensionCodec for ShuffleCodec {
                     reader.stage_id as usize,
                     schema,
                     hash_part.unwrap(),
+                    &reader.shuffle_dir,
                 )))
             }
             Some(PlanType::ShuffleWriter(writer)) => {
@@ -59,6 +60,7 @@ impl PhysicalExtensionCodec for ShuffleCodec {
                     writer.stage_id as usize,
                     plan,
                     hash_part.unwrap(),
+                    &writer.shuffle_dir,
                 )))
             }
             _ => unreachable!(),
@@ -77,7 +79,7 @@ impl PhysicalExtensionCodec for ShuffleCodec {
                 stage_id: reader.stage_id as u32,
                 schema: Some(schema),
                 partitioning: Some(partitioning),
-                shuffle_dir: "/tmp/raysql".to_string(), // TODO remove hard-coded path
+                shuffle_dir: reader.shuffle_dir.clone(),
             };
             PlanType::ShuffleReader(reader)
         } else if let Some(writer) = node.as_any().downcast_ref::<ShuffleWriterExec>() {
@@ -87,7 +89,7 @@ impl PhysicalExtensionCodec for ShuffleCodec {
                 stage_id: writer.stage_id as u32,
                 plan: Some(plan),
                 partitioning: Some(partitioning),
-                shuffle_dir: "/tmp/raysql".to_string(), // TODO remove hard-coded path
+                shuffle_dir: writer.shuffle_dir.clone(),
             };
             PlanType::ShuffleWriter(writer)
         } else {
32 changes: 27 additions & 5 deletions src/shuffle/reader.rs
@@ -2,6 +2,7 @@ use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::ipc::reader::FileReader;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::Statistics;
+use datafusion::error::DataFusionError;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_expr::PhysicalSortExpr;
 use datafusion::physical_plan::union::CombinedRecordBatchStream;
@@ -26,14 +27,22 @@ pub struct ShuffleReaderExec {
     schema: SchemaRef,
     /// Output partitioning
     partitioning: Partitioning,
+    /// Directory to read shuffle files from
+    pub shuffle_dir: String,
 }
 
 impl ShuffleReaderExec {
-    pub fn new(stage_id: usize, schema: SchemaRef, partitioning: Partitioning) -> Self {
+    pub fn new(
+        stage_id: usize,
+        schema: SchemaRef,
+        partitioning: Partitioning,
+        shuffle_dir: &str,
+    ) -> Self {
         Self {
             stage_id,
             schema,
             partitioning,
+            shuffle_dir: shuffle_dir.to_string(),
         }
     }
 }
@@ -72,14 +81,22 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
         _context: Arc<TaskContext>,
     ) -> datafusion::common::Result<SendableRecordBatchStream> {
-        // TODO remove hard-coded path
-        let pattern = format!("/tmp/raysql/shuffle_{}_*_{partition}.arrow", self.stage_id);
+        let pattern = format!(
+            "/{}/shuffle_{}_*_{partition}.arrow",
+            self.shuffle_dir, self.stage_id
+        );
         let mut streams: Vec<SendableRecordBatchStream> = vec![];
         for entry in glob(&pattern).expect("Failed to read glob pattern") {
             let file = entry.unwrap();
             debug!("Shuffle reader reading from {}", file.display());
             let reader = FileReader::try_new(File::open(&file)?, None)?;
-            streams.push(Box::pin(LocalShuffleStream::new(reader)));
+            let stream = LocalShuffleStream::new(reader);
+            if self.schema != stream.schema() {
+                return Err(DataFusionError::Internal(
+                    "Not all shuffle files have the same schema".to_string(),
+                ));
+            }
+            streams.push(Box::pin(stream));
         }
         Ok(Box::pin(CombinedRecordBatchStream::new(
             self.schema.clone(),
@@ -88,7 +105,12 @@
     }
 
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "ShuffleReaderExec(stage_id={})", self.stage_id)
+        write!(
+            f,
+            "ShuffleReaderExec(stage_id={}, input_partitions={})",
+            self.stage_id,
+            self.partitioning.partition_count()
+        )
     }
 
     fn statistics(&self) -> Statistics {
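
For context, the reader's discovery step is plain glob matching: every shuffle file a stage wrote for the requested output partition matches shuffle_{stage_id}_*_{partition}.arrow under the shuffle directory. A self-contained sketch of that step with the glob crate (the directory name here is made up for illustration):

use glob::glob;
use std::path::PathBuf;

fn shuffle_files(shuffle_dir: &str, stage_id: usize, partition: usize) -> Vec<PathBuf> {
    // The `*` wildcard matches the input partition that produced each file.
    let pattern = format!("{shuffle_dir}/shuffle_{stage_id}_*_{partition}.arrow");
    glob(&pattern)
        .expect("invalid glob pattern")
        .filter_map(Result::ok) // skip entries that could not be read
        .collect()
}

fn main() {
    for file in shuffle_files("/tmp/raysql-example", 2, 0) {
        println!("{}", file.display());
    }
}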
22 changes: 17 additions & 5 deletions src/shuffle/writer.rs
@@ -32,15 +32,24 @@ pub struct ShuffleWriterExec {
     pub(crate) plan: Arc<dyn ExecutionPlan>,
     /// Output partitioning
     partitioning: Partitioning,
+    /// Directory to write shuffle files from
+    pub shuffle_dir: String,
     /// Metrics
     pub metrics: ExecutionPlanMetricsSet,
 }
 
 impl ShuffleWriterExec {
-    pub fn new(stage_id: usize, plan: Arc<dyn ExecutionPlan>, partitioning: Partitioning) -> Self {
+    pub fn new(
+        stage_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        partitioning: Partitioning,
+        shuffle_dir: &str,
+    ) -> Self {
         Self {
             stage_id,
             plan,
             partitioning,
+            shuffle_dir: shuffle_dir.to_string(),
             metrics: ExecutionPlanMetricsSet::new(),
         }
     }
@@ -99,12 +108,15 @@
         let stage_id = self.stage_id;
         let partitioning = self.output_partitioning();
         let partition_count = partitioning.partition_count();
+        let shuffle_dir = self.shuffle_dir.clone();
 
         let results = async move {
             if partition_count == 1 {
                 // stream the results from the query
-                // TODO remove hard-coded path
-                let file = format!("/tmp/raysql/shuffle_{stage_id}_{input_partition}_0.arrow");
+                let file = format!(
+                    "/{}/shuffle_{stage_id}_{input_partition}_0.arrow",
+                    shuffle_dir
+                );
                 debug!("Executing query and writing results to {file}");
                 let stats = write_stream_to_disk(&mut stream, &file, &write_time).await?;
                 debug!(
@@ -140,9 +152,9 @@
                             w.write(&output_batch)?;
                         }
                         None => {
-                            // TODO remove hard-coded path
                             let path = format!(
-                                "/tmp/raysql/shuffle_{stage_id}_{input_partition}_{output_partition}.arrow"
+                                "/{}/shuffle_{stage_id}_{input_partition}_{output_partition}.arrow",
+                                shuffle_dir
                             );
                             let path = Path::new(&path);
                             debug!("Writing results to {:?}", path);
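
Taken together, writer and reader now agree on one naming scheme rooted at the per-stage shuffle_dir: shuffle_{stage_id}_{input_partition}_{output_partition}.arrow. A small sketch of that convention (the paths are hypothetical, for illustration only):

fn shuffle_file_path(
    shuffle_dir: &str,
    stage_id: usize,
    input_partition: usize,
    output_partition: usize,
) -> String {
    format!("{shuffle_dir}/shuffle_{stage_id}_{input_partition}_{output_partition}.arrow")
}

fn main() {
    // Two input partitions of stage 2 writing output partition 0; the reader's
    // pattern shuffle_2_*_0.arrow picks up both files.
    assert_eq!(
        shuffle_file_path("/tmp/raysql-example", 2, 0, 0),
        "/tmp/raysql-example/shuffle_2_0_0.arrow"
    );
    assert_eq!(
        shuffle_file_path("/tmp/raysql-example", 2, 1, 0),
        "/tmp/raysql-example/shuffle_2_1_0.arrow"
    );
}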
