Skip to content

Commit

Permalink
WIP: add Bigtable client to ExecutionPlan
Browse files Browse the repository at this point in the history
  • Loading branch information
jychen7 committed Feb 28, 2022
1 parent f49aac1 commit 3503d17
Showing 1 changed file with 29 additions and 3 deletions.
32 changes: 29 additions & 3 deletions src/datasource.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::any::Any;
use std::sync::Arc;
use std::collections::HashMap;
use std::time::Duration;

use async_trait::async_trait;

Expand All @@ -14,20 +15,31 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::project_schema;
use datafusion::physical_plan::empty::EmptyExec;

use bigtable_rs::bigtable::{BigTableConnection, BigTable};

/// Reserved column name; presumably the column that exposes the Bigtable row key
/// (TODO confirm against its use site, which is not visible here).
const RESERVED_ROWKEY: &str = "_row_key";
/// Reserved column name; presumably the column that exposes the cell timestamp
/// (TODO confirm against its use site, which is not visible here).
const RESERVED_TIMESTAMP: &str = "_timestamp";
/// Third argument handed to `BigTableConnection::new` when `scan` connects.
const READ_ONLY: bool = true;
/// Fourth argument handed to `BigTableConnection::new` when `scan` connects.
const CHANNEL_SIZE: usize = 1;
/// Connection timeout in seconds; `scan` wraps it in `Duration::from_secs`.
const TIMEOUT_IN_SECONDS: u64 = 600;

/// A table description paired with an optional Bigtable client.
pub struct BigtableTable {
    /// Schema of the table. The constructor stores its configuration
    /// (instance, table, column family, ...) as string metadata on this schema.
    schema: SchemaRef,
    /// Bigtable client handle; `None` when no client has been created yet.
    client: Option<BigTable>,
}

impl BigtableTable {
pub fn new(instance: &str, table: &str, column_family: &str, columns: Vec<Field>, only_read_latest: bool) -> Self {
pub fn new(project: &str, instance: &str, table: &str, column_family: &str, columns: Vec<Field>, only_read_latest: bool) -> Self {
let metadata = HashMap::from([
("project".to_string(), project.to_string()),
("instance".to_string(), instance.to_string()),
("table".to_string(), table.to_string()),
("column_family".to_string(), column_family.to_string()),
("only_read_latest".to_string(), only_read_latest.to_string()),
]);
let schema = Arc::new(Schema::new_with_metadata(columns, metadata));
Self { schema }
let client = None;
Self { schema, client }
}
}

Expand Down Expand Up @@ -56,8 +68,22 @@ impl TableProvider for BigtableTable {
// from the datasource as a performance optimization.
// If set, it contains the number of rows needed by the `LogicalPlan`.
// The datasource should return *at least* this number of rows if available.
limit: Option<usize>,
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
match self.client {
Some(x) => x,
None => {
let connection = BigTableConnection::new(
self.schema.metadata().get(&"project".to_string()).unwrap(),
self.schema.metadata().get(&"instance".to_string()).unwrap(),
READ_ONLY,
CHANNEL_SIZE,
Some(Duration::from_secs(TIMEOUT_IN_SECONDS)),
).await?;
self.client = Some(connection.client())
}
}

let projected_schema = project_schema(&self.schema, projection.as_ref())?;
Ok(Arc::new(EmptyExec::new(false, projected_schema)))
}
Expand Down

0 comments on commit 3503d17

Please sign in to comment.