Skip to content

Commit

Permalink
feat: support 64 bit big endian signed integer
Browse files Browse the repository at this point in the history
  • Loading branch information
jychen7 committed Mar 10, 2022
1 parent 83fa64b commit 1376ab8
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 36 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ jobs:
run: |
gcloud components install beta bigtable cbt
gcloud beta emulators bigtable start --host-port ${BIGTABLE_EMULATOR_HOST} &
- name: Setup Python
uses: actions/setup-python@v2
with:
python-version: "3.9"
- name: Seed Bigtable Emulator
run: |
./script/seed.sh
./script/create_table.sh
pip install google-cloud-bigtable==2.4.0
python ./script/insert_rows.py
- name: Build
run: cargo build
- name: Run tests
Expand Down
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.9.10
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
bigtable:
docker run --rm --name bigtable -d -p 8086:8086 -v $(CURDIR)/script:/opt/script gcr.io/google.com/cloudsdktool/cloud-sdk gcloud beta emulators bigtable start --host-port=0.0.0.0:8086
docker network create bigtable
docker run --rm --network=bigtable --name bigtable -d -p 8086:8086 -v $(CURDIR)/script:/opt/script gcr.io/google.com/cloudsdktool/cloud-sdk gcloud beta emulators bigtable start --host-port=0.0.0.0:8086

seed:
docker exec -e BIGTABLE_EMULATOR_HOST="localhost:8086" bigtable /opt/script/seed.sh
# docker exec -e BIGTABLE_EMULATOR_HOST="localhost:8086" bigtable /opt/script/create_table.sh
# docker run --rm --network=bigtable -e BIGTABLE_EMULATOR_HOST="bigtable:8086" -v $(CURDIR)/script:/opt/script python:3.9 /bin/bash -c "pip install google-cloud-bigtable==2.4.0 && python /opt/script/insert_rows.py"
# docker run --rm -e BIGTABLE_EMULATOR_HOST="host.docker.internal:8086" -v $(CURDIR)/script:/opt/script python:3.9 /bin/bash -c "pip install google-cloud-bigtable==2.4.0 && python /opt/script/insert_rows.py"
pip install google-cloud-bigtable==2.4.0 && BIGTABLE_EMULATOR_HOST="localhost:8086" python ./script/insert_rows.py

build:
cargo build
Expand Down
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,15 @@ ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons whe
### Bigtable

- ✅ UTF8 string
- [ ] 64-bit big-endian signed integer
- 64-bit big-endian signed integer

### SQL
- ✅ select by `"_row_key" =`
- ✅ select by `"_row_key" IN`
- ✅ select by `"_row_key" BETWEEN`
- [ ] select by composite row keys (via `table_partition_cols` and `table_partition_separator`)
- ✅ Projection pushdown
- [ ] Predicate push down ([Value range](https://cloud.google.com/bigtable/docs/using-filters#value-range))
- [ ] Limit Pushdown
- [ ] Predicate push down ([Value range](https://cloud.google.com/bigtable/docs/using-filters#value-range) and [Timestamp range](https://cloud.google.com/bigtable/docs/using-filters#timestamp-range))

### General
- [ ] Multi Thread or Partition aware execution
Expand Down
3 changes: 3 additions & 0 deletions script/create_table.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
cbt -instance dev -project emulator createtable weather_balloons
cbt -instance dev -project emulator createfamily weather_balloons measurements
37 changes: 37 additions & 0 deletions script/insert_rows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# cbt does not support setting integer value
# so use Python SDK
from google.auth.credentials import AnonymousCredentials
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud import bigtable
from google.cloud.bigtable.table import DEFAULT_RETRY
from google.cloud.bigtable.row import DirectRow

data_tuples = [
("us-west2#3698#2021-03-05-1200", 94558, "9.6", 1614945605100000),
("us-west2#3698#2021-03-05-1201", 94122, "9.7", 1614945665200000),
("us-west2#3698#2021-03-05-1202", 95992, "9.5", 1614945725300000),
("us-west2#3698#2021-03-05-1203", 96025, "9.5", 1614945785400000),
("us-west2#3698#2021-03-05-1204", 96021, "9.6", 1614945845500000),
]

PROJECT_ID = "emulator"
INSTANCE_ID = "dev"
TABLE_NAME = "weather_balloons"
COLUMN_FAMILY_ID = "measurements"
SUCCESS = 0

client = bigtable.client.Client(project=PROJECT_ID, credentials=AnonymousCredentials())
bigtable_table = client.instance(INSTANCE_ID).table(TABLE_NAME)

rows = []
for (row_key, pressure, temperature, timestamp_microseconds) in data_tuples:
row = DirectRow(row_key.encode())
# integer value
row.set_cell(COLUMN_FAMILY_ID, "pressure".encode(), pressure, timestamp=_datetime_from_microseconds(timestamp_microseconds))
# string value
row.set_cell(COLUMN_FAMILY_ID, "temperature".encode(), temperature.encode(), timestamp=_datetime_from_microseconds(timestamp_microseconds))
rows.append(row)

# no retry
statues = bigtable_table.mutate_rows(rows, timeout=5, retry=DEFAULT_RETRY.with_deadline(5))
print([(s.code == SUCCESS, s) for s in statues])
9 changes: 0 additions & 9 deletions script/seed.sh

This file was deleted.

54 changes: 33 additions & 21 deletions src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::Statistics;

// use arrow::array::Int64Array;
use arrow::array::ArrayRef;
use arrow::array::Int64Array;
use arrow::array::StringArray;
use arrow::array::TimestampMicrosecondArray;
use arrow::record_batch::RecordBatch;
Expand All @@ -37,7 +37,7 @@ use bigtable_rs::google::bigtable::v2::RowFilter;
use bigtable_rs::google::bigtable::v2::RowRange;
use bigtable_rs::google::bigtable::v2::RowSet;

// use byteorder::{BigEndian, ByteOrder};
use byteorder::{BigEndian, ByteOrder};

mod composer;

Expand Down Expand Up @@ -278,7 +278,7 @@ impl ExecutionPlan for BigtableExec {
Ok(resp) => {
let mut rowkey_timestamp_qualifier_value: HashMap<
String,
HashMap<i64, HashMap<String, String>>,
HashMap<i64, HashMap<String, Vec<u8>>>,
> = HashMap::new();
resp.into_iter().for_each(|(key, data)| {
let row_key = String::from_utf8(key.clone()).unwrap();
Expand All @@ -294,21 +294,19 @@ impl ExecutionPlan for BigtableExec {
.or_insert(HashMap::new());

let qualifier = String::from_utf8(row_cell.qualifier).unwrap();
// let cell_value = BigEndian::read_i64(&row_cell.value);
let cell_value = String::from_utf8(row_cell.value).unwrap();
rowkey_timestamp_qualifier_value
.get_mut(&row_key)
.unwrap()
.get_mut(&timestamp)
.unwrap()
.entry(qualifier)
.or_insert(cell_value);
.or_insert(row_cell.value);
})
});

let mut row_keys = vec![];
let mut timestamps = vec![];
let mut qualifier_values: HashMap<String, Vec<String>> = HashMap::new();
let mut qualifier_values: HashMap<String, Vec<Vec<u8>>> = HashMap::new();
for field in self.projected_schema.fields() {
if self.datasource.is_qualifier(field.name()) {
qualifier_values.insert(field.name().clone(), vec![]);
Expand All @@ -332,10 +330,7 @@ impl ExecutionPlan for BigtableExec {
.push(cell_value.to_owned());
}
_ => {
qualifier_values
.get_mut(qualifier)
.unwrap()
.push("".to_owned());
qualifier_values.get_mut(qualifier).unwrap().push(vec![]);
}
}
}
Expand All @@ -350,9 +345,23 @@ impl ExecutionPlan for BigtableExec {
for field in self.projected_schema.fields() {
if self.datasource.is_qualifier(field.name()) {
let qualifier = field.name();
data.push(Arc::new(StringArray::from(
qualifier_values.get(qualifier).unwrap().clone(),
)));
match field.data_type() {
DataType::Int64 => {
let mut decoded_values: Vec<i64> = vec![];
for encoded_value in qualifier_values.get(qualifier).unwrap() {
decoded_values.push(BigEndian::read_i64(encoded_value));
}
data.push(Arc::new(Int64Array::from(decoded_values)));
}
_ => {
let mut decoded_values: Vec<String> = vec![];
for encoded_value in qualifier_values.get(qualifier).unwrap() {
decoded_values
.push(String::from_utf8(encoded_value.to_vec()).unwrap());
}
data.push(Arc::new(StringArray::from(decoded_values)));
}
}
}
}

Expand Down Expand Up @@ -416,8 +425,11 @@ mod tests {
"weather_balloons".to_owned(),
"measurements".to_owned(),
vec![RESERVED_ROWKEY.to_owned()],
// vec![Field::new("pressure", DataType::Int64, false)],
vec![Field::new("pressure", DataType::Utf8, false)],
vec![
Field::new("pressure", DataType::Int64, false),
// Bigtable does not support float number, so store as string
Field::new("temperature", DataType::Utf8, false),
],
true,
)
.await
Expand All @@ -437,11 +449,11 @@ mod tests {

batches = ctx.sql("SELECT * FROM weather_balloons where \"_row_key\" = 'us-west2#3698#2021-03-05-1200'").await?.collect().await?;
expected = vec![
"+-------------------------------+-------------------------+----------+",
"| _row_key | _timestamp | pressure |",
"+-------------------------------+-------------------------+----------+",
"| us-west2#3698#2021-03-05-1200 | 2021-03-05 12:00:05.100 | 94558 |",
"+-------------------------------+-------------------------+----------+",
"+-------------------------------+-------------------------+----------+-------------+",
"| _row_key | _timestamp | pressure | temperature |",
"+-------------------------------+-------------------------+----------+-------------+",
"| us-west2#3698#2021-03-05-1200 | 2021-03-05 12:00:05.100 | 94558 | 9.6 |",
"+-------------------------------+-------------------------+----------+-------------+",
];
assert_batches_eq!(expected, &batches);

Expand Down

0 comments on commit 1376ab8

Please sign in to comment.