Implement Parquet filter pushdown via new filter pushdown APIs #15769
base: main
Conversation
pointing out current issues to move forward with implementing parquet filter pushdown via the new APIs we've introduced
cc @alamb @berkaysynnada for ideas
Arc::new(ParquetSource::default())
todo!() // need access of file schema?
This poses an issue.
TLDR: in order to know whether it can absorb a filter as Exact, ParquetSource needs to know not only the filter but also the file schema it is applied to (in particular, the types of the columns, since it can't handle structs).
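To make the dependency concrete, here is a minimal sketch (not the PR's code; the helper name and the exact set of rejected types are assumptions) of why the file schema is needed: the decision hinges on each referenced column's DataType, which only the schema can provide.

```rust
use arrow_schema::{DataType, Schema};

// Hypothetical helper: a filter column can only be pushed into the parquet
// row filter if its type is a non-nested type the reader can evaluate.
fn column_supports_pushdown(file_schema: &Schema, column_name: &str) -> bool {
    match file_schema.field_with_name(column_name) {
        // Nested types (structs, lists, maps) cannot be row-filtered by the
        // parquet reader, so filters on them must stay above the scan.
        Ok(field) => !matches!(
            field.data_type(),
            DataType::Struct(_) | DataType::List(_) | DataType::Map(_, _)
        ),
        // Column not present in the file schema (e.g. a partition column):
        // the file source itself cannot handle it.
        Err(_) => false,
    }
}
```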
let remaining_description = if config.execution.parquet.pushdown_filters {
    let mut remaining_filters = fd.filters.clone();
    for filter in &remaining_filters {
        if can_expr_be_pushed_down_with_schemas(filter, &conf.file_schema) {
This is where we need the file schema
Thanks @adriangb -- I am about to be offline for a week so I will review this when I return.
// if we can't push it down completely with only the filename-based/path-based
// column names, then we should check if we can do parquet predicate pushdown
let supports_pushdown = self.options.format.supports_filters_pushdown(
    &self.file_schema,
    &self.table_schema,
    &[filter],
)?;

if supports_pushdown == FilePushdownSupport::Supported {
    return Ok(TableProviderFilterPushDown::Exact);
}
The point of this PR is that filter pushdown moves from being something specialized that ListingTable does to something that works for any TableProvider; providers don't need to do anything special. The checks for compatibility also happen entirely within the parquet data source machinery, instead of leaking implementations via supports_filters_pushdown.
I have one question: aren't we expecting/preparing for people to use ListingTable if they read Parquet files? Are we eventually planning to remove all format-specific handling? Or is this the case only for filter pushdown?
If that's the case, why don't we fully remove the supports_filters_pushdown() API altogether?
I think many users of DataFusion (based on our usage, talks I've seen, and examples we have) use custom TableProvider implementations.
I would keep supports_filters_pushdown so that TableProviders can do Exact pruning of filters, e.g. using partition columns.
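As a rough sketch of that use case (not code from this PR; the helper and its inputs are assumptions), a custom provider's supports_filters_pushdown could report Exact for filters that reference only partition columns:

```rust
use datafusion_common::Result;
use datafusion_expr::{Expr, TableProviderFilterPushDown};

// Hypothetical classification a custom TableProvider might perform: filters
// that touch only partition columns can be answered exactly from file paths,
// everything else is left unsupported in this sketch.
fn classify_filters(
    filters: &[&Expr],
    partition_cols: &[String],
) -> Result<Vec<TableProviderFilterPushDown>> {
    Ok(filters
        .iter()
        .map(|filter| {
            let only_partition_cols = filter
                .column_refs()
                .iter()
                .all(|col| partition_cols.contains(&col.name));
            if only_partition_cols {
                TableProviderFilterPushDown::Exact
            } else {
                TableProviderFilterPushDown::Unsupported
            }
        })
        .collect())
}
```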
source = source.with_predicate(Arc::clone(&file_schema), predicate);
source = source.with_predicate(predicate);
This seemed like an easy win since I was able to just change this so that the schema is always passed in by the FileSourceConfigBuilder instead of only when with_predicate is called.
This was necessary because with_predicate is no longer called to attach a predicate; instead that happens during an optimization pass, so ParquetSource needs to have the schema available at that point.
I left with_predicate in there to avoid churn and in case there is a use case for attaching a predicate directly through the scan instead of as a FilterExec that later gets pushed into the scan.
@@ -244,7 +242,7 @@ impl ParquetExecBuilder {
    inner: DataSourceExec::new(Arc::new(base_config.clone())),
    base_config,
    predicate,
    pruning_predicate: parquet.pruning_predicate,
    pruning_predicate: None, // for backwards compat since `ParquetExec` is only for backwards compat anyway
Open to other suggestions (i.e. removing it). I felt like this minimizes breakage for folks still using ParquetExec, who are likely the same folks that want to do the least amount of work possible to upgrade.
let table_schema = get_basic_table_schema();

let file_schema = Schema::new(vec![Field::new(
    "list_col",
    DataType::Struct(Fields::empty()),
    true,
)]);
This test was wrong! It wanted to test that list_col prevents pushdown because it's a nested type. Instead, pushdown was prevented because list_col is not in the table schema!
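A hedged sketch of the corrected setup (assumed shape, not the exact test code): the nested column must exist in the table schema too, so a rejected pushdown can only be attributed to the nested type.

```rust
use arrow_schema::{DataType, Field, Fields, Schema};

// The nested column appears in both schemas, so if pushdown is rejected it is
// because of the Struct type, not because the column is missing from the table.
fn schemas_for_nested_column_test() -> (Schema, Schema) {
    let nested = Field::new("list_col", DataType::Struct(Fields::empty()), true);
    let table_schema = Schema::new(vec![nested.clone()]);
    let file_schema = Schema::new(vec![nested]);
    (table_schema, file_schema)
}
```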
let pruning_predicate_string = self
    .pruning_predicate
    .as_ref()
    .map(|pre| {
        let mut guarantees = pre
            .literal_guarantees()
            .iter()
            .map(|item| format!("{}", item))
            .collect_vec();
        guarantees.sort();
        format!(
            ", pruning_predicate={}, required_guarantees=[{}]",
            pre.predicate_expr(),
            guarantees.join(", ")
        )
    })
    .unwrap_or_default();
In #15561 (review) Andrew asked me to keep this, but now that the schema isn't even being passed in to with_predicate it's going to be hard to keep these. I suggest we just accept that they won't be present in the physical plans. If that's not okay, what I could do is generate them on the fly in fmt_extra, or generate them if with_predicate is called with a schema or with_schema is called with a predicate. But I'd like to avoid that unless someone thinks it's worth it or has another suggestion.
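For reference, a minimal sketch of the "generate them on the fly" option, assuming the PruningPredicate import path shown and that both the predicate and a schema are still available at display time; none of this is the PR's actual code.

```rust
use std::sync::Arc;

use arrow_schema::SchemaRef;
// Import path assumed; PruningPredicate has moved between modules over time.
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion_physical_expr::PhysicalExpr;
use itertools::Itertools;

// Rebuild the pruning-predicate display string lazily instead of storing it;
// returns an empty string if the pruning predicate cannot be built.
fn pruning_predicate_display(
    predicate: &Arc<dyn PhysicalExpr>,
    schema: &SchemaRef,
) -> String {
    PruningPredicate::try_new(Arc::clone(predicate), Arc::clone(schema))
        .map(|pre| {
            let mut guarantees = pre
                .literal_guarantees()
                .iter()
                .map(|g| g.to_string())
                .collect_vec();
            guarantees.sort();
            format!(
                ", pruning_predicate={}, required_guarantees=[{}]",
                pre.predicate_expr(),
                guarantees.join(", ")
            )
        })
        .unwrap_or_default()
}
```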
@@ -587,4 +560,49 @@ impl FileSource for ParquetSource {
}
}
}

fn try_pushdown_filters(
cc @berkaysynnada for this implementation
/// Check if the specified file format has support for pushing down the provided filters within
/// the given schemas. Added initially to support the Parquet file format's ability to do this.
fn supports_filters_pushdown(
Binning specialized code that was also leaking parquet stuff through DataSource and into TableProvider 😄
/// An enum to distinguish between different states when determining if certain filters can be
/// pushed down to file scanning
#[derive(Debug, PartialEq)]
pub enum FilePushdownSupport {
Another one of these enums!
02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
02)--Projection: t_pushdown.a
03)----Filter: t_pushdown.b > Int32(2)
04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
This is because the pushdown no longer happens at the logical level - it happens at the physical level. This makes sense, in part because the checks for suitability of pushdown are better at the physical level (there may be reasons to reject a pushdown at the physical level that are not present at the logical level, e.g. partition columns or encodings).
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND b@1 > 2
@berkaysynnada any idea why we have extra CoalesceBatchesExec and RepartitionExec now?
I have a guess but haven't proved it: CoalesceBatchesExec comes because of RepartitionExec, and RepartitionExec is inserted to satisfy the partition count, which is 4. That's required by the FilterExec now (which was pushed down at the logical level before), but that FilterExec is pushed down later, after EnforceDistribution.
So, this makes me think about the correct order of physical rules. PushdownFilter should probably work before distribution&order satisfiers. But that could also bring some issues, I'm not sure.
PushdownFilter should probably work before distribution&order satisfiers
That makes sense to me. It does more "invasive" re-arranging of plans than those do.
Force-pushed from 071aa19 to ff090a7
Enjoy your vacation! I think you'll like this diff.
Thank you @adriangb. I couldn't provide many design suggestions, since I don't fully understand the need for these changes. If you provide more background information, maybe I can help more.
It seems there are some critical planning changes here, and it would be better to get approval from more people for this PR.
};
let config_pushdown_enabled = config.execution.parquet.pushdown_filters;
let table_pushdown_enabled = self.pushdown_filters();
if table_pushdown_enabled || config_pushdown_enabled {
Is OR'ing these correct?
The current behavior is not documented anywhere; I tried to match the existing tests:
# pushdown_filters (currently) defaults to false, but we set it here to be explicit
statement ok
set datafusion.execution.parquet.pushdown_filters = false;
statement ok
CREATE EXTERNAL TABLE t(a varchar, b int, c float) STORED AS PARQUET
LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/';
## Create table with pushdown enabled (pushdown setting is part of the table)
statement ok
set datafusion.execution.parquet.pushdown_filters = true;
## Create table without pushdown
statement ok
CREATE EXTERNAL TABLE t_pushdown(a varchar, b int, c float) STORED AS PARQUET
LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/';
# restore defaults
statement ok
set datafusion.execution.parquet.pushdown_filters = false;
# When filter pushdown is not enabled, ParquetExec only filters based on
# metadata, so a FilterExec is required to filter the
# output of the `ParquetExec`
query T
select a from t where b > 2 ORDER BY a;
----
baz
foo
NULL
NULL
NULL
query TT
EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
----
logical_plan
01)Sort: t_pushdown.a ASC NULLS LAST
02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
physical_plan
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
let mut conf = self.clone();
let mut allowed_filters = vec![];
let mut remaining_filters = vec![];
for filter in &fd.filters {
fd.take_filters() to avoid clones below
fd: FilterDescription,
config: &datafusion_common::config::ConfigOptions,
) -> datafusion_common::Result<FilterPushdownResult<Arc<dyn FileSource>>> {
    let Some(file_schema) = self.file_schema.clone() else {
I'm asking to learn: in which cases does ParquetSource not have the schema?
I think they always end up with a schema now, but the current APIs don't require it via the constructor; instead it gets passed in via FileScanConfigBuilder. I tried piping it into the constructor but it makes things difficult, since there are APIs that rely on ParquetSource::default() and such. So TLDR it's a bit gross, but this is the least-churn way to do it, and we can always come back later and clean the rest up.
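To illustrate the trade-off with stand-in types (none of these are the PR's actual structs): the schema stays an Option on the source and is injected builder-style after construction, so ParquetSource::default()-style call sites keep working and pushdown simply bails out when the schema is absent.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};

// Stand-in for a file source whose constructors don't take a schema.
#[derive(Default)]
struct SourceSketch {
    file_schema: Option<SchemaRef>,
}

impl SourceSketch {
    // Builder-style injection, mirroring how the scan-config builder hands
    // the schema to the source after it has been constructed.
    fn with_schema(mut self, schema: SchemaRef) -> Self {
        self.file_schema = Some(schema);
        self
    }

    // Pushdown must bail out when no schema was injected, mirroring the
    // `let Some(file_schema) = self.file_schema.clone() else { ... }` guard.
    fn can_classify_filters(&self) -> bool {
        self.file_schema.is_some()
    }
}

fn main() {
    let schema: SchemaRef =
        Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));
    // Default construction has no schema; the builder supplies one later.
    let source = SourceSketch::default().with_schema(schema);
    assert!(source.can_classify_filters());
}
```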
/// Optional predicate for row filtering during parquet scan
pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
/// Optional predicate for pruning row groups (derived from `predicate`)
good to see these are unifying
/// The schema of the file.
/// In particular, this is the schema of the table without partition columns,
/// *not* the physical schema of the file.
pub(crate) file_schema: Option<SchemaRef>,
There is also another schema in FileScanConfig. Do they both reflect the file schema, not the physical schema? And can we somehow unify them?
This is the same schema that FileScanConfig passes into ParquetSource.
#15812 surfaced another reason why building the predicates from the file schemas is necessary. I think once we merge this we can tackle that.
Force-pushed from 1af7766 to 3fde445
Moves predicate pushdown for Parquet from being something specialized that ListingTable does to something that works for any TableProvider and any file format that implements the APIs. The checks for compatibility also happen entirely within the parquet data source machinery, instead of leaking implementations via supports_filters_pushdown.