Skip to content

Commit

Permalink
feat: datafusion scanner (#10)
Browse files Browse the repository at this point in the history
* feat: datafusion scanner
* refactor: cleanups/tests/improvements
* docs: remove errant copyright
* test: more complete type test
* refactor: update error type
* refactor: remove batch size for now
* fix: add df error feature macro
* docs: fix copy error
  • Loading branch information
tshauck committed Mar 26, 2024
1 parent f3f03c7 commit 09bf6d2
Show file tree
Hide file tree
Showing 9 changed files with 402 additions and 10 deletions.
17 changes: 16 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
run: |
cargo fmt --all -- --check
check-clippy:
check-clippy-all-features:
name: Check cargo clippy
runs-on: ubuntu-latest
container:
Expand All @@ -34,3 +34,18 @@ jobs:
- name: Run check
run: |
cargo clippy --all-targets --all-features -- -D warnings
# Check clippy without features, helps to catch missing feature configurations
check-clippy-no-features:
name: Check cargo clippy
runs-on: ubuntu-latest
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
components: clippy
- name: Run check
run: |
cargo clippy --all-targets -- -D warnings
18 changes: 16 additions & 2 deletions src/datafusion/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,28 @@ use std::sync::Arc;

use object_store::ObjectStore;

/// Configuration for Zarr DataFusion processing.
#[derive(Clone)]
pub struct ZarrConfig {
// The object store to use.
/// The object store to use.
pub object_store: Arc<dyn ObjectStore>,

/// The projection for the scan.
pub projection: Option<Vec<usize>>,
}

impl ZarrConfig {
/// Create a new ZarrConfig.
pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
Self { object_store }
Self {
object_store,
projection: None,
}
}

/// Set the projection for the scan.
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
self
}
}
16 changes: 10 additions & 6 deletions src/datafusion/file_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use arrow_schema::ArrowError;
use datafusion::{datasource::physical_plan::FileOpener, error::DataFusionError};
use futures::{StreamExt, TryStreamExt};

use crate::async_reader::{ZarrPath, ZarrRecordBatchStreamBuilder};
use crate::{
async_reader::{ZarrPath, ZarrRecordBatchStreamBuilder},
reader::ZarrProjection,
};

use super::config::ZarrConfig;

Expand All @@ -44,15 +47,16 @@ impl FileOpener for ZarrFileOpener {
let zarr_path = ZarrPath::new(config.object_store, file_meta.object_meta.location);

let rng = file_meta.range.map(|r| (r.start as usize, r.end as usize));

let projection = ZarrProjection::from(config.projection.as_ref());

let batch_reader = ZarrRecordBatchStreamBuilder::new(zarr_path)
.with_projection(projection)
.build_partial_reader(rng)
.await
.map_err(|_| {
DataFusionError::Execution("Error creating zarr reader".to_string())
})?;
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let stream = batch_reader
.map_err(|_| ArrowError::ComputeError("Error reading zarr".to_string()));
let stream = batch_reader.map_err(|e| ArrowError::from_external_error(Box::new(e)));

Ok(stream.boxed())
}))
Expand Down
1 change: 1 addition & 0 deletions src/datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@

pub mod config;
pub mod file_opener;
pub mod scanner;
192 changes: 192 additions & 0 deletions src/datafusion/scanner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, sync::Arc};

use arrow::datatypes::SchemaRef;
use datafusion::{
common::Statistics,
datasource::physical_plan::{FileScanConfig, FileStream},
physical_plan::{
metrics::ExecutionPlanMetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream,
},
};

use super::{config::ZarrConfig, file_opener::ZarrFileOpener};

#[derive(Debug, Clone)]
/// Implements a DataFusion `ExecutionPlan` for Zarr files.
pub struct ZarrScan {
/// The base configuration for the file scan.
base_config: FileScanConfig,

/// The projected schema for the scan.
projected_schema: SchemaRef,

/// Metrics for the execution plan.
metrics: ExecutionPlanMetricsSet,

/// The statistics for the scan.
statistics: Statistics,
}

impl ZarrScan {
/// Create a new Zarr scan.
pub fn new(base_config: FileScanConfig) -> Self {
let (projected_schema, statistics, _lex_sorting) = base_config.project();

Self {
base_config,
projected_schema,
metrics: ExecutionPlanMetricsSet::new(),
statistics,
}
}
}

impl DisplayAs for ZarrScan {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "ZarrScan")
}
}

impl ExecutionPlan for ZarrScan {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}

fn statistics(&self) -> datafusion::error::Result<Statistics> {
Ok(self.statistics.clone())
}

fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}

fn execute(
&self,
partition: usize,
context: Arc<datafusion::execution::context::TaskContext>,
) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream> {
let object_store = context
.runtime_env()
.object_store(&self.base_config.object_store_url)?;

let config =
ZarrConfig::new(object_store).with_projection(self.base_config.projection.clone());

let opener = ZarrFileOpener::new(config);

let stream = FileStream::new(&self.base_config, partition, opener, &self.metrics)?;

Ok(Box::pin(stream) as SendableRecordBatchStream)
}

fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}

fn output_ordering(&self) -> Option<&[datafusion::physical_expr::PhysicalSortExpr]> {
None
}
}

#[cfg(test)]
mod tests {
use std::{error::Error, sync::Arc};

use datafusion::{
datasource::{listing::PartitionedFile, physical_plan::FileMeta},
execution::object_store::ObjectStoreUrl,
};
use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectMeta};

use crate::{
async_reader::{ZarrPath, ZarrReadAsync},
tests::get_test_v2_data_path,
};

use super::*;

#[tokio::test]
async fn test_open() -> Result<(), Box<dyn Error>> {
let local_fs = Arc::new(LocalFileSystem::new());

let test_data = get_test_v2_data_path("lat_lon_example.zarr".to_string());
let location = Path::from_filesystem_path(&test_data)?;

let file_meta = FileMeta {
object_meta: ObjectMeta {
location,
last_modified: chrono::Utc::now(),
size: 0,
e_tag: None,
version: None,
},
range: None,
extensions: None,
};

let zarr_path = ZarrPath::new(local_fs, file_meta.object_meta.location);
let schema = zarr_path.get_zarr_metadata().await?.arrow_schema()?;

let test_file = Path::from_filesystem_path(test_data)?;
let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: Arc::new(schema.clone()),
file_groups: vec![vec![PartitionedFile::new(test_file.to_string(), 10)]],
statistics: Statistics::new_unknown(&schema),
projection: Some(vec![1, 2]),
limit: Some(10),
table_partition_cols: vec![],
output_ordering: vec![],
};

let scanner = ZarrScan::new(scan_config);

let session = datafusion::execution::context::SessionContext::new();

let batch_stream = scanner.execute(0, session.task_ctx())?;
let batches: Vec<_> = batch_stream.try_collect().await?;

assert_eq!(batches.len(), 1);

let first_batch = &batches[0];
assert_eq!(first_batch.num_columns(), 2);
assert_eq!(first_batch.num_rows(), 10);

let schema = first_batch.schema();

let names = schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>();
assert_eq!(names, vec!["lon", "lat"]);

Ok(())
}
}
87 changes: 87 additions & 0 deletions src/reader/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,44 @@ impl ZarrDataType {
}
}

impl TryFrom<&ZarrDataType> for DataType {
type Error = ZarrError;

fn try_from(value: &ZarrDataType) -> ZarrResult<Self> {
match value {
ZarrDataType::Bool => Ok(DataType::Boolean),
ZarrDataType::UInt(s) => match s {
1 => Ok(DataType::UInt8),
2 => Ok(DataType::UInt16),
4 => Ok(DataType::UInt32),
8 => Ok(DataType::UInt64),
_ => Err(throw_invalid_meta("Invalid uint size")),
},
ZarrDataType::Int(s) => match s {
1 => Ok(DataType::Int8),
2 => Ok(DataType::Int16),
4 => Ok(DataType::Int32),
8 => Ok(DataType::Int64),
_ => Err(throw_invalid_meta("Invalid int size")),
},
ZarrDataType::Float(s) => match s {
4 => Ok(DataType::Float32),
8 => Ok(DataType::Float64),
_ => Err(throw_invalid_meta("Invalid float size")),
},
ZarrDataType::FixedLengthString(_) => Ok(DataType::Utf8),
ZarrDataType::FixedLengthPyUnicode(_) => Ok(DataType::Utf8),
ZarrDataType::TimeStamp(_, s) => match s.as_str() {
"s" => Ok(DataType::Timestamp(TimeUnit::Second, None)),
"ms" => Ok(DataType::Timestamp(TimeUnit::Millisecond, None)),
"us" => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),
"ns" => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
_ => Err(throw_invalid_meta("Invalid timestamp unit")),
},
}
}
}

// This is the byte length of the Py Unicode characters that zarr writes
// when the output type is set to U<length>.
pub(crate) const PY_UNICODE_SIZE: usize = 4;
Expand Down Expand Up @@ -933,4 +971,53 @@ mod zarr_codecs_tests {
let target_arr: UInt16Array = vec![32, 33, 39, 40, 34, 41, 46, 47, 48].into();
assert_eq!(*arr, target_arr);
}

#[test]
fn test_zarr_data_type_to_arrow_datatype() -> ZarrResult<()> {
let zarr_types = [
ZarrDataType::Bool,
ZarrDataType::UInt(1),
ZarrDataType::UInt(2),
ZarrDataType::UInt(4),
ZarrDataType::UInt(8),
ZarrDataType::Int(1),
ZarrDataType::Int(2),
ZarrDataType::Int(4),
ZarrDataType::Int(8),
ZarrDataType::Float(4),
ZarrDataType::Float(8),
ZarrDataType::TimeStamp(8, "s".to_string()),
ZarrDataType::TimeStamp(8, "ms".to_string()),
ZarrDataType::TimeStamp(8, "us".to_string()),
ZarrDataType::TimeStamp(8, "ns".to_string()),
];

let exepcted_types = [
DataType::Boolean,
DataType::UInt8,
DataType::UInt16,
DataType::UInt32,
DataType::UInt64,
DataType::Int8,
DataType::Int16,
DataType::Int32,
DataType::Int64,
DataType::Float32,
DataType::Float64,
DataType::Timestamp(TimeUnit::Second, None),
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Timestamp(TimeUnit::Microsecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
];

assert_eq!(zarr_types.len(), exepcted_types.len());

for (zarr_type, expected_type) in zarr_types.iter().zip(exepcted_types.iter()) {
let arrow_type = DataType::try_from(zarr_type)?;

assert_eq!(arrow_type, *expected_type);
}

Ok(())
}
}
Loading

0 comments on commit 09bf6d2

Please sign in to comment.