Remove debug logging (#19)
andygrove committed Jan 31, 2023
1 parent 61e6fcd commit ae07096
Showing 9 changed files with 110 additions and 81 deletions.
56 changes: 28 additions & 28 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -4,7 +4,7 @@ description = "RaySQL: DataFusion on Ray"
 homepage = "https://github.com/andygrove/ray-sql"
 repository = "https://github.com/andygrove/ray-sql"
 authors = ["Andy Grove <andygrove73@gmail.com>"]
-version = "0.1.0"
+version = "0.2.0"
 edition = "2021"
 readme = "README.md"
 license = "Apache-2.0"
@@ -14,7 +14,7 @@ build = "build.rs"
 [dependencies]
 datafusion = { version = "17.0.0", features = ["pyarrow", "avro"] }
 datafusion-proto = "17.0.0"
-datafusion-python = { git = "https://github.com/andygrove/arrow-datafusion-python", rev = "7164900a32b25f0644a3294389f02e28e94da8a3" }
+datafusion-python = { git = "https://github.com/apache/arrow-datafusion-python", rev = "8975d4ec981aeae948250f0cb4d08c131f4f2a28" }
 futures = "0.3"
 glob = "0.3"
 log = "0.4"
3 changes: 2 additions & 1 deletion README.md
@@ -21,7 +21,8 @@ workers = [Worker.remote() for i in range(2)]
 # create context and plan a query
 ctx = RaySqlContext(workers)
 ctx.register_csv('tips', 'tips.csv', True)
-ctx.sql('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker')
+result_set = ctx.sql('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker')
+print(result_set)
 ```
 
 ## Status
5 changes: 2 additions & 3 deletions examples/tips.py
@@ -15,6 +15,5 @@
 # Parquet is also supported
 # ctx.register_parquet('tips', 'tips.parquet')
 
-ctx.sql('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker')
-
-# ctx.sql('select day, sum(total_bill) from tips group by day')
+result_set = ctx.sql('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker')
+print(result_set)
12 changes: 8 additions & 4 deletions raysql/context.py
@@ -1,5 +1,6 @@
 import ray
 from raysql import Context
+import time
 
 class RaySqlContext:
 
@@ -18,7 +19,7 @@ def sql(self, sql):
         # recurse down the tree and build a DAG of futures
         final_stage = graph.get_final_query_stage()
         # schedule execution
-        self.execute_query_stage(graph, final_stage)
+        return self.execute_query_stage(graph, final_stage)
 
     def execute_query_stage(self, graph, stage):
 
@@ -44,9 +45,12 @@ def execute_query_stage(self, graph, stage):
         futures = []
         for part in range(partition_count):
             worker_index = part % len(self.workers)
-            print("Asking worker {} to execute partition {}".format(worker_index, part))
             futures.append(self.workers[worker_index].execute_query_partition.remote(plan_bytes, part))
 
-        print("Waiting for query stage #{} to complete".format(stage.id()))
-        print(ray.get(futures))
-        print("Query stage #{} has completed".format(stage.id()))
+        start = time.time()
+        result_set = ray.get(futures)
+        end = time.time()
+        print("Query stage #{} completed in {} seconds".format(stage.id(), end-start))
+
+        return result_set
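
For readers unfamiliar with Ray, the pattern in `execute_query_stage` is: fan out one remote call per partition (round-robin across workers), collect the futures, then block once on `ray.get` for the whole stage. Below is a minimal standalone sketch of that pattern, using a plain remote function as a stand-in for the `Worker` actor; the function body, plan bytes, and partition count are illustrative only, not part of RaySQL:

```python
import time

import ray

ray.init()

@ray.remote
def execute_query_partition(plan_bytes, part):
    # stand-in for Worker.execute_query_partition: pretend to execute
    # one partition of a serialized query plan
    return "partition {} done".format(part)

# fan out one task per partition, as execute_query_stage does
futures = [execute_query_partition.remote(b"plan", part) for part in range(4)]

# a single blocking gather, timed the same way as the code above
start = time.time()
result_set = ray.get(futures)
print("stage completed in {} seconds".format(time.time() - start))
print(result_set)
```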
13 changes: 6 additions & 7 deletions raysql/worker.py
@@ -5,19 +5,18 @@
 class Worker:
     def __init__(self):
         self.ctx = Context(1)
+        self.debug = False
 
     def execute_query_partition(self, plan_bytes, part):
         plan = self.ctx.deserialize_execution_plan(plan_bytes)
 
-        # debug logging
-        print("Executing partition #{}:\n{}".format(part, plan.display_indent()))
+        if self.debug:
+            print("Executing partition #{}:\n{}".format(part, plan.display_indent()))
 
         # This is delegating to DataFusion for execution, but this would be a good place
         # to plug in other execution engines by translating the plan into another engine's plan
         # (perhaps via Substrait, once DataFusion supports converting a physical plan to Substrait)
-        self.ctx.execute_partition(plan, part)
-
-        return True
-
-
+        results = self.ctx.execute_partition(plan, part)
+
+        # TODO: return results here instead of string representation of results
+        return "{}".format(results)
File renamed without changes.
93 changes: 59 additions & 34 deletions src/context.rs
@@ -1,14 +1,12 @@
 use crate::planner::{make_execution_graph, PyExecutionGraph};
 use crate::shuffle::ShuffleCodec;
 use crate::utils::wait_for_future;
-use datafusion::arrow::array::UInt32Array;
-use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::arrow::util::pretty::pretty_format_batches;
+use datafusion::error::Result;
 use datafusion::execution::context::TaskContext;
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::displayable;
-use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::prelude::*;
 use datafusion_proto::bytes::{
     physical_plan_from_bytes_with_extension_codec, physical_plan_to_bytes_with_extension_codec,
@@ -20,6 +18,7 @@ use pyo3::prelude::*;
 use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::runtime::Runtime;
+use tokio::task::JoinHandle;
 
 #[pyclass(name = "Context", module = "raysql", subclass)]
 pub struct PyContext {
@@ -63,7 +62,7 @@ impl PyContext {
 
         // debug logging
         for stage in graph.query_stages.values() {
-            println!(
+            debug!(
                 "Query stage #{}:\n{}",
                 stage.id,
                 displayable(stage.plan.as_ref()).indent()
@@ -86,7 +85,21 @@ impl PyContext {
     }
 
     /// Execute a partition of a query plan. This will typically be executing a shuffle write and write the results to disk
-    pub fn execute_partition(&self, plan: PyExecutionPlan, part: usize) -> PyResult<()> {
+    pub fn execute_partition(&self, plan: PyExecutionPlan, part: usize) -> PyResultSet {
+        let batches = self
+            ._execute_partition(plan, part)
+            .unwrap()
+            .iter()
+            .map(|batch| PyRecordBatch::new(batch.clone()))
+            .collect();
+        PyResultSet::new(batches)
+    }
+}
+
+impl PyContext {
+    /// Execute a partition of a query plan. This will typically be executing a shuffle write and
+    /// write the results to disk, except for the final query stage, which will return the data
+    fn _execute_partition(&self, plan: PyExecutionPlan, part: usize) -> Result<Vec<RecordBatch>> {
         let ctx = Arc::new(TaskContext::new(
             "task_id".to_string(),
             "session_id".to_string(),
@@ -99,42 +112,54 @@ impl PyContext {
         // create a Tokio runtime to run the async code
         let rt = Runtime::new().unwrap();
 
-        let fut = rt.spawn(async move {
+        let fut: JoinHandle<Result<Vec<RecordBatch>>> = rt.spawn(async move {
             let mut stream = plan.plan.execute(part, ctx)?;
             let mut results = vec![];
-            let mut row_count = 0_u32;
             while let Some(result) = stream.next().await {
-                let input_batch = result?;
-                row_count += 1;
-                results.push(input_batch);
+                results.push(result?);
             }
-
-            println!("Results:\n{}", pretty_format_batches(&results)?);
-
-            // return a result set with metadata about this executed partition
-            let schema = Arc::new(Schema::new(vec![
-                Field::new("partition_index", DataType::UInt32, true),
-                Field::new("partition_batches", DataType::UInt32, true),
-                Field::new("partition_rows", DataType::UInt32, true),
-            ]));
-            let part_index = UInt32Array::from(vec![part as u32]);
-            let part_batches = UInt32Array::from(vec![results.len() as u32]);
-            let part_rows = UInt32Array::from(vec![row_count]);
-            let batch = RecordBatch::try_new(
-                schema.clone(),
-                vec![
-                    Arc::new(part_index),
-                    Arc::new(part_batches),
-                    Arc::new(part_rows),
-                ],
-            )?;
-            MemoryStream::try_new(vec![batch], schema, None)
+            Ok(results)
         });
 
         // block and wait on future
-        let x = rt.block_on(fut).unwrap(); // TODO error handling
-        let _stream = x?;
-
-        Ok(())
+        let results = rt.block_on(fut).unwrap()?;
+        Ok(results)
     }
 }
+
+#[pyclass(name = "ResultSet", module = "raysql", subclass)]
+pub struct PyResultSet {
+    batches: Vec<PyRecordBatch>,
+}
+
+impl PyResultSet {
+    fn new(batches: Vec<PyRecordBatch>) -> Self {
+        Self { batches }
+    }
+}
+
+#[pymethods]
+impl PyResultSet {
+    fn __repr__(&self) -> PyResult<String> {
+        let batches: Vec<RecordBatch> = self.batches.iter().map(|b| b.batch.clone()).collect();
+        Ok(format!("{}", pretty_format_batches(&batches).unwrap()))
+    }
+}
+
+#[pyclass(name = "RecordBatch", module = "raysql", subclass)]
pub struct PyRecordBatch {
+    pub(crate) batch: RecordBatch,
+}
+
+impl PyRecordBatch {
+    fn new(batch: RecordBatch) -> Self {
+        Self { batch }
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!(
+            "{}",
+            pretty_format_batches(&[self.batch.clone()]).unwrap()
+        ))
+    }
+}
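
Taken together with the Python changes above, the user-visible effect of this commit is that `ctx.sql(...)` now returns result data whose `repr` pretty-prints the Arrow record batches (via `pretty_format_batches` on the Rust side), instead of the engine printing results and progress messages as a side effect. A sketch of the full flow, following the README; the import paths are assumptions based on the repository layout:

```python
import ray
from raysql.context import RaySqlContext
from raysql.worker import Worker

ray.init()

# start two workers, as in the README
workers = [Worker.remote() for i in range(2)]

# create context and plan a query
ctx = RaySqlContext(workers)
ctx.register_csv('tips', 'tips.csv', True)

result_set = ctx.sql('select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker')

# printing crosses into Rust: ResultSet.__repr__ renders the batches as a text table
print(result_set)
```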