From db6afbd14a90a9eab0f417a03febca6a56d5ac95 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 18 Apr 2025 14:56:48 -0500 Subject: [PATCH 01/15] re-implement filter pushdown for parquet --- .../core/src/datasource/listing/table.rs | 14 +--- .../datasource-parquet/src/file_format.rs | 31 ++------- datafusion/datasource-parquet/src/mod.rs | 6 +- .../datasource-parquet/src/row_filter.rs | 53 +++------------ datafusion/datasource-parquet/src/source.rs | 65 +++++++++++++++++-- datafusion/datasource/src/file_format.rs | 30 +-------- 6 files changed, 77 insertions(+), 122 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a9834da92e5a..8a9f4edb75ab 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -25,7 +25,7 @@ use std::{any::Any, str::FromStr, sync::Arc}; use crate::datasource::{ create_ordering, file_format::{ - file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport, + file_compression_type::FileCompressionType, FileFormat, }, physical_plan::FileSinkConfig, }; @@ -982,18 +982,6 @@ impl TableProvider for ListingTable { return Ok(TableProviderFilterPushDown::Exact); } - // if we can't push it down completely with only the filename-based/path-based - // column names, then we should check if we can do parquet predicate pushdown - let supports_pushdown = self.options.format.supports_filters_pushdown( - &self.file_schema, - &self.table_schema, - &[filter], - )?; - - if supports_pushdown == FilePushdownSupport::Supported { - return Ok(TableProviderFilterPushDown::Exact); - } - Ok(TableProviderFilterPushDown::Inexact) }) .collect() diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index ee4db50a6eda..0dc8433bd7b8 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -30,7 +30,7 @@ use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::write::{create_writer, get_writer_schema, SharedBuffer}; use datafusion_datasource::file_format::{ - FileFormat, FileFormatFactory, FilePushdownSupport, + FileFormat, FileFormatFactory }; use datafusion_datasource::write::demux::DemuxedStreamReceiver; @@ -52,7 +52,6 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; -use datafusion_expr::Expr; use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::LexRequirement; @@ -60,7 +59,6 @@ use datafusion_physical_plan::Accumulator; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; -use crate::can_expr_be_pushed_down_with_schemas; use crate::source::{parse_coerce_int96_string, ParquetSource}; use async_trait::async_trait; use bytes::Bytes; @@ -428,10 +426,10 @@ impl FileFormat for ParquetFormat { metadata_size_hint = Some(metadata); } - let mut source = ParquetSource::new(self.options.clone()); + let mut source = ParquetSource::new(self.options.clone(), Arc::clone(&conf.file_schema)); if let Some(predicate) = predicate { - source = 
source.with_predicate(Arc::clone(&conf.file_schema), predicate); + source = source.with_predicate(predicate); } if let Some(metadata_size_hint) = metadata_size_hint { source = source.with_metadata_size_hint(metadata_size_hint) @@ -459,29 +457,8 @@ impl FileFormat for ParquetFormat { Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _) } - fn supports_filters_pushdown( - &self, - file_schema: &Schema, - table_schema: &Schema, - filters: &[&Expr], - ) -> Result { - if !self.options().global.pushdown_filters { - return Ok(FilePushdownSupport::NoSupport); - } - - let all_supported = filters.iter().all(|filter| { - can_expr_be_pushed_down_with_schemas(filter, file_schema, table_schema) - }); - - Ok(if all_supported { - FilePushdownSupport::Supported - } else { - FilePushdownSupport::NotSupportedForFilter - }) - } - fn file_source(&self) -> Arc { - Arc::new(ParquetSource::default()) + todo!() // need access of file schema? } } diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 516b13792189..2bbb66f38255 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -59,7 +59,6 @@ pub use metrics::ParquetFileMetrics; pub use page_filter::PagePruningAccessPlanFilter; pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory}; pub use row_filter::build_row_filter; -pub use row_filter::can_expr_be_pushed_down_with_schemas; pub use row_group_filter::RowGroupAccessPlanFilter; use source::ParquetSource; pub use writer::plan_to_parquet; @@ -221,10 +220,9 @@ impl ParquetExecBuilder { parquet_file_reader_factory, schema_adapter_factory, } = self; - let mut parquet = ParquetSource::new(table_parquet_options); + let mut parquet = ParquetSource::new(table_parquet_options, Arc::clone(&file_scan_config.file_schema)); if let Some(predicate) = predicate.clone() { - parquet = parquet - .with_predicate(Arc::clone(&file_scan_config.file_schema), predicate); + parquet = parquet.with_predicate(predicate); } if let Some(metadata_size_hint) = metadata_size_hint { parquet = parquet.with_metadata_size_hint(metadata_size_hint) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 2d2993c29a6f..f7d12606a874 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -386,21 +386,21 @@ fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bo /// this expression from being predicate pushed down. If any of them would, this returns false. /// Otherwise, true. 
pub fn can_expr_be_pushed_down_with_schemas( - expr: &datafusion_expr::Expr, - _file_schema: &Schema, + expr: &Arc, table_schema: &Schema, ) -> bool { let mut can_be_pushed = true; - expr.apply(|expr| match expr { - datafusion_expr::Expr::Column(column) => { + expr.apply(|expr| { + if let Some(column) = expr.as_any().downcast_ref::() { can_be_pushed &= !would_column_prevent_pushdown(column.name(), table_schema); Ok(if can_be_pushed { TreeNodeRecursion::Jump } else { TreeNodeRecursion::Stop }) + } else { + Ok(TreeNodeRecursion::Continue) } - _ => Ok(TreeNodeRecursion::Continue), }) .unwrap(); // we never return an Err, so we can safely unwrap this can_be_pushed @@ -516,7 +516,7 @@ mod test { use super::*; use datafusion_common::ScalarValue; - use arrow::datatypes::{Field, Fields, TimeUnit::Nanosecond}; + use arrow::datatypes::{Field, TimeUnit::Nanosecond}; use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion_expr::{col, Expr}; use datafusion_physical_expr::planner::logical2physical; @@ -651,71 +651,38 @@ mod test { fn nested_data_structures_prevent_pushdown() { let table_schema = get_basic_table_schema(); - let file_schema = Schema::new(vec![Field::new( - "list_col", - DataType::Struct(Fields::empty()), - true, - )]); - let expr = col("list_col").is_not_null(); - assert!(!can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - &table_schema - )); + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } #[test] fn projected_columns_prevent_pushdown() { let table_schema = get_basic_table_schema(); - let file_schema = - Schema::new(vec![Field::new("existing_col", DataType::Int64, true)]); - let expr = col("nonexistent_column").is_null(); - assert!(!can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - &table_schema - )); + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } #[test] fn basic_expr_doesnt_prevent_pushdown() { let table_schema = get_basic_table_schema(); - let file_schema = - Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]); - let expr = col("string_col").is_null(); - assert!(can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - &table_schema - )); + assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } #[test] fn complex_expr_doesnt_prevent_pushdown() { let table_schema = get_basic_table_schema(); - let file_schema = Schema::new(vec![ - Field::new("string_col", DataType::Utf8, true), - Field::new("bigint_col", DataType::Int64, true), - ]); - let expr = col("string_col") .is_not_null() .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5))))); - assert!(can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - &table_schema - )); + assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } fn get_basic_table_schema() -> Schema { diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index e15f5243cd27..998dd9de35cb 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -25,6 +25,7 @@ use crate::opener::build_page_pruning_predicate; use crate::opener::build_pruning_predicate; use crate::opener::ParquetOpener; use crate::page_filter::PagePruningAccessPlanFilter; +use crate::row_filter::can_expr_be_pushed_down_with_schemas; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; use datafusion_datasource::file_stream::FileOpener; @@ -37,9 +38,13 @@ use 
datafusion_common::config::TableParquetOptions; use datafusion_common::{DataFusionError, Statistics}; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::pruning::PruningPredicate; +use datafusion_physical_plan::filter_pushdown::FilterDescription; +use datafusion_physical_plan::filter_pushdown::FilterPushdownResult; +use datafusion_physical_plan::filter_pushdown::FilterPushdownSupport; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; use datafusion_physical_plan::DisplayFormatType; @@ -253,12 +258,16 @@ use object_store::ObjectStore; /// [`RecordBatch`]: arrow::record_batch::RecordBatch /// [`SchemaAdapter`]: datafusion_datasource::schema_adapter::SchemaAdapter /// [`ParquetMetadata`]: parquet::file::metadata::ParquetMetaData -#[derive(Clone, Default, Debug)] +#[derive(Clone, Debug)] pub struct ParquetSource { /// Options for reading Parquet files pub(crate) table_parquet_options: TableParquetOptions, /// Optional metrics pub(crate) metrics: ExecutionPlanMetricsSet, + /// The schema of the file. + /// In particular, this is the schema of the table without partition columns, + /// *not* the physical schema of the file. + pub(crate) file_schema: SchemaRef, /// Optional predicate for row filtering during parquet scan pub(crate) predicate: Option>, /// Optional predicate for pruning row groups (derived from `predicate`) @@ -280,10 +289,19 @@ impl ParquetSource { /// Create a new ParquetSource to read the data specified in the file scan /// configuration with the provided `TableParquetOptions`. 
/// if default values are going to be used, use `ParguetConfig::default()` instead - pub fn new(table_parquet_options: TableParquetOptions) -> Self { + pub fn new(table_parquet_options: TableParquetOptions, file_schema: SchemaRef) -> Self { Self { table_parquet_options, - ..Self::default() + file_schema, + metrics: ExecutionPlanMetricsSet::default(), + predicate: None, + pruning_predicate: None, + page_pruning_predicate: None, + parquet_file_reader_factory: None, + schema_adapter_factory: None, + batch_size: None, + metadata_size_hint: None, + projected_statistics: None, } } @@ -306,7 +324,6 @@ impl ParquetSource { /// Set predicate information, also sets pruning_predicate and page_pruning_predicate attributes pub fn with_predicate( &self, - file_schema: Arc, predicate: Arc, ) -> Self { let mut conf = self.clone(); @@ -319,9 +336,9 @@ impl ParquetSource { conf.predicate = Some(Arc::clone(&predicate)); conf.page_pruning_predicate = - Some(build_page_pruning_predicate(&predicate, &file_schema)); + Some(build_page_pruning_predicate(&predicate, &self.file_schema)); conf.pruning_predicate = - build_pruning_predicate(predicate, &file_schema, &predicate_creation_errors); + build_pruning_predicate(predicate, &self.file_schema, &predicate_creation_errors); conf } @@ -589,4 +606,40 @@ impl FileSource for ParquetSource { } } } + + fn try_pushdown_filters( + &self, + fd: FilterDescription, + config: &datafusion_common::config::ConfigOptions, + ) -> datafusion_common::Result>> { + let mut conf = self.clone(); + let filters = fd.filters.clone(); + let predicate = match conf.predicate { + Some(predicate) => conjunction(std::iter::once(predicate).chain(filters.into_iter())), + None => conjunction(filters.into_iter()), + }; + conf.predicate = Some(predicate); + let remaining_description = if config.execution.parquet.pushdown_filters { + let mut remaining_filters = fd.filters.clone(); + for filter in &remaining_filters { + if can_expr_be_pushed_down_with_schemas(filter, &conf.file_schema) { + // This filter can be pushed down + } else { + // This filter cannot be pushed down + remaining_filters.retain(|f| f != filter); + } + } + } else { + FilterDescription { + filters: fd.filters, + } + }; + Ok( + FilterPushdownResult { + support: FilterPushdownSupport::Supported { child_descriptions: vec![], op: Arc::new(conf), revisit: false }, + remaining_description, + } + ) + + } } diff --git a/datafusion/datasource/src/file_format.rs b/datafusion/datasource/src/file_format.rs index 0e0b7b12e16a..879d62db8b7b 100644 --- a/datafusion/datasource/src/file_format.rs +++ b/datafusion/datasource/src/file_format.rs @@ -28,10 +28,9 @@ use crate::file_compression_type::FileCompressionType; use crate::file_scan_config::FileScanConfig; use crate::file_sink_config::FileSinkConfig; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::SchemaRef; use datafusion_common::file_options::file_type::FileType; use datafusion_common::{internal_err, not_impl_err, GetExt, Result, Statistics}; -use datafusion_expr::Expr; use datafusion_physical_expr::{LexRequirement, PhysicalExpr}; use datafusion_physical_plan::ExecutionPlan; use datafusion_session::Session; @@ -109,37 +108,10 @@ pub trait FileFormat: Send + Sync + fmt::Debug { not_impl_err!("Writer not implemented for this format") } - /// Check if the specified file format has support for pushing down the provided filters within - /// the given schemas. Added initially to support the Parquet file format's ability to do this. 
- fn supports_filters_pushdown( - &self, - _file_schema: &Schema, - _table_schema: &Schema, - _filters: &[&Expr], - ) -> Result { - Ok(FilePushdownSupport::NoSupport) - } - /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc. fn file_source(&self) -> Arc; } -/// An enum to distinguish between different states when determining if certain filters can be -/// pushed down to file scanning -#[derive(Debug, PartialEq)] -pub enum FilePushdownSupport { - /// The file format/system being asked does not support any sort of pushdown. This should be - /// used even if the file format theoretically supports some sort of pushdown, but it's not - /// enabled or implemented yet. - NoSupport, - /// The file format/system being asked *does* support pushdown, but it can't make it work for - /// the provided filter/expression - NotSupportedForFilter, - /// The file format/system being asked *does* support pushdown and *can* make it work for the - /// provided filter/expression - Supported, -} - /// Factory for creating [`FileFormat`] instances based on session and command level options /// /// Users can provide their own `FileFormatFactory` to support arbitrary file formats From bf06d578dca0e8e6fc5b65977cd2e3beea74cefb Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 19 Apr 2025 08:23:14 -0500 Subject: [PATCH 02/15] resolve --- .../examples/advanced_parquet_index.rs | 2 +- datafusion-examples/examples/parquet_index.rs | 3 +- .../core/src/datasource/listing/table.rs | 4 +- .../src/datasource/physical_plan/parquet.rs | 9 +- datafusion/core/src/test_util/parquet.rs | 10 +- datafusion/core/tests/fuzz_cases/pruning.rs | 2 +- .../tests/parquet/external_access_plan.rs | 2 +- datafusion/core/tests/parquet/page_pruning.rs | 2 +- .../physical_optimizer/push_down_filter.rs | 2 +- .../datasource-parquet/src/file_format.rs | 8 +- datafusion/datasource-parquet/src/mod.rs | 4 +- .../datasource-parquet/src/row_filter.rs | 4 + datafusion/datasource-parquet/src/source.rs | 125 ++++++------------ datafusion/datasource/src/file_scan_config.rs | 5 +- datafusion/proto/src/physical_plan/mod.rs | 2 +- .../tests/cases/roundtrip_physical_plan.rs | 10 +- 16 files changed, 78 insertions(+), 116 deletions(-) diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 03ef3d66f9d7..efaee23366a1 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -495,7 +495,7 @@ impl TableProvider for IndexTableProvider { ParquetSource::default() // provide the predicate so the DataSourceExec can try and prune // row groups internally - .with_predicate(Arc::clone(&schema), predicate) + .with_predicate(predicate) // provide the factory to create parquet reader without re-reading metadata .with_parquet_file_reader_factory(Arc::new(reader_factory)), ); diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs index 7d6ce4d86af1..c19fc2561d5f 100644 --- a/datafusion-examples/examples/parquet_index.rs +++ b/datafusion-examples/examples/parquet_index.rs @@ -242,8 +242,7 @@ impl TableProvider for IndexTableProvider { let files = self.index.get_files(predicate.clone())?; let object_store_url = ObjectStoreUrl::parse("file://")?; - let source = - Arc::new(ParquetSource::default().with_predicate(self.schema(), predicate)); + let source = 
Arc::new(ParquetSource::default().with_predicate(predicate)); let mut file_scan_config_builder = FileScanConfigBuilder::new(object_store_url, self.schema(), source) .with_projection(projection.cloned()) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 8a9f4edb75ab..c11bb4523c65 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -24,9 +24,7 @@ use std::{any::Any, str::FromStr, sync::Arc}; use crate::datasource::{ create_ordering, - file_format::{ - file_compression_type::FileCompressionType, FileFormat, - }, + file_format::{file_compression_type::FileCompressionType, FileFormat}, physical_plan::FileSinkConfig, }; use crate::execution::context::SessionState; diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index e9bb8b0db368..e4d5060e065c 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -54,6 +54,7 @@ mod tests { use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::source::DataSourceExec; + use datafusion_datasource::file::FileSource; use datafusion_datasource::{FileRange, PartitionedFile}; use datafusion_datasource_parquet::source::ParquetSource; use datafusion_datasource_parquet::{ @@ -139,7 +140,7 @@ mod tests { self.round_trip(batches).await.batches } - fn build_file_source(&self, file_schema: SchemaRef) -> Arc { + fn build_file_source(&self, file_schema: SchemaRef) -> Arc { // set up predicate (this is normally done by a layer higher up) let predicate = self .predicate @@ -148,7 +149,7 @@ mod tests { let mut source = ParquetSource::default(); if let Some(predicate) = predicate { - source = source.with_predicate(Arc::clone(&file_schema), predicate); + source = source.with_predicate(predicate); } if self.pushdown_predicate { @@ -161,14 +162,14 @@ mod tests { source = source.with_enable_page_index(true); } - Arc::new(source) + source.with_schema(Arc::clone(&file_schema)) } fn build_parquet_exec( &self, file_schema: SchemaRef, file_group: FileGroup, - source: Arc, + source: Arc, ) -> Arc { let base_config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index f5753af64d93..511f378f42e2 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -37,6 +37,7 @@ use crate::physical_plan::metrics::MetricsSet; use crate::physical_plan::ExecutionPlan; use crate::prelude::{Expr, SessionConfig, SessionContext}; +use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::source::DataSourceExec; use object_store::path::Path; @@ -182,10 +183,11 @@ impl TestParquetFile { let physical_filter_expr = create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?; - let source = Arc::new(ParquetSource::new(parquet_options).with_predicate( - Arc::clone(&self.schema), - Arc::clone(&physical_filter_expr), - )); + let source = Arc::new( + ParquetSource::new(parquet_options) + .with_predicate(Arc::clone(&physical_filter_expr)), + ) + .with_schema(Arc::clone(&self.schema)); let config = scan_config_builder.with_source(source).build(); let parquet_exec = DataSourceExec::from_data_source(config); diff --git 
a/datafusion/core/tests/fuzz_cases/pruning.rs b/datafusion/core/tests/fuzz_cases/pruning.rs index 11dd961a54ee..5202d8e5f452 100644 --- a/datafusion/core/tests/fuzz_cases/pruning.rs +++ b/datafusion/core/tests/fuzz_cases/pruning.rs @@ -276,7 +276,7 @@ async fn execute_with_predicate( ctx: &SessionContext, ) -> Vec { let parquet_source = if prune_stats { - ParquetSource::default().with_predicate(Arc::clone(&schema), predicate.clone()) + ParquetSource::default().with_predicate(predicate.clone()) } else { ParquetSource::default() }; diff --git a/datafusion/core/tests/parquet/external_access_plan.rs b/datafusion/core/tests/parquet/external_access_plan.rs index bbef073345b7..a5397c5a397c 100644 --- a/datafusion/core/tests/parquet/external_access_plan.rs +++ b/datafusion/core/tests/parquet/external_access_plan.rs @@ -346,7 +346,7 @@ impl TestFull { let source = if let Some(predicate) = predicate { let df_schema = DFSchema::try_from(schema.clone())?; let predicate = ctx.create_physical_expr(predicate, &df_schema)?; - Arc::new(ParquetSource::default().with_predicate(schema.clone(), predicate)) + Arc::new(ParquetSource::default().with_predicate(predicate)) } else { Arc::new(ParquetSource::default()) }; diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index f693485cbe01..9da879a32f6b 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -77,7 +77,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec let source = Arc::new( ParquetSource::default() - .with_predicate(Arc::clone(&schema), predicate) + .with_predicate(predicate) .with_enable_page_index(true), ); let base_config = FileScanConfigBuilder::new(object_store_url, schema, source) diff --git a/datafusion/core/tests/physical_optimizer/push_down_filter.rs b/datafusion/core/tests/physical_optimizer/push_down_filter.rs index b19144f1bcff..d871a440417f 100644 --- a/datafusion/core/tests/physical_optimizer/push_down_filter.rs +++ b/datafusion/core/tests/physical_optimizer/push_down_filter.rs @@ -99,7 +99,7 @@ impl FileSource for TestSource { } fn with_schema(&self, _schema: SchemaRef) -> Arc { - todo!("should not be called") + Arc::new(self.clone()) as Arc } fn with_projection(&self, _config: &FileScanConfig) -> Arc { diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 0dc8433bd7b8..27b85d427245 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -29,9 +29,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::write::{create_writer, get_writer_schema, SharedBuffer}; -use datafusion_datasource::file_format::{ - FileFormat, FileFormatFactory -}; +use datafusion_datasource::file_format::{FileFormat, FileFormatFactory}; use datafusion_datasource::write::demux::DemuxedStreamReceiver; use arrow::compute::sum; @@ -426,7 +424,7 @@ impl FileFormat for ParquetFormat { metadata_size_hint = Some(metadata); } - let mut source = ParquetSource::new(self.options.clone(), Arc::clone(&conf.file_schema)); + let mut source = ParquetSource::new(self.options.clone()); if let Some(predicate) = predicate { source = source.with_predicate(predicate); @@ -458,7 +456,7 @@ impl FileFormat for ParquetFormat { } fn file_source(&self) -> Arc { - todo!() // need access of file schema? 
+ Arc::new(ParquetSource::default()) } } diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 2bbb66f38255..aed0d7f27561 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -220,7 +220,7 @@ impl ParquetExecBuilder { parquet_file_reader_factory, schema_adapter_factory, } = self; - let mut parquet = ParquetSource::new(table_parquet_options, Arc::clone(&file_scan_config.file_schema)); + let mut parquet = ParquetSource::new(table_parquet_options); if let Some(predicate) = predicate.clone() { parquet = parquet.with_predicate(predicate); } @@ -242,7 +242,7 @@ impl ParquetExecBuilder { inner: DataSourceExec::new(Arc::new(base_config.clone())), base_config, predicate, - pruning_predicate: parquet.pruning_predicate, + pruning_predicate: None, // for backwards compat since `ParquetExec` is only for backwards compat anyway schema_adapter_factory: parquet.schema_adapter_factory, parquet_file_reader_factory: parquet.parquet_file_reader_factory, table_parquet_options: parquet.table_parquet_options, diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index f7d12606a874..61551c4a8fc9 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -652,6 +652,7 @@ mod test { let table_schema = get_basic_table_schema(); let expr = col("list_col").is_not_null(); + let expr = logical2physical(&expr, &table_schema); assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } @@ -661,6 +662,7 @@ mod test { let table_schema = get_basic_table_schema(); let expr = col("nonexistent_column").is_null(); + let expr = logical2physical(&expr, &table_schema); assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } @@ -670,6 +672,7 @@ mod test { let table_schema = get_basic_table_schema(); let expr = col("string_col").is_null(); + let expr = logical2physical(&expr, &table_schema); assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } @@ -681,6 +684,7 @@ mod test { let expr = col("string_col") .is_not_null() .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5))))); + let expr = logical2physical(&expr, &table_schema); assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 998dd9de35cb..53b69644aa48 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -21,10 +21,7 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; -use crate::opener::build_page_pruning_predicate; -use crate::opener::build_pruning_predicate; use crate::opener::ParquetOpener; -use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_filter::can_expr_be_pushed_down_with_schemas; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; @@ -33,7 +30,7 @@ use datafusion_datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapterFactory, }; -use arrow::datatypes::{Schema, SchemaRef, TimeUnit}; +use arrow::datatypes::{SchemaRef, TimeUnit}; use datafusion_common::config::TableParquetOptions; use datafusion_common::{DataFusionError, Statistics}; use datafusion_datasource::file::FileSource; @@ -41,14 +38,13 @@ use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_physical_expr::conjunction; use 
datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_optimizer::pruning::PruningPredicate; +use datafusion_physical_plan::filter_pushdown::filter_pushdown_not_supported; use datafusion_physical_plan::filter_pushdown::FilterDescription; use datafusion_physical_plan::filter_pushdown::FilterPushdownResult; use datafusion_physical_plan::filter_pushdown::FilterPushdownSupport; -use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; -use itertools::Itertools; use object_store::ObjectStore; /// Execution plan for reading one or more Parquet files. @@ -258,7 +254,7 @@ use object_store::ObjectStore; /// [`RecordBatch`]: arrow::record_batch::RecordBatch /// [`SchemaAdapter`]: datafusion_datasource::schema_adapter::SchemaAdapter /// [`ParquetMetadata`]: parquet::file::metadata::ParquetMetaData -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct ParquetSource { /// Options for reading Parquet files pub(crate) table_parquet_options: TableParquetOptions, @@ -267,13 +263,9 @@ pub struct ParquetSource { /// The schema of the file. /// In particular, this is the schema of the table without partition columns, /// *not* the physical schema of the file. - pub(crate) file_schema: SchemaRef, + pub(crate) file_schema: Option, /// Optional predicate for row filtering during parquet scan pub(crate) predicate: Option>, - /// Optional predicate for pruning row groups (derived from `predicate`) - pub(crate) pruning_predicate: Option>, - /// Optional predicate for pruning pages (derived from `predicate`) - pub(crate) page_pruning_predicate: Option>, /// Optional user defined parquet file reader factory pub(crate) parquet_file_reader_factory: Option>, /// Optional user defined schema adapter @@ -289,19 +281,10 @@ impl ParquetSource { /// Create a new ParquetSource to read the data specified in the file scan /// configuration with the provided `TableParquetOptions`. 
/// if default values are going to be used, use `ParguetConfig::default()` instead - pub fn new(table_parquet_options: TableParquetOptions, file_schema: SchemaRef) -> Self { + pub fn new(table_parquet_options: TableParquetOptions) -> Self { Self { table_parquet_options, - file_schema, - metrics: ExecutionPlanMetricsSet::default(), - predicate: None, - pruning_predicate: None, - page_pruning_predicate: None, - parquet_file_reader_factory: None, - schema_adapter_factory: None, - batch_size: None, - metadata_size_hint: None, - projected_statistics: None, + ..Self::default() } } @@ -321,25 +304,12 @@ impl ParquetSource { self } - /// Set predicate information, also sets pruning_predicate and page_pruning_predicate attributes - pub fn with_predicate( - &self, - predicate: Arc, - ) -> Self { + /// Set predicate information + pub fn with_predicate(&self, predicate: Arc) -> Self { let mut conf = self.clone(); - let metrics = ExecutionPlanMetricsSet::new(); - let predicate_creation_errors = - MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors"); - conf = conf.with_metrics(metrics); conf.predicate = Some(Arc::clone(&predicate)); - - conf.page_pruning_predicate = - Some(build_page_pruning_predicate(&predicate, &self.file_schema)); - conf.pruning_predicate = - build_pruning_predicate(predicate, &self.file_schema, &predicate_creation_errors); - conf } @@ -532,8 +502,11 @@ impl FileSource for ParquetSource { Arc::new(conf) } - fn with_schema(&self, _schema: SchemaRef) -> Arc { - Arc::new(Self { ..self.clone() }) + fn with_schema(&self, schema: SchemaRef) -> Arc { + Arc::new(Self { + file_schema: Some(schema), + ..self.clone() + }) } fn with_statistics(&self, statistics: Statistics) -> Arc { @@ -578,25 +551,8 @@ impl FileSource for ParquetSource { .predicate() .map(|p| format!(", predicate={p}")) .unwrap_or_default(); - let pruning_predicate_string = self - .pruning_predicate - .as_ref() - .map(|pre| { - let mut guarantees = pre - .literal_guarantees() - .iter() - .map(|item| format!("{}", item)) - .collect_vec(); - guarantees.sort(); - format!( - ", pruning_predicate={}, required_guarantees=[{}]", - pre.predicate_expr(), - guarantees.join(", ") - ) - }) - .unwrap_or_default(); - write!(f, "{}{}", predicate_string, pruning_predicate_string) + write!(f, "{}", predicate_string) } DisplayFormatType::TreeRender => { if let Some(predicate) = self.predicate() { @@ -612,34 +568,41 @@ impl FileSource for ParquetSource { fd: FilterDescription, config: &datafusion_common::config::ConfigOptions, ) -> datafusion_common::Result>> { - let mut conf = self.clone(); - let filters = fd.filters.clone(); - let predicate = match conf.predicate { - Some(predicate) => conjunction(std::iter::once(predicate).chain(filters.into_iter())), - None => conjunction(filters.into_iter()), + let Some(file_schema) = self.file_schema.clone() else { + return Ok(filter_pushdown_not_supported(fd)); }; - conf.predicate = Some(predicate); - let remaining_description = if config.execution.parquet.pushdown_filters { - let mut remaining_filters = fd.filters.clone(); - for filter in &remaining_filters { - if can_expr_be_pushed_down_with_schemas(filter, &conf.file_schema) { + if config.execution.parquet.pushdown_filters { + let mut conf = self.clone(); + let mut allowed_filters = vec![]; + let mut remaining_filters = vec![]; + for filter in &fd.filters { + if can_expr_be_pushed_down_with_schemas(filter, &file_schema) { // This filter can be pushed down + allowed_filters.push(filter.clone()); } else { // This filter cannot be 
pushed down - remaining_filters.retain(|f| f != filter); + remaining_filters.push(filter.clone()); } } + let predicate = match conf.predicate { + Some(predicate) => conjunction( + std::iter::once(predicate).chain(allowed_filters.into_iter()), + ), + None => conjunction(allowed_filters.into_iter()), + }; + conf.predicate = Some(predicate); + Ok(FilterPushdownResult { + support: FilterPushdownSupport::Supported { + child_descriptions: vec![], + op: Arc::new(conf), + revisit: false, + }, + remaining_description: FilterDescription { + filters: remaining_filters, + }, + }) } else { - FilterDescription { - filters: fd.filters, - } - }; - Ok( - FilterPushdownResult { - support: FilterPushdownSupport::Supported { child_descriptions: vec![], op: Arc::new(conf), revisit: false }, - remaining_description, - } - ) - + Ok(filter_pushdown_not_supported(fd)) + } } } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index fb756cc11fbb..e09c8b39be40 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -407,7 +407,9 @@ impl FileScanConfigBuilder { let statistics = statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema)); - let file_source = file_source.with_statistics(statistics.clone()); + let file_source = file_source + .with_statistics(statistics.clone()) + .with_schema(Arc::clone(&file_schema)); let file_compression_type = file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED); let new_lines_in_values = new_lines_in_values.unwrap_or(false); @@ -463,7 +465,6 @@ impl DataSource for FileScanConfig { let source = self .file_source .with_batch_size(batch_size) - .with_schema(Arc::clone(&self.file_schema)) .with_projection(self); let opener = source.create_file_opener(object_store, self, partition); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 90d071ab23f5..a46c6d833687 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -728,7 +728,7 @@ impl protobuf::PhysicalPlanNode { let mut source = ParquetSource::new(options); if let Some(predicate) = predicate { - source = source.with_predicate(Arc::clone(&schema), predicate); + source = source.with_predicate(predicate); } let base_config = parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index be90497a6e21..27eed4d0717e 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -739,9 +739,7 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> { let mut options = TableParquetOptions::new(); options.global.pushdown_filters = true; - let file_source = Arc::new( - ParquetSource::new(options).with_predicate(Arc::clone(&file_schema), predicate), - ); + let file_source = Arc::new(ParquetSource::new(options).with_predicate(predicate)); let scan_config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), @@ -800,10 +798,8 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { inner: Arc::new(Column::new("col", 1)), }); - let file_source = Arc::new( - ParquetSource::default() - .with_predicate(Arc::clone(&file_schema), custom_predicate_expr), - ); + let file_source = + Arc::new(ParquetSource::default().with_predicate(custom_predicate_expr)); let scan_config = 
FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), From a1d344162b86e633875125cfc85e19e342d77c39 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 19 Apr 2025 08:30:46 -0500 Subject: [PATCH 03/15] tweak --- datafusion/datasource-parquet/src/source.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 53b69644aa48..b29df2a40324 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -586,9 +586,9 @@ impl FileSource for ParquetSource { } let predicate = match conf.predicate { Some(predicate) => conjunction( - std::iter::once(predicate).chain(allowed_filters.into_iter()), + std::iter::once(predicate).chain(allowed_filters), ), - None => conjunction(allowed_filters.into_iter()), + None => conjunction(allowed_filters), }; conf.predicate = Some(predicate); Ok(FilterPushdownResult { From 34723b025ea623943cbebaa59fbec2cce44e021c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 19 Apr 2025 09:11:44 -0500 Subject: [PATCH 04/15] fix tests --- .../datasource-parquet/src/row_filter.rs | 32 ++++++++++++++++--- datafusion/datasource-parquet/src/source.rs | 6 ++-- datafusion/datasource/src/file_scan_config.rs | 3 +- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 61551c4a8fc9..dc1a08b3d165 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -649,10 +649,11 @@ mod test { #[test] fn nested_data_structures_prevent_pushdown() { - let table_schema = get_basic_table_schema(); + let table_schema = Arc::new(get_lists_table_schema()); - let expr = col("list_col").is_not_null(); + let expr = col("utf8_list").is_not_null(); let expr = logical2physical(&expr, &table_schema); + check_expression_can_evaluate_against_schema(&expr, &table_schema); assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } @@ -661,8 +662,8 @@ mod test { fn projected_columns_prevent_pushdown() { let table_schema = get_basic_table_schema(); - let expr = col("nonexistent_column").is_null(); - let expr = logical2physical(&expr, &table_schema); + let expr = + Arc::new(Column::new("nonexistent_column", 0)) as Arc; assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } @@ -701,4 +702,27 @@ mod test { parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None) .expect("parsing schema") } + + fn get_lists_table_schema() -> Schema { + let testdata = datafusion_common::test_util::parquet_test_data(); + let file = std::fs::File::open(format!("{testdata}/list_columns.parquet")) + .expect("opening file"); + + let reader = SerializedFileReader::new(file).expect("creating reader"); + + let metadata = reader.metadata(); + + parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None) + .expect("parsing schema") + } + + /// Sanity check that the given expression could be evaluated against the given schema without any errors. + /// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc. 
+ fn check_expression_can_evaluate_against_schema( + expr: &Arc, + table_schema: &Arc, + ) -> bool { + let batch = RecordBatch::new_empty(Arc::clone(table_schema)); + expr.evaluate(&batch).is_ok() + } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index b29df2a40324..242389136472 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -585,9 +585,9 @@ impl FileSource for ParquetSource { } } let predicate = match conf.predicate { - Some(predicate) => conjunction( - std::iter::once(predicate).chain(allowed_filters), - ), + Some(predicate) => { + conjunction(std::iter::once(predicate).chain(allowed_filters)) + } None => conjunction(allowed_filters), }; conf.predicate = Some(predicate); diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index e09c8b39be40..650e0116de17 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -92,6 +92,7 @@ use log::{debug, warn}; /// # Field::new("c4", DataType::Int32, false), /// # ])); /// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate +/// # #[derive(Clone)] /// # struct ParquetSource { /// # projected_statistics: Option /// # }; @@ -99,7 +100,7 @@ use log::{debug, warn}; /// # fn create_file_opener(&self, _: Arc, _: &FileScanConfig, _: usize) -> Arc { unimplemented!() } /// # fn as_any(&self) -> &dyn Any { self } /// # fn with_batch_size(&self, _: usize) -> Arc { unimplemented!() } -/// # fn with_schema(&self, _: SchemaRef) -> Arc { unimplemented!() } +/// # fn with_schema(&self, _: SchemaRef) -> Arc { Arc::new(self.clone()) as Arc } /// # fn with_projection(&self, _: &FileScanConfig) -> Arc { unimplemented!() } /// # fn with_statistics(&self, statistics: Statistics) -> Arc { Arc::new(Self {projected_statistics: Some(statistics)} ) } /// # fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() } From 2efd98fb50564edb600247298e0ddc0bf5d28260 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 19 Apr 2025 10:01:33 -0500 Subject: [PATCH 05/15] update --- .../sqllogictest/test_files/parquet.slt | 2 +- .../test_files/parquet_filter_pushdown.slt | 29 ++++++++++++----- .../test_files/push_down_filter.slt | 31 ++++++++++++++----- .../test_files/regexp/regexp_count.slt | 2 +- .../test_files/regexp/regexp_like.slt | 2 +- .../test_files/regexp/regexp_match.slt | 2 +- .../test_files/regexp/regexp_replace.slt | 2 +- .../test_files/repartition_scan.slt | 8 ++--- 8 files changed, 54 insertions(+), 24 deletions(-) diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 0823a9218268..0d94ec6d87df 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -625,7 +625,7 @@ physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 LIKE f% 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= g AND f <= column1_max@1, required_guarantees=[] +04)------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f% statement ok drop table foo diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 758113b70835..ed74dc94be3b 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -81,11 +81,16 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a; ---- logical_plan 01)Sort: t_pushdown.a ASC NULLS LAST -02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)] +02)--Projection: t_pushdown.a +03)----Filter: t_pushdown.b > Int32(2) +04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)] physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[] +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------FilterExec: b@1 > 2, projection=[a@0] +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2 # When filter pushdown *is* enabled, ParquetExec can filter exactly, @@ -113,7 +118,7 @@ physical_plan 03)----CoalesceBatchesExec: target_batch_size=8192 04)------FilterExec: b@1 > 2, projection=[a@0] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 -06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[] +06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2 # also test querying on columns that are not in all the files query T @@ -127,11 +132,16 @@ EXPLAIN select a from t_pushdown where b > 2 AND a IS NOT NULL order by a; ---- logical_plan 01)Sort: t_pushdown.a ASC NULLS LAST -02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2), t_pushdown.a IS NOT NULL] +02)--Projection: t_pushdown.a +03)----Filter: t_pushdown.b > Int32(2) AND t_pushdown.a IS NOT NULL +04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2), t_pushdown.a IS NOT NULL] physical_plan 
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2, required_guarantees=[] +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------FilterExec: b@1 > 2 AND a@0 IS NOT NULL, projection=[a@0] +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL query I @@ -144,11 +154,16 @@ EXPLAIN select b from t_pushdown where a = 'bar' order by b; ---- logical_plan 01)Sort: t_pushdown.b ASC NULLS LAST -02)--TableScan: t_pushdown projection=[b], full_filters=[t_pushdown.a = Utf8("bar")] +02)--Projection: t_pushdown.b +03)----Filter: t_pushdown.a = Utf8("bar") +04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.a = Utf8("bar")] physical_plan 01)SortPreservingMergeExec: [b@0 ASC NULLS LAST] 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)] +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------FilterExec: a@0 = bar, projection=[b@1] +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=a@0 = bar ## cleanup statement ok diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt index 67965146e76b..15e7f3c92cd5 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter.slt @@ -181,7 +181,8 @@ explain select * from test_filter_with_limit where value = 2 limit 1; ---- logical_plan 01)Limit: skip=0, fetch=1 -02)--TableScan: test_filter_with_limit projection=[part_key, value], full_filters=[test_filter_with_limit.value = Int32(2)], fetch=1 +02)--Filter: test_filter_with_limit.value = Int32(2) +03)----TableScan: test_filter_with_limit projection=[part_key, value], partial_filters=[test_filter_with_limit.value = Int32(2)] query II select * from test_filter_with_limit where value = 2 limit 1; @@ -218,43 +219,57 @@ LOCATION 'test_files/scratch/push_down_filter/t.parquet'; query 
TT explain select a from t where a = '100'; ---- -logical_plan TableScan: t projection=[a], full_filters=[t.a = Int32(100)] +logical_plan +01)Filter: t.a = Int32(100) +02)--TableScan: t projection=[a], partial_filters=[t.a = Int32(100)] # The predicate should not have a column cast when the value is a valid i32 query TT explain select a from t where a != '100'; ---- -logical_plan TableScan: t projection=[a], full_filters=[t.a != Int32(100)] +logical_plan +01)Filter: t.a != Int32(100) +02)--TableScan: t projection=[a], partial_filters=[t.a != Int32(100)] # The predicate should still have the column cast when the value is a NOT valid i32 query TT explain select a from t where a = '99999999999'; ---- -logical_plan TableScan: t projection=[a], full_filters=[CAST(t.a AS Utf8) = Utf8("99999999999")] +logical_plan +01)Filter: CAST(t.a AS Utf8) = Utf8("99999999999") +02)--TableScan: t projection=[a], partial_filters=[CAST(t.a AS Utf8) = Utf8("99999999999")] # The predicate should still have the column cast when the value is a NOT valid i32 query TT explain select a from t where a = '99.99'; ---- -logical_plan TableScan: t projection=[a], full_filters=[CAST(t.a AS Utf8) = Utf8("99.99")] +logical_plan +01)Filter: CAST(t.a AS Utf8) = Utf8("99.99") +02)--TableScan: t projection=[a], partial_filters=[CAST(t.a AS Utf8) = Utf8("99.99")] # The predicate should still have the column cast when the value is a NOT valid i32 query TT explain select a from t where a = ''; ---- -logical_plan TableScan: t projection=[a], full_filters=[CAST(t.a AS Utf8) = Utf8("")] +logical_plan +01)Filter: CAST(t.a AS Utf8) = Utf8("") +02)--TableScan: t projection=[a], partial_filters=[CAST(t.a AS Utf8) = Utf8("")] # The predicate should not have a column cast when the operator is = or != and the literal can be round-trip casted without losing information. query TT explain select a from t where cast(a as string) = '100'; ---- -logical_plan TableScan: t projection=[a], full_filters=[t.a = Int32(100)] +logical_plan +01)Filter: t.a = Int32(100) +02)--TableScan: t projection=[a], partial_filters=[t.a = Int32(100)] # The predicate should still have the column cast when the literal alters its string representation after round-trip casting (leading zero lost). 
query TT explain select a from t where CAST(a AS string) = '0123'; ---- -logical_plan TableScan: t projection=[a], full_filters=[CAST(t.a AS Utf8) = Utf8("0123")] +logical_plan +01)Filter: CAST(t.a AS Utf8) = Utf8("0123") +02)--TableScan: t projection=[a], partial_filters=[CAST(t.a AS Utf8) = Utf8("0123")] statement ok diff --git a/datafusion/sqllogictest/test_files/regexp/regexp_count.slt b/datafusion/sqllogictest/test_files/regexp/regexp_count.slt index f64705429bfa..d842a1ee81df 100644 --- a/datafusion/sqllogictest/test_files/regexp/regexp_count.slt +++ b/datafusion/sqllogictest/test_files/regexp/regexp_count.slt @@ -341,4 +341,4 @@ statement ok drop table t_stringview; statement ok -drop table empty_table; \ No newline at end of file +drop table empty_table; diff --git a/datafusion/sqllogictest/test_files/regexp/regexp_like.slt b/datafusion/sqllogictest/test_files/regexp/regexp_like.slt index ec48d62499c8..8c407ea2e760 100644 --- a/datafusion/sqllogictest/test_files/regexp/regexp_like.slt +++ b/datafusion/sqllogictest/test_files/regexp/regexp_like.slt @@ -277,4 +277,4 @@ statement ok drop table strings statement ok -drop table dict_table \ No newline at end of file +drop table dict_table diff --git a/datafusion/sqllogictest/test_files/regexp/regexp_match.slt b/datafusion/sqllogictest/test_files/regexp/regexp_match.slt index 4b4cf4f134d8..e79af4774aa2 100644 --- a/datafusion/sqllogictest/test_files/regexp/regexp_match.slt +++ b/datafusion/sqllogictest/test_files/regexp/regexp_match.slt @@ -198,4 +198,4 @@ NULL query B select null !~* 'abc'; ---- -NULL \ No newline at end of file +NULL diff --git a/datafusion/sqllogictest/test_files/regexp/regexp_replace.slt b/datafusion/sqllogictest/test_files/regexp/regexp_replace.slt index d54261f02b81..a16801adcef7 100644 --- a/datafusion/sqllogictest/test_files/regexp/regexp_replace.slt +++ b/datafusion/sqllogictest/test_files/regexp/regexp_replace.slt @@ -126,4 +126,4 @@ select from (values ('a'), ('b')) as tbl(col); ---- NULL NULL NULL -NULL NULL NULL \ No newline at end of file +NULL NULL NULL diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index 2b30de572c8c..d4eac3045572 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -61,7 +61,7 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 != 42 -03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42 # disable round robin repartitioning statement ok @@ -77,7 +77,7 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 != 42 -03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42 # enable round robin repartitioning again statement ok @@ -102,7 +102,7 @@ physical_plan 02)--SortExec: expr=[column1@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----CoalesceBatchesExec: target_batch_size=8192 04)------FilterExec: column1@0 != 42 -05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..272], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:272..538, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..278], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:278..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..272], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:272..538, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..278], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:278..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42 ## Read the files as though they are ordered @@ -138,7 +138,7 @@ physical_plan 
01)SortPreservingMergeExec: [column1@0 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 03)----FilterExec: column1@0 != 42 -04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..269], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..273], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:273..547], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:269..538]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..269], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..273], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:273..547], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:269..538]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], file_type=parquet, predicate=column1@0 != 42 # Cleanup statement ok From 7a45c2cc260695a64830caed8e32893e1048f97e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 19 Apr 2025 19:52:05 -0500 Subject: [PATCH 06/15] revert unnecessary slt updates --- datafusion/sqllogictest/test_files/regexp/regexp_count.slt | 2 +- datafusion/sqllogictest/test_files/regexp/regexp_like.slt | 2 +- datafusion/sqllogictest/test_files/regexp/regexp_match.slt | 2 +- datafusion/sqllogictest/test_files/regexp/regexp_replace.slt | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/sqllogictest/test_files/regexp/regexp_count.slt b/datafusion/sqllogictest/test_files/regexp/regexp_count.slt index d842a1ee81df..f64705429bfa 100644 --- a/datafusion/sqllogictest/test_files/regexp/regexp_count.slt +++ b/datafusion/sqllogictest/test_files/regexp/regexp_count.slt @@ -341,4 +341,4 @@ statement ok drop table t_stringview; statement ok -drop table empty_table; +drop table empty_table; \ No newline at end of file diff --git a/datafusion/sqllogictest/test_files/regexp/regexp_like.slt b/datafusion/sqllogictest/test_files/regexp/regexp_like.slt index 8c407ea2e760..ec48d62499c8 100644 --- a/datafusion/sqllogictest/test_files/regexp/regexp_like.slt +++ b/datafusion/sqllogictest/test_files/regexp/regexp_like.slt @@ -277,4 +277,4 @@ statement ok drop table strings statement ok -drop table dict_table +drop table dict_table \ No newline at end of file diff --git a/datafusion/sqllogictest/test_files/regexp/regexp_match.slt b/datafusion/sqllogictest/test_files/regexp/regexp_match.slt index e79af4774aa2..4b4cf4f134d8 100644 --- a/datafusion/sqllogictest/test_files/regexp/regexp_match.slt +++ b/datafusion/sqllogictest/test_files/regexp/regexp_match.slt @@ -198,4 +198,4 @@ NULL query B select null !~* 'abc'; ---- -NULL +NULL \ No newline at end of file diff --git a/datafusion/sqllogictest/test_files/regexp/regexp_replace.slt b/datafusion/sqllogictest/test_files/regexp/regexp_replace.slt index a16801adcef7..d54261f02b81 100644 ---
a/datafusion/sqllogictest/test_files/regexp/regexp_replace.slt +++ b/datafusion/sqllogictest/test_files/regexp/regexp_replace.slt @@ -126,4 +126,4 @@ select from (values ('a'), ('b')) as tbl(col); ---- NULL NULL NULL -NULL NULL NULL +NULL NULL NULL \ No newline at end of file From 8bf42e112ff2086dfe06f215a92294995dd15ef1 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 19 Apr 2025 19:53:49 -0500 Subject: [PATCH 07/15] fix lint --- datafusion/datasource-parquet/src/source.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 242389136472..6139cccee36f 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -578,10 +578,10 @@ impl FileSource for ParquetSource { for filter in &fd.filters { if can_expr_be_pushed_down_with_schemas(filter, &file_schema) { // This filter can be pushed down - allowed_filters.push(filter.clone()); + allowed_filters.push(Arc::clone(&filter)); } else { // This filter cannot be pushed down - remaining_filters.push(filter.clone()); + remaining_filters.push(Arc::clone(&filter)); } } let predicate = match conf.predicate { From 7dd399e10c1ae9907bc07a106b33b66ac90fc258 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 19 Apr 2025 20:16:46 -0500 Subject: [PATCH 08/15] re-generate; respect table option --- datafusion/datasource-parquet/src/source.rs | 4 +++- .../test_files/parquet_filter_pushdown.slt | 15 ++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 6139cccee36f..567e72081ce0 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -571,7 +571,9 @@ impl FileSource for ParquetSource { let Some(file_schema) = self.file_schema.clone() else { return Ok(filter_pushdown_not_supported(fd)); }; - if config.execution.parquet.pushdown_filters { + let config_pushdown_enabled = config.execution.parquet.pushdown_filters; + let table_pushdown_enabled = self.pushdown_filters(); + if table_pushdown_enabled || config_pushdown_enabled { let mut conf = self.clone(); let mut allowed_filters = vec![]; let mut remaining_filters = vec![]; diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index ed74dc94be3b..511c3c7a84df 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -88,9 +88,8 @@ physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----CoalesceBatchesExec: target_batch_size=8192 -04)------FilterExec: b@1 > 2, projection=[a@0] -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 -06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2 +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +05)--------DataSourceExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND b@1 > 2 # When filter pushdown *is* enabled, ParquetExec can filter exactly, @@ -139,9 +138,8 @@ physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----CoalesceBatchesExec: target_batch_size=8192 -04)------FilterExec: b@1 > 2 AND a@0 IS NOT NULL, projection=[a@0] -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 -06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL AND b@1 > 2 AND a@0 IS NOT NULL query I @@ -161,9 +159,8 @@ physical_plan 01)SortPreservingMergeExec: [b@0 ASC NULLS LAST] 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----CoalesceBatchesExec: target_batch_size=8192 -04)------FilterExec: a@0 = bar, projection=[b@1] -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 -06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=a@0 = bar +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar AND a@0 = bar ## cleanup statement ok From 32fed3423c32b4149e10d779d3e6e3a1a1320d62 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 19 Apr 2025 20:47:37 -0500 Subject: [PATCH 09/15] add order by --- datafusion/datasource-parquet/src/source.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 567e72081ce0..51b8edf161ba 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -580,10 +580,10 @@ impl FileSource for ParquetSource { for filter in &fd.filters { if can_expr_be_pushed_down_with_schemas(filter, &file_schema) { // This filter can be pushed down - allowed_filters.push(Arc::clone(&filter)); + allowed_filters.push(Arc::clone(filter)); } else { // This filter cannot be pushed down - 
remaining_filters.push(Arc::clone(&filter)); + remaining_filters.push(Arc::clone(filter)); } } let predicate = match conf.predicate { From eb1fce30b91f8714887ef5b386979aa67ef64868 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 19 Apr 2025 20:51:36 -0500 Subject: [PATCH 10/15] make function private --- datafusion/datasource-parquet/src/row_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index dc1a08b3d165..03ea3726ea4e 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -385,7 +385,7 @@ fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bo /// Recurses through expr as a tree, finds all `column`s, and checks if any of them would prevent /// this expression from being predicate pushed down. If any of them would, this returns false. /// Otherwise, true. -pub fn can_expr_be_pushed_down_with_schemas( +pub(crate) fn can_expr_be_pushed_down_with_schemas( expr: &Arc, table_schema: &Schema, ) -> bool { From 1894c1d4bff0c90d5f9d10d26dc5172398c6f85d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 19 Apr 2025 20:53:07 -0500 Subject: [PATCH 11/15] Add note about schema --- datafusion/datasource-parquet/src/row_filter.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 03ea3726ea4e..679de6796dd2 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -370,8 +370,8 @@ fn pushdown_columns( /// to check preemptively if a column name would prevent pushdowning. /// effectively does the inverse of [`pushdown_columns`] does, but with a single given column /// (instead of traversing the entire tree to determine this) -fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bool { - let mut checker = PushdownChecker::new(table_schema); +fn would_column_prevent_pushdown(column_name: &str, file_schema: &Schema) -> bool { + let mut checker = PushdownChecker::new(file_schema); // the return of this is only used for [`PushdownChecker::f_down()`], so we can safely ignore // it here. I'm just verifying we know the return type of this so nobody accidentally changes @@ -385,14 +385,16 @@ fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bo /// Recurses through expr as a tree, finds all `column`s, and checks if any of them would prevent /// this expression from being predicate pushed down. If any of them would, this returns false. /// Otherwise, true. +/// Note that the schema passed in here is *not* the physical file schema (as it is not available at that point in time); +/// it is the schema of the table that this expression is being evaluated against minus any projected columns and partition columns. 
pub(crate) fn can_expr_be_pushed_down_with_schemas( expr: &Arc, - table_schema: &Schema, + file_schema: &Schema, ) -> bool { let mut can_be_pushed = true; expr.apply(|expr| { if let Some(column) = expr.as_any().downcast_ref::() { - can_be_pushed &= !would_column_prevent_pushdown(column.name(), table_schema); + can_be_pushed &= !would_column_prevent_pushdown(column.name(), file_schema); Ok(if can_be_pushed { TreeNodeRecursion::Jump } else { From 3f22a763cc4f08d0c1e35f1b3096afca6cfd43f7 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 19 Apr 2025 20:56:02 -0500 Subject: [PATCH 12/15] remove more code --- .../datasource-parquet/src/row_filter.rs | 32 +------------------ 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 679de6796dd2..8c57139def9b 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -366,22 +366,6 @@ fn pushdown_columns( .then_some(checker.required_columns.into_iter().collect())) } -/// creates a PushdownChecker for a single use to check a given column with the given schemes. Used -/// to check preemptively if a column name would prevent pushdowning. -/// effectively does the inverse of [`pushdown_columns`] does, but with a single given column -/// (instead of traversing the entire tree to determine this) -fn would_column_prevent_pushdown(column_name: &str, file_schema: &Schema) -> bool { - let mut checker = PushdownChecker::new(file_schema); - - // the return of this is only used for [`PushdownChecker::f_down()`], so we can safely ignore - // it here. I'm just verifying we know the return type of this so nobody accidentally changes - // the return type of this fn and it gets implicitly ignored here. - let _: Option = checker.check_single_column(column_name); - - // and then return a value based on the state of the checker - checker.prevents_pushdown() -} - /// Recurses through expr as a tree, finds all `column`s, and checks if any of them would prevent /// this expression from being predicate pushed down. If any of them would, this returns false. /// Otherwise, true. 
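// A minimal sketch of the rule these hunks consolidate into `pushdown_columns`,
// written as a hypothetical standalone helper (illustrative only, not part of
// this patch; the real `PushdownChecker` applies further checks beyond column
// resolution): an expression qualifies for pushdown only if every `Column` it
// references resolves against the schema handed to the check.
fn references_only_file_columns(
    expr: &std::sync::Arc<dyn datafusion_physical_expr::PhysicalExpr>,
    file_schema: &arrow::datatypes::Schema,
) -> bool {
    use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
    use datafusion_physical_expr::expressions::Column;

    let mut ok = true;
    // Walk the expression tree; any `Column` that does not resolve in the
    // schema disqualifies the whole expression from pushdown.
    expr.apply(|e| {
        if let Some(column) = e.as_any().downcast_ref::<Column>() {
            ok &= file_schema.field_with_name(column.name()).is_ok();
        }
        Ok(TreeNodeRecursion::Continue)
    })
    .expect("the closure never returns an error");
    ok
}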
@@ -391,21 +375,7 @@ pub(crate) fn can_expr_be_pushed_down_with_schemas( expr: &Arc, file_schema: &Schema, ) -> bool { - let mut can_be_pushed = true; - expr.apply(|expr| { - if let Some(column) = expr.as_any().downcast_ref::() { - can_be_pushed &= !would_column_prevent_pushdown(column.name(), file_schema); - Ok(if can_be_pushed { - TreeNodeRecursion::Jump - } else { - TreeNodeRecursion::Stop - }) - } else { - Ok(TreeNodeRecursion::Continue) - } - }) - .unwrap(); // we never return an Err, so we can safely unwrap this - can_be_pushed + pushdown_columns(expr, file_schema).is_ok() } /// Calculate the total compressed size of all `Column`'s required for From 3fde44538fcf7fcb8ab40a672bf61020c6174d67 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 19 Apr 2025 21:31:52 -0500 Subject: [PATCH 13/15] fix test --- datafusion/datasource-parquet/src/row_filter.rs | 5 ++++- datafusion/datasource-parquet/src/source.rs | 5 +---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 8c57139def9b..e537d7e02b63 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -375,7 +375,10 @@ pub(crate) fn can_expr_be_pushed_down_with_schemas( expr: &Arc, file_schema: &Schema, ) -> bool { - pushdown_columns(expr, file_schema).is_ok() + match pushdown_columns(expr, file_schema) { + Ok(Some(_)) => true, + Ok(None) | Err(_) => false, + } } /// Calculate the total compressed size of all `Column`'s required for diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 51b8edf161ba..94fef1b1dd5f 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -91,10 +91,7 @@ use object_store::ObjectStore; /// # let file_schema = Arc::new(Schema::empty()); /// # let object_store_url = ObjectStoreUrl::local_filesystem(); /// # let predicate = lit(true); -/// let source = Arc::new( -/// ParquetSource::default() -/// .with_predicate(Arc::clone(&file_schema), predicate) -/// ); +/// let source = Arc::new(ParquetSource::default().with_predicate(predicate)); /// // Create a DataSourceExec for reading `file1.parquet` with a file size of 100MB /// let config = FileScanConfigBuilder::new(object_store_url, file_schema, source) /// .with_file(PartitionedFile::new("file1.parquet", 100*1024*1024)).build(); From f2bec87e2056faf190465b072727bcadf9024a7d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 22 Apr 2025 16:52:46 -0700 Subject: [PATCH 14/15] avoid clone --- datafusion/datasource-parquet/src/source.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 94fef1b1dd5f..7c04d9f8fb2e 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -562,7 +562,7 @@ impl FileSource for ParquetSource { fn try_pushdown_filters( &self, - fd: FilterDescription, + mut fd: FilterDescription, config: &datafusion_common::config::ConfigOptions, ) -> datafusion_common::Result>> { let Some(file_schema) = self.file_schema.clone() else { @@ -574,13 +574,13 @@ impl FileSource for ParquetSource { let mut conf = self.clone(); let mut allowed_filters = vec![]; let mut remaining_filters = vec![]; - for 
filter in &fd.filters { - if can_expr_be_pushed_down_with_schemas(filter, &file_schema) { + for filter in fd.take_description() { + if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) { // This filter can be pushed down - allowed_filters.push(Arc::clone(filter)); + allowed_filters.push(filter); } else { // This filter cannot be pushed down - remaining_filters.push(Arc::clone(filter)); + remaining_filters.push(filter); } } let predicate = match conf.predicate { From 8c1b98cf8aafad6d47142c0dd193ef001024bd28 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 22 Apr 2025 16:53:25 -0700 Subject: [PATCH 15/15] rename var --- datafusion/datasource-parquet/src/source.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 7c04d9f8fb2e..8d9eba624ad2 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -571,7 +571,7 @@ impl FileSource for ParquetSource { let config_pushdown_enabled = config.execution.parquet.pushdown_filters; let table_pushdown_enabled = self.pushdown_filters(); if table_pushdown_enabled || config_pushdown_enabled { - let mut conf = self.clone(); + let mut source = self.clone(); let mut allowed_filters = vec![]; let mut remaining_filters = vec![]; for filter in fd.take_description() { @@ -583,17 +583,17 @@ impl FileSource for ParquetSource { remaining_filters.push(filter); } } - let predicate = match conf.predicate { + let predicate = match source.predicate { Some(predicate) => { conjunction(std::iter::once(predicate).chain(allowed_filters)) } None => conjunction(allowed_filters), }; - conf.predicate = Some(predicate); + source.predicate = Some(predicate); Ok(FilterPushdownResult { support: FilterPushdownSupport::Supported { child_descriptions: vec![], - op: Arc::new(conf), + op: Arc::new(source), revisit: false, }, remaining_description: FilterDescription {