Skip to content

Commit

Permalink
feat: support simple "_row_key IN (xxx, yyy)" query
Browse files Browse the repository at this point in the history
  • Loading branch information
jychen7 committed Mar 9, 2022
1 parent 965e796 commit d36b3e0
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 28 deletions.
8 changes: 3 additions & 5 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ jobs:
gcloud beta emulators bigtable start --host-port ${BIGTABLE_EMULATOR_HOST} &
- name: Seed Bigtable Emulator
run: |
cbt -instance dev -project emulator createtable weather_balloons
cbt -instance dev -project emulator createfamily weather_balloons measurements
cbt -instance dev -project emulator set weather_balloons us-west2#3698#2021-03-05-1200 measurements:pressure=94558@1646637317700000
./script/seed.sh
- name: Build
run: cargo build --verbose
run: cargo build
- name: Run tests
run: cargo test --verbose
run: cargo test
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
bigtable:
docker run --rm --name bigtable -d -p 8086:8086 -v $(CURDIR)/script:/opt/script gcr.io/google.com/cloudsdktool/cloud-sdk gcloud beta emulators bigtable start --host-port=0.0.0.0:8086

seed:
docker exec -e BIGTABLE_EMULATOR_HOST="localhost:8086" bigtable /opt/script/seed.sh

build:
cargo build

test:
BIGTABLE_EMULATOR_HOST="localhost:8086" cargo test -- --nocapture

fmt:
cargo fmt
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons whe

### SQL
- ✅ select by `"_row_key" =`
- [ ] select by `"_row_key" IN`
- select by `"_row_key" IN`
- [ ] select by `"_row_key" BETWEEN`
- [ ] select by composite row keys (via `table_partition_cols` and `table_partition_separator`)
- [ ] Projection pushdown
Expand Down
9 changes: 9 additions & 0 deletions script/seed.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash

cbt -instance dev -project emulator createtable weather_balloons
cbt -instance dev -project emulator createfamily weather_balloons measurements
cbt -instance dev -project emulator set weather_balloons us-west2#3698#2021-03-05-1200 measurements:pressure=94558@1614945605100000
cbt -instance dev -project emulator set weather_balloons us-west2#3698#2021-03-05-1201 measurements:pressure=94122@1614945665200000
cbt -instance dev -project emulator set weather_balloons us-west2#3698#2021-03-05-1202 measurements:pressure=95992@1614945725300000
cbt -instance dev -project emulator set weather_balloons us-west2#3698#2021-03-05-1203 measurements:pressure=96025@1614945785400000
cbt -instance dev -project emulator set weather_balloons us-west2#3698#2021-03-05-1204 measurements:pressure=96021@1614945845500000
33 changes: 23 additions & 10 deletions src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,15 @@ impl TableProvider for BigtableDataSource {
// The datasource should return *at least* this number of rows if available.
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let (row_ranges, row_filters) = composer::compose(self.clone(), projection, filters);
Ok(Arc::new(BigtableExec::new(
self.clone(),
projection,
row_ranges,
row_filters,
)))
match composer::compose(self.clone(), projection, filters) {
Ok((row_ranges, row_filters)) => Ok(Arc::new(BigtableExec::new(
self.clone(),
projection,
row_ranges,
row_filters,
))),
Err(err) => Err(err),
}
}

/// Tests whether the table provider can make use of a filter expression
Expand Down Expand Up @@ -423,12 +425,23 @@ mod tests {
let mut ctx = ExecutionContext::new();
ctx.register_table("weather_balloons", Arc::new(bigtable_datasource))
.unwrap();
let batches = ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons where \"_row_key\" = 'us-west2#3698#2021-03-05-1200'").await?.collect().await?;
let expected = vec![
let mut batches = ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons where \"_row_key\" = 'us-west2#3698#2021-03-05-1200'").await?.collect().await?;
let mut expected = vec![
"+-------------------------------+----------+-------------------------+",
"| _row_key | pressure | _timestamp |",
"+-------------------------------+----------+-------------------------+",
"| us-west2#3698#2021-03-05-1200 | 94558 | 2021-03-05 12:00:05.100 |",
"+-------------------------------+----------+-------------------------+",
];
assert_batches_eq!(expected, &batches);

batches = ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons where \"_row_key\" IN ('us-west2#3698#2021-03-05-1200', 'us-west2#3698#2021-03-05-1201') ORDER BY \"_row_key\"").await?.collect().await?;
expected = vec![
"+-------------------------------+----------+-------------------------+",
"| _row_key | pressure | _timestamp |",
"+-------------------------------+----------+-------------------------+",
"| us-west2#3698#2021-03-05-1200 | 94558 | 2022-03-07 07:15:17.700 |",
"| us-west2#3698#2021-03-05-1200 | 94558 | 2021-03-05 12:00:05.100 |",
"| us-west2#3698#2021-03-05-1201 | 94122 | 2021-03-05 12:01:05.200 |",
"+-------------------------------+----------+-------------------------+",
];
assert_batches_eq!(expected, &batches);
Expand Down
68 changes: 56 additions & 12 deletions src/datasource/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ use bigtable_rs::google::bigtable::v2::row_filter::Filter;
use bigtable_rs::google::bigtable::v2::row_range::EndKey;
use bigtable_rs::google::bigtable::v2::row_range::StartKey;
use bigtable_rs::google::bigtable::v2::{RowFilter, RowRange};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::Expr;
use datafusion::logical_plan::Operator;
use datafusion::scalar::ScalarValue;

pub fn compose(
datasource: BigtableDataSource,
_projection: &Option<Vec<usize>>,
filters: &[Expr],
) -> (Vec<RowRange>, Vec<RowFilter>) {
) -> Result<(Vec<RowRange>, Vec<RowFilter>)> {
let mut row_ranges = vec![];
let mut row_filters = if datasource.only_read_latest {
vec![RowFilter {
Expand All @@ -25,27 +27,69 @@ pub fn compose(

for filter in filters {
match filter {
Expr::BinaryExpr { left, op: _, right } => match left.as_ref() {
Expr::BinaryExpr { left, op, right } => match left.as_ref() {
Expr::Column(col) => {
if datasource.table_partition_cols.contains(&col.name) {
match right.as_ref() {
Expr::Literal(ScalarValue::Utf8(Some(key))) => {
row_ranges.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(
key.clone().into_bytes(),
)),
end_key: Some(EndKey::EndKeyClosed(key.clone().into_bytes())),
})
}
match op {
Operator::Eq => match right.as_ref() {
Expr::Literal(ScalarValue::Utf8(Some(key))) => {
row_ranges.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(
key.clone().into_bytes(),
)),
end_key: Some(EndKey::EndKeyClosed(
key.clone().into_bytes(),
)),
})
}
_ => (),
},
_ => (),
}
}
}
_ => (),
},
Expr::InList {
expr,
list,
negated,
} => match expr.as_ref() {
Expr::Column(col) => {
if datasource.table_partition_cols.contains(&col.name) {
if negated.to_owned() {
return Err(DataFusionError::Execution(
"_row_key filter: NOT IN is not supported".to_owned(),
));
}
for right in list {
match right {
Expr::Literal(ScalarValue::Utf8(Some(key))) => {
row_ranges.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(
key.clone().into_bytes(),
)),
end_key: Some(EndKey::EndKeyClosed(
key.clone().into_bytes(),
)),
})
}
_ => (),
}
}
}
}
_ => (),
},
_ => (),
}
}

(row_ranges, row_filters)
if row_ranges.is_empty() {
return Err(DataFusionError::Execution(
"_row_key filter is not provided or not supported".to_owned(),
));
}

Ok((row_ranges, row_filters))
}

0 comments on commit d36b3e0

Please sign in to comment.