Skip to content

Commit

Permalink
style: clippy + fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
timvw committed Apr 23, 2024
1 parent 6959db1 commit 2a2d8f9
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 55 deletions.
13 changes: 9 additions & 4 deletions examples/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ use aws_config::BehaviorVersion;
use aws_sdk_glue::config::{Credentials, ProvideCredentials};
use aws_types::SdkConfig;
use dashmap::DashMap;
use datafusion::arrow::array::StringArray;
use datafusion::common::{DataFusionError, Result};
use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::*;
use datafusion_catalogprovider_glue::catalog_provider::glue::{GlueCatalogProvider, TableRegistrationOptions};
use datafusion_catalogprovider_glue::catalog_provider::glue::{
GlueCatalogProvider, TableRegistrationOptions,
};
use object_store::aws::AmazonS3Builder;
use object_store::ObjectStore;
use std::fmt::Debug;
use std::sync::Arc;
use datafusion::arrow::array::StringArray;
use url::Url;

#[tokio::main]
Expand All @@ -48,7 +50,10 @@ async fn main() -> Result<()> {
let mut glue_catalog_provider = GlueCatalogProvider::new(sdk_config, object_store_provider);

let register_results = glue_catalog_provider
.register_all_with_options(&TableRegistrationOptions::DeriveSchemaFromGlueTable, &ctx.state())
.register_all_with_options(
&TableRegistrationOptions::DeriveSchemaFromGlueTable,
&ctx.state(),
)
.await?;

for result in register_results {
Expand Down Expand Up @@ -179,7 +184,7 @@ impl ObjectStoreRegistry for DemoS3ObjectStoreProvider {
}

fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
if let Some(refx)= self.object_stores.get(url) {
if let Some(refx) = self.object_stores.get(url) {
let store = refx.value().clone();
Ok(store)
} else {
Expand Down
57 changes: 6 additions & 51 deletions src/catalog_provider/glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use crate::error::{GlueError, Result};
use crate::glue_data_type_parser::*;
use aws_sdk_glue::config::ProvideCredentials;
use aws_sdk_glue::types::{Column, StorageDescriptor, Table};
use aws_sdk_glue::Client;
use aws_types::SdkConfig;
Expand All @@ -17,13 +16,13 @@ use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use deltalake::DeltaTableBuilder;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use datafusion::datasource::object_store::ObjectStoreRegistry;

/// Options to register a table
pub enum TableRegistrationOptions {
Expand All @@ -41,7 +40,6 @@ pub struct GlueCatalogProvider {
}

impl GlueCatalogProvider {

/// Create a new Glue CatalogProvider
pub fn new(sdk_config: SdkConfig, object_store_registry: Arc<dyn ObjectStoreRegistry>) -> Self {
let client = Client::new(&sdk_config);
Expand Down Expand Up @@ -190,7 +188,8 @@ impl GlueCatalogProvider {
None => HashMap::new(),
};

let table_type = table_parameters.get("table_type")
let table_type = table_parameters
.get("table_type")
.map(|x| x.to_lowercase())
.unwrap_or("".to_string());
if table_type == "delta" {
Expand Down Expand Up @@ -218,39 +217,16 @@ impl GlueCatalogProvider {
table_name: &String,
storage_location_uri: &str,
) -> Result<()> {

let url = url::Url::parse(storage_location_uri).map_err(|e| GlueError::Other(format!("Failed to parse {storage_location_uri} as url")))?;
let url = url::Url::parse(storage_location_uri).map_err(|_| {
GlueError::Other(format!("Failed to parse {storage_location_uri} as url"))
})?;
let object_store = self.object_store_registry.get_store(&url)?;

deltalake::aws::register_handlers(None);

let builder = DeltaTableBuilder::from_uri(storage_location_uri);

/*
let cp = self.sdk_config.credentials_provider().unwrap();
let creds = cp
.provide_credentials()
.await
.map_err(|e| GlueError::AWS(format!("Failed to get credentials: {e}")))?;
let mut storage_options: HashMap<String, String> =
HashMap::from_iter(std::env::vars().collect::<Vec<(String, String)>>());
storage_options.insert(
"aws_access_key_id".to_string(),
creds.access_key_id().to_string(),
);
storage_options.insert(
"aws_secret_access_key".to_string(),
creds.secret_access_key().to_string(),
);
if let Some(session_token) = creds.session_token() {
storage_options.insert("aws_session_token".to_string(), session_token.to_string());
}*/



let delta_table = builder
//.with_storage_options(storage_options)
.with_storage_backend(object_store, url)
.load()
.await
Expand Down Expand Up @@ -595,27 +571,6 @@ impl CatalogProvider for GlueCatalogProvider {
}
}

/*
async fn create_delta_table(
table: &Table,
object_store_registry: &Option<Arc<dyn ObjectStoreRegistry>>,
) -> Result<Arc<dyn TableProvider>> {
let location = table_location(table)?;
let url = url::Url::parse(&location)?;
let mut builder = DeltaTableBuilder::from_uri(&location);
if let Ok(object_store) = get_object_store(object_store_registry, &location) {
builder = builder.with_storage_backend(object_store, url);
}
builder
.with_storage_options(std::env::vars().collect())
.load()
.await
.map(|t| Arc::new(t) as Arc<dyn TableProvider>)
.map_err(|e| GlueError::Deltalake(e))
}*/

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 2a2d8f9

Please sign in to comment.