-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve stream #15
Improve stream #15
Conversation
let s = s.join("."); | ||
|
||
let p = self.location.child(var.to_string()).child(s); | ||
let separator = separators |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an actual bug fix, that needs to go in, unrelated to the async stream work.
src/datafusion/scanner.rs
Outdated
@@ -185,7 +185,10 @@ mod tests { | |||
let schema = first_batch.schema(); | |||
|
|||
let names = schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(); | |||
assert_eq!(names, vec!["lon", "lat"]); | |||
let target_names = ["lon", "lat", "float_data"]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, in the case or zarr stores, there is no clear order for the "columns", unlike e.g. a parquet file. So while this test as is might have run fine when you tried it, when I did it was picking up "lat" and "float_data", I think it might actually be non-deterministic? Do you think that this could potentially be a problem? When multiple record batches are combined, we could maybe run into issues? If that's the case I could enforce that columns are read in alphabetical order, that would fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about that, but yeah, would be good to give it some order or way to make it deterministic between schema inference and scanning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no worries, we found an issue with a test, that's part of the process! I'll add a fix to this PR, probably tomorrow, and I'll update some tests to explicitly check for column order in the output.
@@ -192,12 +192,8 @@ impl<T: ZarrIterator> ZarrRecordBatchReader<T> { | |||
let mut arrs: Vec<ArrayRef> = Vec::with_capacity(self.meta.get_num_columns()); | |||
let mut fields: Vec<FieldRef> = Vec::with_capacity(self.meta.get_num_columns()); | |||
|
|||
// the sort below is important, because within a zarr store, the different arrays are |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Turns out this is a proper bug, because I already thought about this before (and then I forgot about it), but the way I tried to enforce ordering wasn't quite right. It should be fixed now.
@@ -185,7 +185,7 @@ mod tests { | |||
let schema = first_batch.schema(); | |||
|
|||
let names = schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(); | |||
assert_eq!(names, vec!["lon", "lat"]); | |||
assert_eq!(names, vec!["lat", "lon"]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I undid my change, restored the test to what it was before except that now it's checking for the last 2 columns in alphabetical order (so the columns are "float_data", "lat", "lon", and it's now picking up the last 2, always).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM, just left a few nits, but looks like a good place to stop before adding the filter pushdown.
src/reader/metadata.rs
Outdated
@@ -675,6 +675,10 @@ impl ZarrStoreMetadata { | |||
} | |||
} | |||
|
|||
// the sort below is important, because within a zarr store, the different arrays are | |||
// not ordered, so there is no predefined order for the different columns. we effectively | |||
// define one here, my ordering the columns alphabetically. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// define one here, my ordering the columns alphabetically. | |
// define one here, by ordering the columns alphabetically. |
src/reader/zarr_read.rs
Outdated
|
||
// the sort below is important, because within a zarr store, the different arrays are | ||
// not ordered, so there is no predefined order for the different columns. we effectively | ||
// define one here, my ordering the columns alphabetically. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// define one here, my ordering the columns alphabetically. | |
// define one here, by ordering the columns alphabetically. |
@@ -0,0 +1,28 @@ | |||
use arrow_zarr::async_reader::{ZarrPath, ZarrRecordBatchStreamBuilderNonBlocking}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, but missing copyright. DataFusion at least has a checker as part of its CI, which may make sense to use here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a copyright here and in several other files.
T: ZarrStream + Unpin + Send + 'static, | ||
{ | ||
type Item = ZarrResult<RecordBatch>; | ||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function has a few .expects
and .unwraps
. I get some aren't possible, but in those cases perhaps we can follow something more idiomatic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed some of those .unwraps
, I'll keep cleaning up the code as I make more changes.
src/async_reader/mod.rs
Outdated
self.store_wrapper = Some(wrapper); | ||
|
||
// if store returns none, it's the end and it's time to return | ||
if chunk.is_none() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e.g.
let chunk = if let Some(chunk) = chunk {
chunk // or maybe do more here
} else {
self.state = ZarrStreamStateNonBlocking::Done;
return Poll::Ready(None);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah you're right, I've been following a few patterns that are not really rust-idiomatic, I've been learning rust on-and-off for the last 2 years or so and I sometimes get sloppy, like these .is_none
followed by an unwrap
call. I'll do a quick check tonight and clean it up a bit.
.gitignore
Outdated
target | ||
.vscode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, you can make a ~/.gitignore
and put your editor specific stuff there, then you won't need to worry about adding these things to individual repos.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good, done.
No description provided.