Commit
upgrade to datafusion v8, use RecordBatchStreamAdapter for sync "execute" method
jychen7 committed Jun 26, 2022
1 parent 70a0898 commit 667f795
Showing 5 changed files with 279 additions and 188 deletions.
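The reworked execute() itself sits in parts of this diff that are not expanded here. As a rough sketch of the pattern the commit message describes, the snippet below wraps a hypothetical async fetch (fetch_batches stands in for the Bigtable read) behind RecordBatchStreamAdapter; the module paths and the ArrowResult item type are assumptions based on DataFusion 8, not code taken from this commit.

// Rough sketch only, not this commit's exact code.
// Assumptions: `fetch_batches` is a hypothetical stand-in for the async Bigtable
// read, and record batch streams yield ArrowResult items as in DataFusion 8.
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{Stream, TryStreamExt};

// Hypothetical async source; in this crate it would issue the Bigtable read.
async fn fetch_batches(
    schema: SchemaRef,
) -> ArrowResult<impl Stream<Item = ArrowResult<RecordBatch>>> {
    let batches: Vec<ArrowResult<RecordBatch>> = vec![Ok(RecordBatch::new_empty(schema))];
    Ok(futures::stream::iter(batches))
}

// DataFusion 8 makes `ExecutionPlan::execute` synchronous, so the async fetch is
// deferred with `stream::once` and flattened into a record batch stream that the
// adapter tags with the schema.
fn execute_partition(schema: SchemaRef) -> SendableRecordBatchStream {
    let stream = futures::stream::once(fetch_batches(schema.clone())).try_flatten();
    Box::pin(RecordBatchStreamAdapter::new(schema, stream))
}

The point of the adapter is that execute() can return a stream immediately, while the underlying request only runs once the stream is first polled.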
127 changes: 97 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
@@ -15,9 +15,10 @@ readme = "README.md"
keywords = [ "arrow", "query", "sql", "datafusion", "bigtable" ]

[dependencies]
datafusion = "7.0.0"
arrow = { version = "9.0.0", features = ["prettyprint"] }
datafusion = "8.0.0"
arrow = { version = "13", features = ["prettyprint"] }
async-trait = "0.1.41"
bigtable_rs = "0.1.5"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
byteorder = "0.5.1"
+futures = "0.3"
12 changes: 8 additions & 4 deletions src/datasource.rs
@@ -7,7 +7,7 @@ use async_trait::async_trait;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion::datasource::datasource::TableProviderFilterPushDown;
-use datafusion::datasource::TableProvider;
+use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::Expr;
use datafusion::physical_plan::ExecutionPlan;
@@ -129,6 +129,10 @@ impl TableProvider for BigtableDataSource {
self.schema.clone()
}

+fn table_type(&self) -> TableType {
+    TableType::Base
+}

/// Create an ExecutionPlan that will scan the table.
/// The table provider will be usually responsible of grouping
/// the source data into partitions that can be efficiently
@@ -167,7 +171,7 @@ mod tests {
use arrow::datatypes::{DataType, Field};
use datafusion::assert_batches_eq;
use datafusion::error::Result;
-use datafusion::prelude::ExecutionContext;
+use datafusion::prelude::SessionContext;
use std::sync::Arc;

#[tokio::test]
@@ -188,7 +192,7 @@ mod tests {
)
.await
.unwrap();
-let mut ctx = ExecutionContext::new();
+let ctx = SessionContext::new();
ctx.register_table("weather_balloons", Arc::new(bigtable_datasource))
.unwrap();
let mut batches = ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons where \"_row_key\" = 'us-west2#3698#2021-03-05-1200'").await?.collect().await?;
@@ -258,7 +262,7 @@ mod tests {
)
.await
.unwrap();
-let mut ctx = ExecutionContext::new();
+let ctx = SessionContext::new();
ctx.register_table("weather_balloons", Arc::new(bigtable_datasource))
.unwrap();
let mut batches = ctx.sql("SELECT region, balloon_id, event_minute, pressure, \"_timestamp\" FROM weather_balloons where region = 'us-west2' and balloon_id='3698' and event_minute = '2021-03-05-1200'").await?.collect().await?;
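DataFusion 8 also renames ExecutionContext to SessionContext, and its register_table and sql methods take &self, which is why the updated tests above drop the mut binding. Below is a minimal, self-contained sketch of that API, using an illustrative MemTable instead of the Bigtable source in this repo.

// Illustrative sketch of the DataFusion 8 session API; the MemTable data here is
// made up and not part of this commit.
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("pressure", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1013]))],
    )?;
    let table = MemTable::try_new(schema, vec![vec![batch]])?;

    // SessionContext methods take &self, so no `mut` binding is needed.
    let ctx = SessionContext::new();
    ctx.register_table("weather_balloons", Arc::new(table))?;
    let batches = ctx.sql("SELECT pressure FROM weather_balloons").await?.collect().await?;
    println!("{:?}", batches);
    Ok(())
}

The sql(...).await?.collect().await? chain is the same one the updated tests use against the Bigtable-backed table.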
