From 23fa3f69b20ce2a4b65f005a887a93eae835f190 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Wed, 25 Mar 2026 21:14:33 +0530 Subject: [PATCH] fix(datasource): keep stats absent when collect_stats is false --- datafusion/datasource/src/statistics.rs | 177 ++++++++++++++++++++++-- 1 file changed, 166 insertions(+), 11 deletions(-) diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index e5a1e4613b3d4..7872e2f9f6802 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -320,16 +320,19 @@ pub async fn get_statistics_with_limit( file.statistics = Some(Arc::clone(&file_stats)); result_files.push(file); - // First file, we set them directly from the file statistics. - num_rows = file_stats.num_rows; - total_byte_size = file_stats.total_byte_size; - for (index, file_column) in - file_stats.column_statistics.clone().into_iter().enumerate() - { - col_stats_set[index].null_count = file_column.null_count; - col_stats_set[index].max_value = file_column.max_value; - col_stats_set[index].min_value = file_column.min_value; - col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type(); + if collect_stats { + // First file, we set them directly from the file statistics. + num_rows = file_stats.num_rows; + total_byte_size = file_stats.total_byte_size; + for (index, file_column) in + file_stats.column_statistics.clone().into_iter().enumerate() + { + col_stats_set[index].null_count = file_column.null_count; + col_stats_set[index].max_value = file_column.max_value; + col_stats_set[index].min_value = file_column.min_value; + col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type(); + col_stats_set[index].byte_size = file_column.byte_size; + } } // If the number of rows exceeds the limit, we can stop processing @@ -520,6 +523,39 @@ mod tests { } } + fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])) + } + + fn make_file_stats( + num_rows: usize, + total_byte_size: usize, + col_stats: ColumnStatistics, + ) -> Arc { + Arc::new(Statistics { + num_rows: Precision::Exact(num_rows), + total_byte_size: Precision::Exact(total_byte_size), + column_statistics: vec![col_stats], + }) + } + + fn rich_col_stats( + null_count: usize, + min: i64, + max: i64, + sum: i64, + byte_size: usize, + ) -> ColumnStatistics { + ColumnStatistics { + null_count: Precision::Exact(null_count), + max_value: Precision::Exact(ScalarValue::Int64(Some(max))), + min_value: Precision::Exact(ScalarValue::Int64(Some(min))), + distinct_count: Precision::Absent, + sum_value: Precision::Exact(ScalarValue::Int64(Some(sum))), + byte_size: Precision::Exact(byte_size), + } + } + #[tokio::test] #[expect(deprecated)] async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type() @@ -533,7 +569,7 @@ mod tests { ))]); let (_group, stats) = - get_statistics_with_limit(files, schema, None, false).await?; + get_statistics_with_limit(files, schema, None, true).await?; assert_eq!( stats.column_statistics[0].sum_value, @@ -571,4 +607,123 @@ mod tests { Ok(()) } + + #[tokio::test] + #[expect(deprecated)] + async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() { + let all_files = stream::iter(vec![ + Ok(( + PartitionedFile::new("first.parquet", 10), + make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)), + )), + Ok(( + PartitionedFile::new("second.parquet", 20), + make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)), + )), + ]); + + let (_files, statistics) = + get_statistics_with_limit(all_files, test_schema(), None, false) + .await + .unwrap(); + + assert_eq!(statistics.num_rows, Precision::Absent); + assert_eq!(statistics.total_byte_size, Precision::Absent); + assert_eq!(statistics.column_statistics.len(), 1); + assert_eq!( + statistics.column_statistics[0].null_count, + Precision::Absent + ); + assert_eq!(statistics.column_statistics[0].max_value, Precision::Absent); + assert_eq!(statistics.column_statistics[0].min_value, Precision::Absent); + assert_eq!(statistics.column_statistics[0].sum_value, Precision::Absent); + assert_eq!(statistics.column_statistics[0].byte_size, Precision::Absent); + } + + #[tokio::test] + #[expect(deprecated)] + async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() { + let all_files = stream::iter(vec![ + Ok(( + PartitionedFile::new("first.parquet", 10), + make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 64)), + )), + Ok(( + PartitionedFile::new("second.parquet", 20), + make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)), + )), + ]); + + let (_files, statistics) = + get_statistics_with_limit(all_files, test_schema(), None, true) + .await + .unwrap(); + + assert_eq!(statistics.num_rows, Precision::Exact(15)); + assert_eq!(statistics.total_byte_size, Precision::Exact(150)); + assert_eq!( + statistics.column_statistics[0].null_count, + Precision::Exact(3) + ); + assert_eq!( + statistics.column_statistics[0].min_value, + Precision::Exact(ScalarValue::Int64(Some(1))) + ); + assert_eq!( + statistics.column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int64(Some(99))) + ); + assert_eq!( + statistics.column_statistics[0].sum_value, + Precision::Exact(ScalarValue::Int64(Some(315))) + ); + assert_eq!( + statistics.column_statistics[0].byte_size, + Precision::Exact(192) + ); + } + + #[tokio::test] + #[expect(deprecated)] + async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() { + let all_files = stream::iter(vec![ + Ok(( + PartitionedFile::new("first.parquet", 10), + make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)), + )), + Ok(( + PartitionedFile::new("second.parquet", 20), + make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)), + )), + Ok(( + PartitionedFile::new("third.parquet", 20), + make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)), + )), + ]); + + let (files, statistics) = + get_statistics_with_limit(all_files, test_schema(), Some(8), true) + .await + .unwrap(); + + assert_eq!(files.len(), 2); + assert_eq!(statistics.num_rows, Precision::Inexact(10)); + assert_eq!(statistics.total_byte_size, Precision::Inexact(100)); + assert_eq!( + statistics.column_statistics[0].min_value, + Precision::Inexact(ScalarValue::Int64(Some(1))) + ); + assert_eq!( + statistics.column_statistics[0].max_value, + Precision::Inexact(ScalarValue::Int64(Some(10))) + ); + assert_eq!( + statistics.column_statistics[0].sum_value, + Precision::Inexact(ScalarValue::Int64(Some(55))) + ); + assert_eq!( + statistics.column_statistics[0].byte_size, + Precision::Inexact(128) + ); + } }