Improve stream #15

Merged
merged 5 commits into from
Apr 8, 2024

Conversation

maximedion2
Collaborator

No description provided.

@maximedion2 maximedion2 requested a review from tshauck April 5, 2024 04:57
let s = s.join(".");

let p = self.location.child(var.to_string()).child(s);
let separator = separators
Collaborator Author

This is an actual bug fix that needs to go in; it is unrelated to the async stream work.

@@ -185,7 +185,10 @@ mod tests {
let schema = first_batch.schema();

let names = schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>();
- assert_eq!(names, vec!["lon", "lat"]);
+ let target_names = ["lon", "lat", "float_data"];
Collaborator Author

So, in the case of zarr stores, there is no clear order for the "columns", unlike e.g. a parquet file. So while this test as is might have run fine when you tried it, when I ran it, it was picking up "lat" and "float_data", so I think it might actually be non-deterministic. Do you think that this could be a problem? When multiple record batches are combined, we could maybe run into issues. If that's the case, I could enforce that columns are read in alphabetical order, which would fix it.

Collaborator

Sorry about that, but yeah, it would be good to give it some order, or some other way to make it deterministic between schema inference and scanning.

Collaborator Author

no worries, we found an issue with a test, that's part of the process! I'll add a fix to this PR, probably tomorrow, and I'll update some tests to explicitly check for column order in the output.
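
To illustrate the alphabetical-ordering idea discussed in this thread, here is a minimal sketch. It assumes the arrays of a store are held in a `HashMap` keyed by name, whose iteration order is not guaranteed; the function `sorted_column_names` and the map layout are hypothetical and not taken from arrow-zarr.

```rust
use std::collections::HashMap;

/// Return the column names in alphabetical order.
/// Hypothetical helper: the map stands in for the unordered arrays of a store.
fn sorted_column_names(arrays: &HashMap<String, Vec<f64>>) -> Vec<String> {
    // HashMap iteration order is unspecified, so collect the keys first...
    let mut names: Vec<String> = arrays.keys().cloned().collect();
    // ...then sort them so every caller sees the same column order.
    names.sort();
    names
}
```

Sorting once, in the place where the column list is built, means schema inference and scanning can both rely on the same order.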

@@ -192,12 +192,8 @@ impl<T: ZarrIterator> ZarrRecordBatchReader<T> {
let mut arrs: Vec<ArrayRef> = Vec::with_capacity(self.meta.get_num_columns());
let mut fields: Vec<FieldRef> = Vec::with_capacity(self.meta.get_num_columns());

// the sort below is important, because within a zarr store, the different arrays are
Collaborator Author

@maximedion2 maximedion2 Apr 7, 2024

Turns out this is a proper bug. I had already thought about this before (and then forgot about it), but the way I tried to enforce ordering wasn't quite right. It should be fixed now.

@@ -185,7 +185,7 @@ mod tests {
let schema = first_batch.schema();

let names = schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>();
- assert_eq!(names, vec!["lon", "lat"]);
+ assert_eq!(names, vec!["lat", "lon"]);
Collaborator Author

I undid my change and restored the test to what it was before, except that it now checks for the last 2 columns in alphabetical order (the columns are "float_data", "lat", "lon", and it now always picks up the last 2).
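
As a rough sketch of why the test can now check for fixed names: an index-based projection is only stable once the column order is deterministic. The function `project_by_index` is a hypothetical stand-in, not the projection API used in the PR.

```rust
/// Keep the columns at the given positions of the alphabetically sorted list.
/// Hypothetical helper for illustration; indices out of range are ignored.
fn project_by_index(mut names: Vec<String>, keep: &[usize]) -> Vec<String> {
    // Sort first so that a given index always refers to the same column.
    names.sort();
    keep.iter()
        .filter_map(|&i| names.get(i).cloned())
        .collect()
}
```

With the columns "float_data", "lat", "lon", keeping indices 1 and 2 selects "lat" and "lon" regardless of the order in which the names were discovered.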

Collaborator

@tshauck tshauck left a comment

Generally LGTM, just left a few nits, but looks like a good place to stop before adding the filter pushdown.

@@ -675,6 +675,10 @@ impl ZarrStoreMetadata {
}
}

// the sort below is important, because within a zarr store, the different arrays are
// not ordered, so there is no predefined order for the different columns. we effectively
// define one here, my ordering the columns alphabetically.
Collaborator

Suggested change
- // define one here, my ordering the columns alphabetically.
+ // define one here, by ordering the columns alphabetically.

@@ -0,0 +1,28 @@
use arrow_zarr::async_reader::{ZarrPath, ZarrRecordBatchStreamBuilderNonBlocking};
Collaborator

nit, but missing copyright. DataFusion at least has a checker as part of its CI, which may make sense to use here.

Collaborator Author

Added a copyright here and in several other files.

T: ZarrStream + Unpin + Send + 'static,
{
type Item = ZarrResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Collaborator

This function has a few .expects and .unwraps. I get that some of them can't actually fail, but even in those cases perhaps we can follow something more idiomatic?

Collaborator Author

fixed some of those .unwraps, I'll keep cleaning up the code as I make more changes.
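
One common way to avoid `.unwrap()` on an `Option` that "should" always be set is to convert it into a `Result` and propagate it with `?`. The sketch below shows that pattern under stated assumptions: `StreamError`, `Reader`, and its `store` field are hypothetical names, not the types used in arrow-zarr.

```rust
/// Hypothetical error type for this illustration.
#[derive(Debug, PartialEq)]
enum StreamError {
    MissingState(&'static str),
}

/// Hypothetical reader holding an optional piece of state.
struct Reader {
    store: Option<String>,
}

impl Reader {
    /// Take the store out of the reader, returning an error instead of panicking.
    fn take_store(&mut self) -> Result<String, StreamError> {
        self.store
            .take()
            .ok_or(StreamError::MissingState("store wrapper was not set"))
    }

    /// Use `?` so a missing store surfaces as an `Err` to the caller.
    fn store_len(&mut self) -> Result<usize, StreamError> {
        let store = self.take_store()?;
        Ok(store.len())
    }
}
```

Even when the `None` case is believed to be unreachable, returning an error keeps a broken invariant from turning into a panic inside a stream.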

self.store_wrapper = Some(wrapper);

// if store returns none, it's the end and it's time to return
if chunk.is_none() {
Collaborator

e.g.

let chunk = if let Some(chunk) = chunk {
    chunk // or maybe do more here
} else {
    // the store returned None: mark the stream as done and end it
    self.state = ZarrStreamStateNonBlocking::Done;
    return Poll::Ready(None);
};
Collaborator Author

yeah you're right, I've been following a few patterns that are not really rust-idiomatic, I've been learning rust on-and-off for the last 2 years or so and I sometimes get sloppy, like these .is_none followed by an unwrap call. I'll do a quick check tonight and clean it up a bit.
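
For reference, the `.is_none()` followed by `.unwrap()` pattern mentioned here can also be written with `let ... else` (stable since Rust 1.65), which binds the value and handles the early return in one step. This is a minimal sketch with hypothetical `State` and `Demo` types standing in for the real stream state.

```rust
use std::task::Poll;

/// Hypothetical stream state for this illustration.
#[derive(Debug, PartialEq)]
enum State {
    Reading,
    Done,
}

struct Demo {
    state: State,
}

impl Demo {
    /// Return the chunk length, or end the stream when there is no chunk.
    fn step(&mut self, chunk: Option<Vec<u8>>) -> Poll<Option<usize>> {
        // Bind the inner value, or mark the stream as done and return early.
        let Some(chunk) = chunk else {
            self.state = State::Done;
            return Poll::Ready(None);
        };
        Poll::Ready(Some(chunk.len()))
    }
}
```

Because the `else` branch must diverge, the compiler guarantees that `chunk` is a plain `Vec<u8>` afterwards, so no `unwrap` is needed.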

.gitignore Outdated
target
.vscode
Collaborator

FWIW, you can make a global ~/.gitignore (registered through git's core.excludesFile setting) and put your editor-specific stuff there; then you won't need to worry about adding these things to individual repos.

https://gist.github.com/subfuzion/db7f57fff2fb6998a16c

Collaborator Author

sounds good, done.

@maximedion2 maximedion2 merged commit 43a5edb into main Apr 8, 2024
3 checks passed
@maximedion2 maximedion2 linked an issue May 2, 2024 that may be closed by this pull request