Add support for DDL statements, such as CREATE VIEW (#35)
* Experimenting with supporting DDL

* update docs
andygrove committed Mar 27, 2023
1 parent 67309d5 commit ece0d3b
Showing 8 changed files with 34 additions and 9 deletions.
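
For orientation before the per-file diffs: the change routes `create view` / `drop view` statements straight to DataFusion instead of the distributed planner. A minimal sketch of the new behavior from Python (hypothetical usage — the `RaySqlContext` constructor arguments and data path are placeholders; see the README for real setup):

```python
import ray
from raysql.context import RaySqlContext

ray.init()

# Placeholder setup: constructor arguments follow the README's example.
ctx = RaySqlContext(...)  # hypothetical; real arguments elided
ctx.register_parquet("orders", "/path/to/orders.parquet")  # hypothetical path

# DDL now executes eagerly against DataFusion and returns an empty result set.
# Note: the dispatch (see raysql/context.py below) is a case-sensitive
# substring check, so 'create view' / 'drop view' must be lowercase.
ctx.sql("create view big_orders as select * from orders where o_totalprice > 100000")

# Regular queries still go through the distributed planner as before.
result_set = ctx.sql("select count(*) from big_orders")
print(result_set)

ctx.sql("drop view big_orders")
```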
4 changes: 2 additions & 2 deletions Cargo.toml
```diff
@@ -1,8 +1,8 @@
 [package]
 name = "raysql"
 description = "RaySQL: DataFusion on Ray"
-homepage = "https://github.com/andygrove/ray-sql"
-repository = "https://github.com/andygrove/ray-sql"
+homepage = "https://github.com/datafusion-contrib/ray-sql"
+repository = "https://github.com/datafusion-contrib/ray-sql"
 authors = ["Andy Grove <andygrove73@gmail.com>"]
 version = "0.5.0"
 edition = "2021"
```
8 changes: 3 additions & 5 deletions README.md
```diff
@@ -42,7 +42,7 @@ print(result_set)

 ## Status

-- RaySQL can run 21 of the 22 TPC-H benchmark queries (query 15 needs DDL and that is not yet supported).
+- RaySQL can run all queries in the TPC-H benchmark

 ## Features
```
```diff
@@ -56,14 +56,12 @@ print(result_set)
 ## Performance

 This chart shows the performance of RaySQL compared to Apache Spark for
-[SQLBench-H](https://sqlbenchmarks.io/sqlbench-h/) at a very small data set (10GB), running on my desktop (Threadripper
+[SQLBench-H](https://sqlbenchmarks.io/sqlbench-h/) at a very small data set (10GB), running on a desktop (Threadripper
 with 24 physical cores). Both RaySQL and Spark are configured with 24 executors.

-Note that query 15 is excluded from both results since RaySQL does not support DDL yet.
-
 ### Overall Time

-RaySQL is ~65% faster overall for this scale factor and environment.
+RaySQL is ~1.9x faster overall for this scale factor and environment with disk-based shuffle.

 ![SQLBench-H Total](./docs/sqlbench-h-total.png)
```
Binary file modified docs/sqlbench-h-per-query.png
Binary file modified docs/sqlbench-h-total.png
1 change: 1 addition & 0 deletions raysql/__init__.py
```diff
@@ -10,6 +10,7 @@
     execute_partition,
     serialize_execution_plan,
     deserialize_execution_plan,
+    empty_result_set
 )
 from .context import RaySqlContext
```
6 changes: 6 additions & 0 deletions raysql/context.py
```diff
@@ -125,6 +125,12 @@ def register_parquet(self, table_name: str, path: str):
         self.ctx.register_parquet(table_name, path)

     def sql(self, sql: str) -> ResultSet:
+        # TODO we should parse sql and inspect the plan rather than
+        # perform a string comparison here
+        if 'create view' in sql or 'drop view' in sql:
+            self.ctx.sql(sql)
+            return raysql.empty_result_set()
+
         graph = self.ctx.plan(sql)
         final_stage_id = graph.get_final_query_stage().id()
```
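
As the TODO in this hunk concedes, the substring check is fragile: it is case-sensitive and would also fire on a query that merely contains those words. A small sketch of a sturdier heuristic (not part of this commit; the real fix, per the TODO, is to parse the SQL and inspect the plan):

```python
import re

# Anchored, case-insensitive match at the start of the statement.
# Still a heuristic -- parsing the SQL remains the proper solution.
_VIEW_DDL = re.compile(
    r"^\s*(create\s+(or\s+replace\s+)?view|drop\s+view)\b",
    re.IGNORECASE,
)

def is_view_ddl(sql: str) -> bool:
    return _VIEW_DDL.match(sql) is not None
```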
19 changes: 18 additions & 1 deletion src/context.rs
```diff
@@ -75,6 +75,15 @@ impl PyContext {
         Ok(())
     }

+    /// Execute SQL directly against the DataFusion context. Useful for statements
+    /// such as "create view" or "drop view"
+    pub fn sql(&self, sql: &str, py: Python) -> PyResult<()> {
+        println!("Executing {}", sql);
+        let _df = wait_for_future(py, self.ctx.sql(sql))?;
+        Ok(())
+    }
+
     /// Plan a distributed SELECT query for executing against the Ray workers
     pub fn plan(&self, sql: &str, py: Python) -> PyResult<PyExecutionGraph> {
         println!("Planning {}", sql);
         let df = wait_for_future(py, self.ctx.sql(sql))?;
```
```diff
@@ -197,7 +206,7 @@ fn _execute_partition(
         HashMap::new(),
         HashMap::new(),
         Arc::new(RuntimeEnv::default()),
-        Extensions::default()
+        Extensions::default(),
     )?);
     Python::with_gil(|py| {
         _set_inputs_for_ray_shuffle_reader(plan.plan.clone(), part, &inputs, py)
```
```diff
@@ -229,6 +238,14 @@ impl PyResultSet {
     fn new(batches: Vec<PyRecordBatch>) -> Self {
         Self { batches }
     }
+    fn empty() -> Self {
+        Self { batches: vec![] }
+    }
 }
+
+#[pyfunction]
+pub fn empty_result_set() -> PyResultSet {
+    PyResultSet::empty()
+}

 fn _read_pybytes(pyobj: &PyAny, batches: &mut Vec<PyRecordBatch>) -> PyResult<()> {
```
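
This gives `PyContext` two entry points: `sql` executes a statement eagerly and discards the resulting DataFrame (suitable for DDL), while `plan` builds the distributed execution graph. A rough sketch of the dispatch as driven from Python, mirroring `RaySqlContext.sql` in raysql/context.py (`inner` is a hypothetical stand-in for the wrapped `PyContext` held as `self.ctx`):

```python
import raysql

def run_statement(inner, sql: str):
    """Sketch of RaySqlContext.sql's dispatch; `inner` is the PyContext."""
    if 'create view' in sql or 'drop view' in sql:
        inner.sql(sql)       # eager execution; the DataFrame is discarded
        return raysql.empty_result_set()
    graph = inner.plan(sql)  # distributed planning path for queries
    return graph.get_final_query_stage().id()
```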
5 changes: 4 additions & 1 deletion src/lib.rs
```diff
@@ -3,7 +3,9 @@ extern crate core;
 use pyo3::prelude::*;

 mod proto;
-use crate::context::{deserialize_execution_plan, execute_partition, serialize_execution_plan};
+use crate::context::{
+    deserialize_execution_plan, empty_result_set, execute_partition, serialize_execution_plan,
+};
 pub use proto::generated::protobuf;

 pub mod context;
```
```diff
@@ -22,5 +24,6 @@ fn _raysql_internal(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add_function(wrap_pyfunction!(execute_partition, m)?)?;
     m.add_function(wrap_pyfunction!(serialize_execution_plan, m)?)?;
     m.add_function(wrap_pyfunction!(deserialize_execution_plan, m)?)?;
+    m.add_function(wrap_pyfunction!(empty_result_set, m)?)?;
     Ok(())
 }
```
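
With the `#[pyfunction]` registered here and the re-export added in raysql/__init__.py above, the new helper is reachable from the package root. A quick sketch:

```python
from raysql import empty_result_set

rs = empty_result_set()  # a PyResultSet containing no record batches
```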
