
first pass at implementing predicate pushdown, seems to work #16

Merged: 2 commits merged into main on Apr 29, 2024

Conversation

maximedion2 (Collaborator)

No description provided.

@maximedion2 (Collaborator, author):

Okay, so the relatively small amount of new code here doesn't reflect how long I spent on this, haha; it's admittedly more complicated than I initially thought it would be. But I think this works in its current state.

It's a WIP because I want to revisit how I applied filter pushdowns in the reader. When I looked at arrow-rs, specifically at the parquet implementation, my understanding was that if you take in a predicate, you need to produce a record batch that completely satisfies it. However, looking into how it works in DataFusion, that doesn't seem to be the case: you can mark a predicate as "inexact", in which case DataFusion takes the record batches that were only partially "filtered" and removes any remaining row that doesn't satisfy the predicate.

For parquet files, specifically when the data is not compressed, there's value in trying to skip rows while reading, but we can't do that for (compressed) zarr data, so my somewhat complicated implementation of "row filtering" for zarr doesn't add any value. I want to simplify it and focus on what does add value -- skipping whole chunks when no values in the predicate columns' chunks satisfy the predicate. I will then leave the "exact" filtering to DataFusion, which will make things much cleaner.
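For context, here's a minimal sketch of what "inexact" pushdown looks like from DataFusion's side. This is just the general TableProvider hook written as a free function for illustration, not necessarily the exact code in this PR: reporting each filter as Inexact tells DataFusion to keep its own filter on top of the scan, so the scan only has to skip chunks, not produce exactly filtered batches.

```rust
use datafusion::error::Result;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};

// Sketch only: in a real TableProvider impl this would be the
// supports_filters_pushdown method. Reporting each pushed-down filter as
// Inexact means DataFusion keeps a FilterExec above the scan, so the scan
// is free to return partially filtered batches (e.g. after only skipping
// whole chunks).
fn supports_filters_pushdown(filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
    Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}

fn main() -> Result<()> {
    // No filters pushed down -> empty response; just exercising the sketch.
    assert!(supports_filters_pushdown(&[])?.is_empty());
    Ok(())
}
```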


// Below is all the logic necessary (I think) to convert a PhysicalExpr into a ZarrChunkFilter.
// The logic is mostly copied from datafusion, and is simplified here for the zarr use case.
pub struct ZarrFilterCandidate {
maximedion2 (Collaborator, author):

So in the end, this is still mostly index-based. I'll make a few comments below to clarify the logic (I should actually write these as comments in the code; I'll do that before we merge), but column names are basically just used as an intermediate step.

fn pre_visit(&mut self, node: &Arc<dyn PhysicalExpr>) -> DataFusionResult<RewriteRecursion> {
    if let Some(column) = node.as_any().downcast_ref::<Column>() {
        if let Ok(idx) = self.file_schema.index_of(column.name()) {
            self.required_column_indices.insert(idx);
maximedion2 (Collaborator, author):

So first, we accumulate the indices of the columns required by a given predicate. These indices represent each column's position in the file schema (which will eventually be distinct from the table schema; for now we don't have that distinction). For example, if the predicate requires lat, lon and the file schema is float_data, lat, lon, we end up setting the projection to [1, 2]. Since the set is ordered, I think that even if the predicate referenced the columns in the order lon, lat, we'd still end up with [1, 2] as the projection.
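To illustrate that ordering point, here's a standalone sketch using the example column names from this thread. It assumes required_column_indices is an ordered set such as a BTreeSet (which is what "the set is ordered" suggests); none of this is the PR's actual code.

```rust
use std::collections::BTreeSet;
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Example file schema from the discussion: float_data, lat, lon.
    let file_schema = Schema::new(vec![
        Field::new("float_data", DataType::Float64, false),
        Field::new("lat", DataType::Float64, false),
        Field::new("lon", DataType::Float64, false),
    ]);

    // Visit the predicate's columns in "reverse" order on purpose: lon, then lat.
    let mut required_column_indices = BTreeSet::new();
    for name in ["lon", "lat"] {
        if let Ok(idx) = file_schema.index_of(name) {
            required_column_indices.insert(idx);
        }
    }

    // The set is ordered, so the projection comes out as [1, 2] either way.
    let projection: Vec<usize> = required_column_indices.into_iter().collect();
    assert_eq!(projection, vec![1, 2]);

    // Projecting the file schema with [1, 2] then yields the predicate
    // schema (lat, lon), as described further down.
    let predicate_schema = file_schema.project(&projection).unwrap();
    assert_eq!(predicate_schema.fields().len(), 2);
}
```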


impl ZarrDatafusionArrowPredicate {
    pub fn new(candidate: ZarrFilterCandidate, schema: &Schema) -> DataFusionResult<Self> {
        let cols: Vec<_> = candidate
maximedion2 (Collaborator, author):

This is where we convert the indices to the column names, e.g. [1, 2] -> [lat, lon]. See below for how that's used.

            .map(|idx| schema.field(*idx).name().to_string())
            .collect();

        let schema = Arc::new(schema.project(&candidate.projection)?);
maximedion2 (Collaborator, author):

Here we go from the file schema to the predicate schema, e.g. float_data, lat, lon -> lat, lon.

            .collect();

        let schema = Arc::new(schema.project(&candidate.projection)?);
        let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
maximedion2 (Collaborator, author):

If I understand correctly, the physical expression holds the name of each column as well as an index for each. Since it was first created from an Expr using the file schema, the indices for each column don't necessarily match what they will be in the record batch we pass to the physical expression. Assuming we pass the physical expression a record batch that only contains the columns it needs, we need to remap the indices to the columns, e.g. we go from (lat, 1), (lon, 2) to (lat, 0), (lon, 1).
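As a standalone illustration of why that remapping is needed (not the PR's code): DataFusion's col helper resolves a column name against whatever schema it's given, so the same column gets a different index depending on whether it's resolved against the file schema or the projected predicate schema.

```rust
use arrow_schema::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::physical_expr::expressions::{col, Column};

fn main() -> Result<()> {
    // Example file schema from the discussion: float_data, lat, lon.
    let file_schema = Schema::new(vec![
        Field::new("float_data", DataType::Float64, false),
        Field::new("lat", DataType::Float64, false),
        Field::new("lon", DataType::Float64, false),
    ]);
    // Predicate schema after projecting [1, 2]: lat, lon.
    let predicate_schema = file_schema.project(&[1, 2])?;

    // Resolved against the file schema, "lat" is column 1...
    let lat_in_file = col("lat", &file_schema)?;
    // ...but resolved against the projected schema it is column 0, which is
    // the kind of remapping reassign_predicate_columns performs above.
    let lat_in_predicate = col("lat", &predicate_schema)?;

    let before = lat_in_file.as_any().downcast_ref::<Column>().unwrap().index();
    let after = lat_in_predicate.as_any().downcast_ref::<Column>().unwrap().index();
    assert_eq!((before, after), (1, 0));
    Ok(())
}
```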

    }

    fn evaluate(&mut self, batch: &RecordBatch) -> Result<BooleanArray, ArrowError> {
        let index_projection = self
maximedion2 (Collaborator, author):

This is the bit that depends on the column names. Here, the incoming record batch can have any number of columns; it doesn't matter, as long as it contains at least the columns the predicate needs. In the parquet implementation, again if I understood correctly, the batch is expected to come in with only the required columns, but by using column names here that's not required: we figure out the indices in the record batch based on the column names and re-project it before passing it to the physical expression. The re-projection does still happen in the parquet implementation, I think to handle different column orderings, but here we also use it to drop unnecessary columns. That way, if the predicate only requires the lon column, for example, we can reuse a record batch that contains lat, lon.
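Here's a self-contained sketch of that name-based re-projection; the project_for_predicate helper and the toy lat/lon batch are made up for illustration and are not part of this PR.

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Float64Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};

// Given any incoming batch that contains at least the predicate's columns,
// select just those columns (in predicate order) before handing the batch
// to the physical expression.
fn project_for_predicate(
    batch: &RecordBatch,
    predicate_columns: &[&str],
) -> Result<RecordBatch, ArrowError> {
    let indices = predicate_columns
        .iter()
        .map(|name| batch.schema().index_of(name))
        .collect::<Result<Vec<_>, _>>()?;
    batch.project(&indices)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("lat", DataType::Float64, false),
        Field::new("lon", DataType::Float64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(Float64Array::from(vec![3.0, 4.0])) as ArrayRef,
        ],
    )?;

    // A predicate that only needs lon can still reuse this lat/lon batch.
    let projected = project_for_predicate(&batch, &["lon"])?;
    assert_eq!(projected.num_columns(), 1);
    Ok(())
}
```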

@tshauck (Collaborator) left a comment:

Just scanning through and leaving some comments. I'll come back to this later after I've let it stew for a bit.

Resolved review threads: src/datafusion/helpers.rs, src/datafusion/table_factory.rs (outdated), src/datafusion/table_provider.rs, src/reader/zarr_read.rs (outdated)
}
}

pub fn build(mut self) -> DataFusionResult<Option<ZarrFilterCandidate>> {
tshauck (Collaborator):

Since there aren't any builder methods on this, maybe TryFrom? Also, it's not super clear to me why it's an Option?

maximedion2 (Collaborator, author):

Good question. So I think we need a builder struct because the way the column indices get "extracted" from the predicate is through the call to rewrite on the line below, which takes a mut reference to self. That function requires its argument to implement the TreeNodeRewriter trait, and you can see that as pre_visit is called, the indices get progressively filled in. To be honest, I didn't dig all the way down into how this works; I just followed the steps they follow for parquet, since I didn't want to risk breaking something.

Regarding the Option, you're right that it's not clear from the code here. I believe it's like that because of the code here, https://github.com/datafusion-contrib/arrow-zarr/pull/16/files#diff-d61c0a121604c7680df3d272638903a3fc21fee9ac3381e34b5285c02b9deaf0R202-R213, specifically because the else statement returns None. Since the type of candidates is Vec<ZarrFilterCandidate>, I think the call to collect coerces the options into the inner type (or skips the value if it's None)? And that means the type of candidate must be Option<...>, so that the if and else branches' return types match. Again, I mostly followed the parquet implementation.

I know that just following someone else's code and replicating it somewhat naively is not the best excuse, haha, but like I said, I wanted to minimize the risk of messing things up here, since I'm not yet comfortable with the code base. Overall, does this all make sense?
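For what it's worth on the collect question, these are the two standard behaviors in plain Rust; which one applies depends on how the candidates iterator is actually collected in the code linked above.

```rust
fn main() {
    let candidates = vec![Some(1), None, Some(3)];

    // Collecting an iterator of Options into Option<Vec<_>> short-circuits:
    // a single None makes the whole result None.
    let all_or_nothing: Option<Vec<i32>> = candidates.clone().into_iter().collect();
    assert_eq!(all_or_nothing, None);

    // flatten() (or filter_map) instead just drops the None entries.
    let keep_some: Vec<i32> = candidates.into_iter().flatten().collect();
    assert_eq!(keep_some, vec![1, 3]);
}
```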

maximedion2 (Collaborator, author):

Also, when we start handling hive-style partitions and the logic gets more complicated, we might need to return Ok(None) from build in some situations. I'm following the parquet logic but also simplifying it a lot (for now), so that might lead to code that looks a bit weird, temporarily.

tshauck (Collaborator):

Cool cool, yeah that all sounds good to me to get started with. If we notice some perf issues w/ the cloning + rewriting we can reassess later.

maximedion2 (Collaborator, author):

Yeah, I haven't been paying too much attention to everything that could impact performance so far; I'm thinking I'll revisit that later, once we have something fully functional.

Resolved review thread: src/datafusion/table_provider.rs (outdated)
@maximedion2 changed the title from "[WIP] first pass at implementing predicate pushdown, seems to work" to "first pass at implementing predicate pushdown, seems to work" on Apr 26, 2024
@maximedion2 (Collaborator, author):

Okay, I removed the row filtering logic in the readers, and the predicate pushdowns are now marked as Inexact; this should be good to go.

@tshauck (Collaborator) left a comment:

I think this seems good to me to get going on predicate pushdowns. Sometime this week I hope to play around with some of my data so that'll hopefully yield some insights into how this works for that use case.


@maximedion2 (Collaborator, author):

> I think this seems good to me to get going on predicate pushdowns. Sometime this week I hope to play around with some of my data so that'll hopefully yield some insights into how this works for that use case.

Great, I'm looking forward to running all this against real data; I'm sure we'll find some issues, or at least things to improve, that would be hard to figure out with only the dummy test data I've been generating. In theory, I think most zarr features are supported with what's there; the main one I'm not supporting yet is missing chunks + fill values.

@maximedion2 merged commit 4fd4131 into main on Apr 29, 2024
3 checks passed
@maximedion2 linked an issue on May 2, 2024 that may be closed by this pull request