Skip to content

Commit

Permalink
feat: support composite row keys with "IN"
Browse files Browse the repository at this point in the history
  • Loading branch information
jychen7 committed Mar 12, 2022
1 parent 9fdbf63 commit ee1ed77
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 25 deletions.
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons whe
- ✅ select by `"_row_key" IN`
- ✅ select by `"_row_key" BETWEEN`
- ✅ select by composite row keys `=`
- [ ] select by composite row keys `IN`
- select by composite row keys `IN`
- [ ] select by composite row keys `BETWEEN`
- ✅ Projection pushdown
- [ ] Predicate push down ([Value range](https://cloud.google.com/bigtable/docs/using-filters#value-range) and [Timestamp range](https://cloud.google.com/bigtable/docs/using-filters#timestamp-range))

### General

- ✅ Projection pushdown
- [ ] Predicate push down
+ [Value range](https://cloud.google.com/bigtable/docs/using-filters#value-range)
+ [Value Regex](https://cloud.google.com/bigtable/docs/using-filters#value-regex)
+ [Timestamp range](https://cloud.google.com/bigtable/docs/using-filters#timestamp-range))
- [ ] Multi Thread or Partition aware execution
- [ ] Production ready Bigtable SDK in Rust

Expand Down
19 changes: 15 additions & 4 deletions src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ mod tests {
];
assert_batches_eq!(expected, &batches);

batches = ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons where \"_row_key\" IN ('us-west2#3698#2021-03-05-1200', 'us-west2#3698#2021-03-05-1201') ORDER BY \"_row_key\"").await?.collect().await?;
batches = ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons where \"_row_key\" IN ('us-west2#3698#2021-03-05-1200', 'us-west2#3698#2021-03-05-1201') ORDER BY \"_timestamp\"").await?.collect().await?;
expected = vec![
"+-------------------------------+----------+-------------------------+",
"| _row_key | pressure | _timestamp |",
Expand All @@ -501,7 +501,7 @@ mod tests {
];
assert_batches_eq!(expected, &batches);

batches = ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons where \"_row_key\" BETWEEN 'us-west2#3698#2021-03-05-1200' AND 'us-west2#3698#2021-03-05-1202' ORDER BY \"_row_key\"").await?.collect().await?;
batches = ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons where \"_row_key\" BETWEEN 'us-west2#3698#2021-03-05-1200' AND 'us-west2#3698#2021-03-05-1202' ORDER BY \"_timestamp\"").await?.collect().await?;
expected = vec![
"+-------------------------------+----------+-------------------------+",
"| _row_key | pressure | _timestamp |",
Expand Down Expand Up @@ -540,12 +540,23 @@ mod tests {
let mut ctx = ExecutionContext::new();
ctx.register_table("weather_balloons", Arc::new(bigtable_datasource))
.unwrap();
let batches = ctx.sql("SELECT region, balloon_id, event_minute, pressure, \"_timestamp\" FROM weather_balloons where region = 'us-west2' and balloon_id='3698' and event_minute = '2021-03-05-1200'").await?.collect().await?;
let expected = vec![
let mut batches = ctx.sql("SELECT region, balloon_id, event_minute, pressure, \"_timestamp\" FROM weather_balloons where region = 'us-west2' and balloon_id='3698' and event_minute = '2021-03-05-1200'").await?.collect().await?;
let mut expected = vec![
"+----------+------------+-----------------+----------+-------------------------+",
"| region | balloon_id | event_minute | pressure | _timestamp |",
"+----------+------------+-----------------+----------+-------------------------+",
"| us-west2 | 3698 | 2021-03-05-1200 | 94558 | 2021-03-05 12:00:05.100 |",
"+----------+------------+-----------------+----------+-------------------------+",
];
assert_batches_eq!(expected, &batches);

batches = ctx.sql("SELECT region, balloon_id, event_minute, pressure, \"_timestamp\" FROM weather_balloons where region = 'us-west2' and balloon_id IN ('3698') and event_minute IN ('2021-03-05-1200', '2021-03-05-1201') ORDER BY \"_timestamp\"").await?.collect().await?;
expected = vec![
"+----------+------------+-----------------+----------+-------------------------+",
"| region | balloon_id | event_minute | pressure | _timestamp |",
"+----------+------------+-----------------+----------+-------------------------+",
"| us-west2 | 3698 | 2021-03-05-1200 | 94558 | 2021-03-05 12:00:05.100 |",
"| us-west2 | 3698 | 2021-03-05-1201 | 94122 | 2021-03-05 12:01:05.200 |",
"+----------+------------+-----------------+----------+-------------------------+",
];
assert_batches_eq!(expected, &batches);
Expand Down
69 changes: 51 additions & 18 deletions src/datasource/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn compose(
}

let mut row_ranges = vec![];
let mut table_partition_col_mapping: HashMap<String, String> = HashMap::new();
let mut table_partition_col_mapping: HashMap<String, Vec<String>> = HashMap::new();

for filter in filters {
match filter {
Expand All @@ -55,7 +55,12 @@ pub fn compose(
Operator::Eq => match right.as_ref() {
Expr::Literal(ScalarValue::Utf8(Some(key))) => {
table_partition_col_mapping
.insert(col.name.clone(), key.clone());
.entry(col.name.to_owned())
.or_insert(vec![]);
table_partition_col_mapping
.get_mut(&col.name)
.unwrap()
.push(key.clone())
}
_ => (),
},
Expand All @@ -80,14 +85,13 @@ pub fn compose(
for right in list {
match right {
Expr::Literal(ScalarValue::Utf8(Some(key))) => {
row_ranges.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(
key.clone().into_bytes(),
)),
end_key: Some(EndKey::EndKeyClosed(
key.clone().into_bytes(),
)),
})
table_partition_col_mapping
.entry(col.name.to_owned())
.or_insert(vec![]);
table_partition_col_mapping
.get_mut(&col.name)
.unwrap()
.push(key.clone())
}
_ => (),
}
Expand Down Expand Up @@ -135,11 +139,23 @@ pub fn compose(
}

if !table_partition_col_mapping.is_empty() {
let mut key_parts: Vec<String> = vec![];
let mut batch_parts: Vec<Vec<String>> = vec![];
for table_partition_col in datasource.table_partition_cols {
match table_partition_col_mapping.get(&table_partition_col) {
Some(value) => {
key_parts.push(value.to_owned());
Some(list) => {
if batch_parts.is_empty() {
// initialize
// batch_parts = [], list = ["us-east1", "us-west2"]
// => batch_parts = [ ["us-east1"], ["us-west2"] ]
for value in list {
batch_parts.push(vec![value.to_owned()]);
}
} else {
// cross product
// batch_parts = [ ["us-east1"], ["us-west2"] ], list = ["3698", "3700"]
// => batch_parts = [ ["us-east1", "3698"], ["us-west2", "3698"], ["us-east1", "3700"], ["us-west2", "3700"] ]
batch_parts = partial_cartesian(batch_parts, list);
}
}
_ => {
return Err(DataFusionError::Execution(format!(
Expand All @@ -150,11 +166,13 @@ pub fn compose(
}
}

let key = key_parts.join(&datasource.table_partition_separator);
row_ranges.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(key.clone().into_bytes())),
end_key: Some(EndKey::EndKeyClosed(key.clone().into_bytes())),
});
for parts in batch_parts {
let key = parts.join(&datasource.table_partition_separator);
row_ranges.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(key.clone().into_bytes())),
end_key: Some(EndKey::EndKeyClosed(key.clone().into_bytes())),
});
}
}

if row_ranges.is_empty() {
Expand All @@ -165,3 +183,18 @@ pub fn compose(

Ok((row_ranges, row_filters))
}

pub fn partial_cartesian<T: Clone>(a: Vec<Vec<T>>, b: &[T]) -> Vec<Vec<T>> {
a.into_iter()
.flat_map(|xs| {
b.iter()
.cloned()
.map(|y| {
let mut vec = xs.clone();
vec.push(y);
vec
})
.collect::<Vec<_>>()
})
.collect()
}

0 comments on commit ee1ed77

Please sign in to comment.