feat: support composite row keys with "="
jychen7 committed Mar 11, 2022
1 parent b5aee39 commit 9fdbf63
Showing 3 changed files with 115 additions and 21 deletions.
5 changes: 4 additions & 1 deletion README.md
@@ -14,6 +14,7 @@ let bigtable_datasource = BigtableDataSource::new(
"weather_balloons".to_owned(), // table
"measurements".to_owned(), // column family
vec!["_row_key".to_owned()], // table_partition_cols
"#".to_owned(), // table_partition_separator
vec![Field::new("pressure", DataType::Utf8, false)], // qualifiers
true, // only_read_latest
).await.unwrap();
@@ -35,7 +36,9 @@ ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons whe
- ✅ select by `"_row_key" =`
- ✅ select by `"_row_key" IN`
- ✅ select by `"_row_key" BETWEEN`
- [ ] select by composite row keys (via `table_partition_cols` and `table_partition_separator`)
- ✅ select by composite row keys `=` (example after this list)
- [ ] select by composite row keys `IN`
- [ ] select by composite row keys `BETWEEN`
- ✅ Projection pushdown
- [ ] Predicate pushdown ([Value range](https://cloud.google.com/bigtable/docs/using-filters#value-range) and [Timestamp range](https://cloud.google.com/bigtable/docs/using-filters#timestamp-range))
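
For composite keys the constructor is unchanged; only `table_partition_cols` grows. A minimal sketch mirroring the new `test_composite_row_key` test added below (the `region`/`balloon_id`/`event_minute` columns and the emulator dataset come from that test; the rest is illustrative):

```rust
let bigtable_datasource = BigtableDataSource::new(
    "emulator".to_owned(),
    "dev".to_owned(),
    "weather_balloons".to_owned(),       // table
    "measurements".to_owned(),           // column family
    vec![                                // table_partition_cols: each column maps to
        "region".to_owned(),             // one separator-delimited row key segment
        "balloon_id".to_owned(),
        "event_minute".to_owned(),
    ],
    "#".to_owned(),                      // table_partition_separator
    vec![Field::new("pressure", DataType::Utf8, false)], // qualifiers
    true,                                // only_read_latest
).await.unwrap();

// Every partition column must carry an equality predicate so the full row key
// "us-west2#3698#2021-03-05-1200" can be reassembled:
ctx.sql(
    "SELECT region, balloon_id, event_minute, pressure FROM weather_balloons \
     WHERE region = 'us-west2' AND balloon_id = '3698' AND event_minute = '2021-03-05-1200'",
).await?;
```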

88 changes: 79 additions & 9 deletions src/datasource.rs
@@ -85,6 +85,7 @@ impl BigtableDataSource {
table: String,
column_family: String,
table_partition_cols: Vec<String>,
table_partition_separator: String,
columns: Vec<Field>,
only_read_latest: bool,
) -> Result<Self> {
@@ -116,7 +117,7 @@ impl BigtableDataSource {
table: table,
column_family: column_family,
table_partition_cols: table_partition_cols,
table_partition_separator: DEFAULT_SEPARATOR.to_owned(),
table_partition_separator: table_partition_separator,
only_read_latest: only_read_latest,
schema: Arc::new(Schema::new(table_fields)),
connection: connection,
@@ -304,7 +305,7 @@ impl ExecutionPlan for BigtableExec {
})
});

let mut row_keys = vec![];
let mut table_partition_col_values: HashMap<String, Vec<String>> = HashMap::new();
let mut timestamps = vec![];
let mut qualifier_values: HashMap<String, Vec<Vec<u8>>> = HashMap::new();
for field in self.projected_schema.fields() {
@@ -316,7 +317,32 @@
for (row_key, timestamp_qualifier_value) in rowkey_timestamp_qualifier_value.iter()
{
for (timestamp, qualifier_value) in timestamp_qualifier_value.iter() {
row_keys.push(row_key.to_owned());
if self.datasource.table_partition_cols.len() == 1 {
// no need to split
let table_partition_col = &self.datasource.table_partition_cols[0];
table_partition_col_values
.entry(table_partition_col.to_owned())
.or_insert(vec![]);
table_partition_col_values
.get_mut(table_partition_col)
.unwrap()
.push(row_key.to_owned())
} else {
let parts: Vec<&str> = row_key
.split(&self.datasource.table_partition_separator)
.collect();
for (i, table_partition_col) in
self.datasource.table_partition_cols.iter().enumerate()
{
table_partition_col_values
.entry(table_partition_col.to_owned())
.or_insert(vec![]);
table_partition_col_values
.get_mut(table_partition_col)
.unwrap()
.push(parts[i].to_owned())
}
}
timestamps.push(timestamp.to_owned());

for field in self.projected_schema.fields() {
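The multi-column branch above, as a standalone sketch (the function and parameter names here are illustrative, not part of the commit):

```rust
use std::collections::HashMap;

/// Distribute one row key's segments into per-partition-column value vectors.
fn push_row_key_parts(
    row_key: &str,
    partition_cols: &[String],
    separator: &str,
    values: &mut HashMap<String, Vec<String>>,
) {
    let parts: Vec<&str> = row_key.split(separator).collect();
    for (col, part) in partition_cols.iter().zip(parts) {
        values.entry(col.clone()).or_insert_with(Vec::new).push(part.to_owned());
    }
}
```

One difference from the committed code: `zip` stops at the shorter side, whereas indexing `parts[i]` panics if a stored row key has fewer segments than `table_partition_cols`.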
@@ -338,10 +364,16 @@
}
}

let mut data: Vec<ArrayRef> = vec![
Arc::new(StringArray::from(row_keys)),
Arc::new(TimestampMicrosecondArray::from(timestamps)),
];
let mut data: Vec<ArrayRef> = vec![];
for table_partition_col in self.datasource.table_partition_cols.clone() {
let values: &Vec<String> = table_partition_col_values
.get(&table_partition_col)
.unwrap();
data.push(Arc::new(StringArray::from(values.clone())));
}

data.push(Arc::new(TimestampMicrosecondArray::from(timestamps)));

for field in self.projected_schema.fields() {
if self.datasource.is_qualifier(field.name()) {
let qualifier = field.name();
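With this change the output `RecordBatch` columns are laid out as: one Utf8 array per partition column (in `table_partition_cols` order), then `_timestamp`, then the projected qualifier columns.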
@@ -410,21 +442,22 @@ impl ExecutionPlan for BigtableExec {

#[cfg(test)]
mod tests {
use crate::datasource::{BigtableDataSource, RESERVED_ROWKEY};
use crate::datasource::{BigtableDataSource, DEFAULT_SEPARATOR, RESERVED_ROWKEY};
use arrow::datatypes::{DataType, Field};
use datafusion::assert_batches_eq;
use datafusion::error::Result;
use datafusion::prelude::ExecutionContext;
use std::sync::Arc;

#[tokio::test]
async fn test_sql_query() -> Result<()> {
async fn test_simple_row_key() -> Result<()> {
let bigtable_datasource = BigtableDataSource::new(
"emulator".to_owned(),
"dev".to_owned(),
"weather_balloons".to_owned(),
"measurements".to_owned(),
vec![RESERVED_ROWKEY.to_owned()],
DEFAULT_SEPARATOR.to_owned(),
vec![
Field::new("pressure", DataType::Int64, false),
// Bigtable does not support float numbers, so store them as strings
@@ -481,4 +514,41 @@ mod tests {
assert_batches_eq!(expected, &batches);
Ok(())
}

#[tokio::test]
async fn test_composite_row_key() -> Result<()> {
let bigtable_datasource = BigtableDataSource::new(
"emulator".to_owned(),
"dev".to_owned(),
"weather_balloons".to_owned(),
"measurements".to_owned(),
vec![
"region".to_owned(),
"balloon_id".to_owned(),
"event_minute".to_owned(),
],
DEFAULT_SEPARATOR.to_owned(),
vec![
Field::new("pressure", DataType::Int64, false),
// Bigtable does not support float numbers, so store them as strings
Field::new("temperature", DataType::Utf8, false),
],
true,
)
.await
.unwrap();
let mut ctx = ExecutionContext::new();
ctx.register_table("weather_balloons", Arc::new(bigtable_datasource))
.unwrap();
let batches = ctx.sql("SELECT region, balloon_id, event_minute, pressure, \"_timestamp\" FROM weather_balloons where region = 'us-west2' and balloon_id='3698' and event_minute = '2021-03-05-1200'").await?.collect().await?;
let expected = vec![
"+----------+------------+-----------------+----------+-------------------------+",
"| region | balloon_id | event_minute | pressure | _timestamp |",
"+----------+------------+-----------------+----------+-------------------------+",
"| us-west2 | 3698 | 2021-03-05-1200 | 94558 | 2021-03-05 12:00:05.100 |",
"+----------+------------+-----------------+----------+-------------------------+",
];
assert_batches_eq!(expected, &batches);
Ok(())
}
}
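For reference: the composite test filters all three partition columns with `=`, so it matches exactly one emulator row, keyed `us-west2#3698#2021-03-05-1200` assuming `DEFAULT_SEPARATOR` is `#` (the separator the README example passes explicitly).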
43 changes: 32 additions & 11 deletions src/datasource/composer.rs
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use crate::datasource::BigtableDataSource;
use bigtable_rs::google::bigtable::v2::row_filter::Filter;
use bigtable_rs::google::bigtable::v2::row_range::EndKey;
@@ -42,6 +44,8 @@ pub fn compose(
}

let mut row_ranges = vec![];
let mut table_partition_col_mapping: HashMap<String, String> = HashMap::new();

for filter in filters {
match filter {
Expr::BinaryExpr { left, op, right } => match left.as_ref() {
@@ -50,14 +54,8 @@
match op {
Operator::Eq => match right.as_ref() {
Expr::Literal(ScalarValue::Utf8(Some(key))) => {
row_ranges.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(
key.clone().into_bytes(),
)),
end_key: Some(EndKey::EndKeyClosed(
key.clone().into_bytes(),
)),
})
table_partition_col_mapping
.insert(col.name.clone(), key.clone());
}
_ => (),
},
@@ -76,7 +74,7 @@
if datasource.table_partition_cols.contains(&col.name) {
if negated.to_owned() {
return Err(DataFusionError::Execution(
"_row_key filter: NOT IN is not supported".to_owned(),
"_row_key: filter NOT IN is not supported".to_owned(),
));
}
for right in list {
@@ -108,7 +106,7 @@
if datasource.table_partition_cols.contains(&col.name) {
if negated.to_owned() {
return Err(DataFusionError::Execution(
"_row_key filter: NOT IN is not supported".to_owned(),
"_row_key: filter NOT IN is not supported".to_owned(),
));
}
match low.as_ref() {
@@ -136,9 +134,32 @@
}
}

if !table_partition_col_mapping.is_empty() {
let mut key_parts: Vec<String> = vec![];
for table_partition_col in datasource.table_partition_cols {
match table_partition_col_mapping.get(&table_partition_col) {
Some(value) => {
key_parts.push(value.to_owned());
}
_ => {
return Err(DataFusionError::Execution(format!(
"{}: filter is required",
table_partition_col
)));
}
}
}

let key = key_parts.join(&datasource.table_partition_separator);
row_ranges.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(key.clone().into_bytes())),
end_key: Some(EndKey::EndKeyClosed(key.clone().into_bytes())),
});
}

if row_ranges.is_empty() {
return Err(DataFusionError::Execution(
"_row_key filter is not provided or not supported".to_owned(),
"_row_key: filter is not provided or not supported".to_owned(),
));
}

