From 537035b77ac039fbd6edafcd9664c02894d29194 Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Mon, 23 Mar 2026 11:53:32 +0100 Subject: [PATCH 1/6] feat: add support for parquet content defined chunking options --- Cargo.toml | 18 +++ datafusion/common/src/config.rs | 23 ++++ .../common/src/file_options/parquet_writer.rs | 77 ++++++++++++- .../datasource-parquet/src/file_format.rs | 6 +- datafusion/proto-common/Cargo.toml | 2 + .../proto/datafusion_common.proto | 14 +++ datafusion/proto-common/src/from_proto/mod.rs | 104 +++++++++++++++++- .../proto-common/src/generated/pbjson.rs | 88 +++++++++++++++ .../proto-common/src/generated/prost.rs | 30 +++++ datafusion/proto-common/src/to_proto/mod.rs | 15 ++- .../src/generated/datafusion_proto_common.rs | 28 +++++ .../proto/src/logical_plan/file_formats.rs | 6 +- .../test_files/information_schema.slt | 8 ++ 13 files changed, 411 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7844f8d41a7d6..a8aafb36995ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -282,3 +282,21 @@ incremental = false inherits = "release" debug = true strip = false + +[patch.crates-io] +arrow = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-arith = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-array = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-buffer = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-cast = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-csv = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-data = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-flight = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-ipc = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-json = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-ord = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-row = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-schema = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-select = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +arrow-string = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } +parquet = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e6d5fbfc50a21..0815686d9b0c4 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -845,6 +845,7 @@ config_namespace! { /// default parquet writer setting pub bloom_filter_ndv: Option, default = None + /// (writing) Controls whether DataFusion will attempt to speed up writing /// parquet files by serializing them in parallel. Each column /// in each row group in each output file are serialized in parallel @@ -872,6 +873,27 @@ config_namespace! { /// writing out already in-memory data, such as from a cached /// data frame. pub maximum_buffered_record_batches_per_stream: usize, default = 2 + + /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing + /// parquet files. When true, the other `cdc_*` options control the chunking + /// behavior. When CDC is enabled, parallel writing is automatically disabled + /// since the chunker state must persist across row groups. + pub enable_content_defined_chunking: bool, default = false + + /// (writing) Minimum chunk size in bytes for content-defined chunking. + /// The rolling hash will not be updated until this size is reached for each chunk. + /// Default is 256 KiB. Only used when `enable_content_defined_chunking` is true. + pub cdc_min_chunk_size: usize, default = 256 * 1024 + + /// (writing) Maximum chunk size in bytes for content-defined chunking. + /// The chunker will create a new chunk whenever the chunk size exceeds this value. + /// Default is 1 MiB. Only used when `enable_content_defined_chunking` is true. + pub cdc_max_chunk_size: usize, default = 1024 * 1024 + + /// (writing) Normalization level for content-defined chunking. + /// Increasing this improves deduplication ratio but increases fragmentation. + /// Recommended range is [-3, 3], default is 0. Only used when `enable_content_defined_chunking` is true. + pub cdc_norm_level: i64, default = 0 } } @@ -1820,6 +1842,7 @@ config_field!(usize); config_field!(f64); config_field!(u64); config_field!(u32); +config_field!(i64); impl ConfigField for u8 { fn visit(&self, v: &mut V, key: &str, description: &'static str) { diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index a7a1fc6d0bb66..d9d16892917d1 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -95,7 +95,7 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { global, column_specific_options, key_value_metadata, - crypto: _, + .. } = table_parquet_options; let mut builder = global.into_writer_properties_builder()?; @@ -191,6 +191,10 @@ impl ParquetOptions { bloom_filter_on_write, bloom_filter_fpp, bloom_filter_ndv, + enable_content_defined_chunking, + cdc_min_chunk_size, + cdc_max_chunk_size, + cdc_norm_level, // not in WriterProperties enable_page_index: _, @@ -247,6 +251,15 @@ impl ParquetOptions { if let Some(encoding) = encoding { builder = builder.set_encoding(parse_encoding_string(encoding)?); } + if *enable_content_defined_chunking { + builder = builder.set_content_defined_chunking(Some( + parquet::file::properties::CdcOptions { + min_chunk_size: *cdc_min_chunk_size, + max_chunk_size: *cdc_max_chunk_size, + norm_level: *cdc_norm_level as i32, + }, + )); + } Ok(builder) } @@ -460,6 +473,10 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, + enable_content_defined_chunking: defaults.enable_content_defined_chunking, + cdc_min_chunk_size: defaults.cdc_min_chunk_size, + cdc_max_chunk_size: defaults.cdc_max_chunk_size, + cdc_norm_level: defaults.cdc_norm_level, } } @@ -576,6 +593,21 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + enable_content_defined_chunking: props + .content_defined_chunking() + .is_some(), + cdc_min_chunk_size: props + .content_defined_chunking() + .map(|c| c.min_chunk_size) + .unwrap_or(global_options_defaults.cdc_min_chunk_size), + cdc_max_chunk_size: props + .content_defined_chunking() + .map(|c| c.max_chunk_size) + .unwrap_or(global_options_defaults.cdc_max_chunk_size), + cdc_norm_level: props + .content_defined_chunking() + .map(|c| c.norm_level as i64) + .unwrap_or(global_options_defaults.cdc_norm_level), }, column_specific_options, key_value_metadata, @@ -786,6 +818,49 @@ mod tests { ); } + #[test] + fn test_cdc_enabled_with_custom_options() { + let mut opts = TableParquetOptions::default(); + opts.global.enable_content_defined_chunking = true; + opts.global.cdc_min_chunk_size = 128 * 1024; + opts.global.cdc_max_chunk_size = 512 * 1024; + opts.global.cdc_norm_level = 2; + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + let cdc = props.content_defined_chunking().expect("CDC should be set"); + assert_eq!(cdc.min_chunk_size, 128 * 1024); + assert_eq!(cdc.max_chunk_size, 512 * 1024); + assert_eq!(cdc.norm_level, 2); + } + + #[test] + fn test_cdc_disabled_by_default() { + let mut opts = TableParquetOptions::default(); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + assert!(props.content_defined_chunking().is_none()); + } + + #[test] + fn test_cdc_round_trip_through_writer_props() { + let mut opts = TableParquetOptions::default(); + opts.global.enable_content_defined_chunking = true; + opts.global.cdc_min_chunk_size = 64 * 1024; + opts.global.cdc_max_chunk_size = 2 * 1024 * 1024; + opts.global.cdc_norm_level = -1; + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + let recovered = session_config_from_writer_props(&props); + + assert_eq!(recovered.global.enable_content_defined_chunking, true); + assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024); + assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024); + assert_eq!(recovered.global.cdc_norm_level, -1); + } + #[test] fn test_bloom_filter_set_ndv_only() { // the TableParquetOptions::default, with only ndv set diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index c391b299d4a29..42dc01eb9f017 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1368,7 +1368,11 @@ impl FileSink for ParquetSink { while let Some((path, mut rx)) = file_stream_rx.recv().await { let parquet_props = self.create_writer_props(&runtime, &path).await?; - if !parquet_opts.global.allow_single_file_parallelism { + // CDC requires the sequential writer: the chunker state lives in ArrowWriter + // and persists across row groups. The parallel path bypasses ArrowWriter entirely. + if !parquet_opts.global.allow_single_file_parallelism + || parquet_opts.global.enable_content_defined_chunking + { let mut writer = self .create_async_arrow_writer( &path, diff --git a/datafusion/proto-common/Cargo.toml b/datafusion/proto-common/Cargo.toml index 46dae36ba40ed..9de3494ec1e21 100644 --- a/datafusion/proto-common/Cargo.toml +++ b/datafusion/proto-common/Cargo.toml @@ -37,10 +37,12 @@ name = "datafusion_proto_common" [features] default = [] json = ["serde", "pbjson"] +parquet = ["datafusion-common/parquet", "dep:parquet"] [dependencies] arrow = { workspace = true } datafusion-common = { workspace = true } +parquet = { workspace = true, optional = true } pbjson = { workspace = true, optional = true } prost = { workspace = true } serde = { version = "1.0", optional = true } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 62c6bbe85612a..388bac272536f 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -603,6 +603,20 @@ message ParquetOptions { oneof max_predicate_cache_size_opt { uint64 max_predicate_cache_size = 33; } + + bool content_defined_chunking = 35; // default = false + + oneof cdc_min_chunk_size_opt { + uint64 cdc_min_chunk_size = 36; + } + + oneof cdc_max_chunk_size_opt { + uint64 cdc_max_chunk_size = 37; + } + + oneof cdc_norm_level_opt { + int32 cdc_norm_level = 38; + } } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index ca8a269958d73..7459dfc831603 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1090,6 +1090,16 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + enable_content_defined_chunking: value.content_defined_chunking, + cdc_min_chunk_size: value.cdc_min_chunk_size_opt.map(|opt| match opt { + protobuf::parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => v as usize, + }).unwrap_or(ParquetOptions::default().cdc_min_chunk_size), + cdc_max_chunk_size: value.cdc_max_chunk_size_opt.map(|opt| match opt { + protobuf::parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => v as usize, + }).unwrap_or(ParquetOptions::default().cdc_max_chunk_size), + cdc_norm_level: value.cdc_norm_level_opt.map(|opt| match opt { + protobuf::parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => v as i64, + }).unwrap_or(ParquetOptions::default().cdc_norm_level), }) } } @@ -1152,7 +1162,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { column_specific_options.insert(column_name.clone(), options.try_into()?); } } - Ok(TableParquetOptions { + let opts = TableParquetOptions { global: value .global .as_ref() @@ -1160,9 +1170,9 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { .unwrap() .unwrap(), column_specific_options, - key_value_metadata: Default::default(), - crypto: Default::default(), - }) + ..Default::default() + }; + Ok(opts) } } @@ -1262,3 +1272,89 @@ pub(crate) fn csv_writer_options_from_proto( .with_null(writer_options.null_value.clone()) .with_double_quote(writer_options.double_quote)) } + +#[cfg(test)] +mod tests { + use super::*; + use datafusion_common::config::{ParquetOptions, TableParquetOptions}; + + fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions { + let proto: crate::protobuf_common::ParquetOptions = + (&opts).try_into().expect("to_proto"); + ParquetOptions::try_from(&proto).expect("from_proto") + } + + fn table_parquet_options_proto_round_trip( + opts: TableParquetOptions, + ) -> TableParquetOptions { + let proto: crate::protobuf_common::TableParquetOptions = + (&opts).try_into().expect("to_proto"); + TableParquetOptions::try_from(&proto).expect("from_proto") + } + + #[test] + fn test_parquet_options_cdc_disabled_round_trip() { + let opts = ParquetOptions::default(); + assert!(!opts.enable_content_defined_chunking); + let recovered = parquet_options_proto_round_trip(opts.clone()); + assert_eq!(opts, recovered); + } + + #[test] + fn test_parquet_options_cdc_enabled_round_trip() { + let opts = ParquetOptions { + enable_content_defined_chunking: true, + cdc_min_chunk_size: 128 * 1024, + cdc_max_chunk_size: 512 * 1024, + cdc_norm_level: 2, + ..ParquetOptions::default() + }; + let recovered = parquet_options_proto_round_trip(opts.clone()); + assert_eq!(recovered.enable_content_defined_chunking, true); + assert_eq!(recovered.cdc_min_chunk_size, 128 * 1024); + assert_eq!(recovered.cdc_max_chunk_size, 512 * 1024); + assert_eq!(recovered.cdc_norm_level, 2); + } + + #[test] + fn test_parquet_options_cdc_negative_norm_level_round_trip() { + let opts = ParquetOptions { + enable_content_defined_chunking: true, + cdc_norm_level: -3, + ..ParquetOptions::default() + }; + let recovered = parquet_options_proto_round_trip(opts); + assert_eq!(recovered.cdc_norm_level, -3); + } + + #[test] + fn test_table_parquet_options_cdc_round_trip() { + let mut opts = TableParquetOptions::default(); + opts.global.enable_content_defined_chunking = true; + opts.global.cdc_min_chunk_size = 64 * 1024; + opts.global.cdc_max_chunk_size = 2 * 1024 * 1024; + opts.global.cdc_norm_level = -1; + + let recovered = table_parquet_options_proto_round_trip(opts.clone()); + assert_eq!(recovered.global.enable_content_defined_chunking, true); + assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024); + assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024); + assert_eq!(recovered.global.cdc_norm_level, -1); + } + + #[test] + fn test_table_parquet_options_cdc_disabled_round_trip() { + let opts = TableParquetOptions::default(); + assert!(!opts.global.enable_content_defined_chunking); + let recovered = table_parquet_options_proto_round_trip(opts.clone()); + assert_eq!(recovered.global.enable_content_defined_chunking, false); + assert_eq!( + recovered.global.cdc_min_chunk_size, + ParquetOptions::default().cdc_min_chunk_size + ); + assert_eq!( + recovered.global.cdc_max_chunk_size, + ParquetOptions::default().cdc_max_chunk_size + ); + } +} diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index b00e7546bba20..c639baf94cb5c 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5728,6 +5728,18 @@ impl serde::Serialize for ParquetOptions { if self.max_predicate_cache_size_opt.is_some() { len += 1; } + if self.content_defined_chunking { + len += 1; + } + if self.cdc_min_chunk_size_opt.is_some() { + len += 1; + } + if self.cdc_max_chunk_size_opt.is_some() { + len += 1; + } + if self.cdc_norm_level_opt.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetOptions", len)?; if self.enable_page_index { struct_ser.serialize_field("enablePageIndex", &self.enable_page_index)?; @@ -5893,6 +5905,34 @@ impl serde::Serialize for ParquetOptions { } } } + if self.content_defined_chunking { + struct_ser.serialize_field("contentDefinedChunking", &self.content_defined_chunking)?; + } + if let Some(v) = self.cdc_min_chunk_size_opt.as_ref() { + match v { + parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("cdcMinChunkSize", ToString::to_string(&v).as_str())?; + } + } + } + if let Some(v) = self.cdc_max_chunk_size_opt.as_ref() { + match v { + parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("cdcMaxChunkSize", ToString::to_string(&v).as_str())?; + } + } + } + if let Some(v) = self.cdc_norm_level_opt.as_ref() { + match v { + parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => { + struct_ser.serialize_field("cdcNormLevel", v)?; + } + } + } struct_ser.end() } } @@ -5964,6 +6004,14 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "coerceInt96", "max_predicate_cache_size", "maxPredicateCacheSize", + "content_defined_chunking", + "contentDefinedChunking", + "cdc_min_chunk_size", + "cdcMinChunkSize", + "cdc_max_chunk_size", + "cdcMaxChunkSize", + "cdc_norm_level", + "cdcNormLevel", ]; #[allow(clippy::enum_variant_names)] @@ -6000,6 +6048,10 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterNdv, CoerceInt96, MaxPredicateCacheSize, + ContentDefinedChunking, + CdcMinChunkSize, + CdcMaxChunkSize, + CdcNormLevel, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6053,6 +6105,10 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96), "maxPredicateCacheSize" | "max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize), + "contentDefinedChunking" | "content_defined_chunking" => Ok(GeneratedField::ContentDefinedChunking), + "cdcMinChunkSize" | "cdc_min_chunk_size" => Ok(GeneratedField::CdcMinChunkSize), + "cdcMaxChunkSize" | "cdc_max_chunk_size" => Ok(GeneratedField::CdcMaxChunkSize), + "cdcNormLevel" | "cdc_norm_level" => Ok(GeneratedField::CdcNormLevel), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6104,6 +6160,10 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_ndv_opt__ = None; let mut coerce_int96_opt__ = None; let mut max_predicate_cache_size_opt__ = None; + let mut content_defined_chunking__ = None; + let mut cdc_min_chunk_size_opt__ = None; + let mut cdc_max_chunk_size_opt__ = None; + let mut cdc_norm_level_opt__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::EnablePageIndex => { @@ -6312,6 +6372,30 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } max_predicate_cache_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0)); } + GeneratedField::ContentDefinedChunking => { + if content_defined_chunking__.is_some() { + return Err(serde::de::Error::duplicate_field("contentDefinedChunking")); + } + content_defined_chunking__ = Some(map_.next_value()?); + } + GeneratedField::CdcMinChunkSize => { + if cdc_min_chunk_size_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("cdcMinChunkSize")); + } + cdc_min_chunk_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(x.0)); + } + GeneratedField::CdcMaxChunkSize => { + if cdc_max_chunk_size_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("cdcMaxChunkSize")); + } + cdc_max_chunk_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(x.0)); + } + GeneratedField::CdcNormLevel => { + if cdc_norm_level_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("cdcNormLevel")); + } + cdc_norm_level_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::CdcNormLevelOpt::CdcNormLevel(x.0)); + } } } Ok(ParquetOptions { @@ -6347,6 +6431,10 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_ndv_opt: bloom_filter_ndv_opt__, coerce_int96_opt: coerce_int96_opt__, max_predicate_cache_size_opt: max_predicate_cache_size_opt__, + content_defined_chunking: content_defined_chunking__.unwrap_or_default(), + cdc_min_chunk_size_opt: cdc_min_chunk_size_opt__, + cdc_max_chunk_size_opt: cdc_max_chunk_size_opt__, + cdc_norm_level_opt: cdc_norm_level_opt__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index a09826a29be52..1ba7fe702665a 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -872,6 +872,21 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + /// default = false + #[prost(bool, tag = "35")] + pub content_defined_chunking: bool, + #[prost(oneof = "parquet_options::CdcMinChunkSizeOpt", tags = "36")] + pub cdc_min_chunk_size_opt: ::core::option::Option< + parquet_options::CdcMinChunkSizeOpt, + >, + #[prost(oneof = "parquet_options::CdcMaxChunkSizeOpt", tags = "37")] + pub cdc_max_chunk_size_opt: ::core::option::Option< + parquet_options::CdcMaxChunkSizeOpt, + >, + #[prost(oneof = "parquet_options::CdcNormLevelOpt", tags = "38")] + pub cdc_norm_level_opt: ::core::option::Option< + parquet_options::CdcNormLevelOpt, + >, } /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { @@ -930,6 +945,21 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CdcMinChunkSizeOpt { + #[prost(uint64, tag = "36")] + CdcMinChunkSize(u64), + } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CdcMaxChunkSizeOpt { + #[prost(uint64, tag = "37")] + CdcMaxChunkSize(u64), + } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CdcNormLevelOpt { + #[prost(int32, tag = "38")] + CdcNormLevel(i32), + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 79e3306a4df1b..87c39e205ce65 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -904,6 +904,16 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + content_defined_chunking: value.enable_content_defined_chunking, + cdc_min_chunk_size_opt: value.enable_content_defined_chunking.then(|| + protobuf::parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(value.cdc_min_chunk_size as u64) + ), + cdc_max_chunk_size_opt: value.enable_content_defined_chunking.then(|| + protobuf::parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(value.cdc_max_chunk_size as u64) + ), + cdc_norm_level_opt: value.enable_content_defined_chunking.then(|| + protobuf::parquet_options::CdcNormLevelOpt::CdcNormLevel(value.cdc_norm_level as i32) + ), }) } } @@ -963,8 +973,11 @@ impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions { .iter() .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone()))) .collect::>(); + + let global: protobuf::ParquetOptions = (&value.global).try_into()?; + Ok(protobuf::TableParquetOptions { - global: Some((&value.global).try_into()?), + global: Some(global), column_specific_options, key_value_metadata, }) diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index a09826a29be52..8c9af5ec75584 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -838,6 +838,9 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, + /// default = false + #[prost(bool, tag = "35")] + pub content_defined_chunking: bool, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, @@ -872,6 +875,16 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + #[prost(oneof = "parquet_options::CdcMinChunkSizeOpt", tags = "36")] + pub cdc_min_chunk_size_opt: ::core::option::Option< + parquet_options::CdcMinChunkSizeOpt, + >, + #[prost(oneof = "parquet_options::CdcMaxChunkSizeOpt", tags = "37")] + pub cdc_max_chunk_size_opt: ::core::option::Option< + parquet_options::CdcMaxChunkSizeOpt, + >, + #[prost(oneof = "parquet_options::CdcNormLevelOpt", tags = "38")] + pub cdc_norm_level_opt: ::core::option::Option, } /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { @@ -930,6 +943,21 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CdcMinChunkSizeOpt { + #[prost(uint64, tag = "36")] + CdcMinChunkSize(u64), + } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CdcMaxChunkSizeOpt { + #[prost(uint64, tag = "37")] + CdcMaxChunkSize(u64), + } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CdcNormLevelOpt { + #[prost(int32, tag = "38")] + CdcNormLevel(i32), + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..c6b638f875732 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -525,6 +525,10 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), + enable_content_defined_chunking: Default::default(), + cdc_min_chunk_size: Default::default(), + cdc_max_chunk_size: Default::default(), + cdc_norm_level: Default::default(), } } } @@ -585,7 +589,7 @@ mod parquet { .iter() .map(|(k, v)| (k.clone(), Some(v.clone()))) .collect(), - crypto: Default::default(), + ..Default::default() } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 2da823421dd76..e36f049da3ba0 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -235,6 +235,9 @@ datafusion.execution.parquet.bloom_filter_fpp NULL datafusion.execution.parquet.bloom_filter_ndv NULL datafusion.execution.parquet.bloom_filter_on_read true datafusion.execution.parquet.bloom_filter_on_write false +datafusion.execution.parquet.cdc_max_chunk_size 1048576 +datafusion.execution.parquet.cdc_min_chunk_size 262144 +datafusion.execution.parquet.cdc_norm_level 0 datafusion.execution.parquet.coerce_int96 NULL datafusion.execution.parquet.column_index_truncate_length 64 datafusion.execution.parquet.compression zstd(3) @@ -243,6 +246,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 datafusion.execution.parquet.data_pagesize_limit 1048576 datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 +datafusion.execution.parquet.enable_content_defined_chunking false datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL datafusion.execution.parquet.force_filter_selections false @@ -376,6 +380,9 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting datafusion.execution.parquet.bloom_filter_on_read true (reading) Use any available bloom filters when reading parquet files datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files +datafusion.execution.parquet.cdc_max_chunk_size 1048576 (writing) Maximum chunk size in bytes for content-defined chunking. The chunker will create a new chunk whenever the chunk size exceeds this value. Default is 1 MiB. Only used when `enable_content_defined_chunking` is true. +datafusion.execution.parquet.cdc_min_chunk_size 262144 (writing) Minimum chunk size in bytes for content-defined chunking. The rolling hash will not be updated until this size is reached for each chunk. Default is 256 KiB. Only used when `enable_content_defined_chunking` is true. +datafusion.execution.parquet.cdc_norm_level 0 (writing) Normalization level for content-defined chunking. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0. Only used when `enable_content_defined_chunking` is true. datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. @@ -384,6 +391,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 (writing) Sets best datafusion.execution.parquet.data_pagesize_limit 1048576 (writing) Sets best effort maximum size of data page in bytes datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes +datafusion.execution.parquet.enable_content_defined_chunking false (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When true, the other `cdc_*` options control the chunking behavior. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. From 00beb61efc3a6edaeb817a1d52739dbea8f90185 Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Mon, 23 Mar 2026 12:36:52 +0100 Subject: [PATCH 2/6] fix: add CDC fields to datafusion-proto file_formats serialization --- .../proto/src/logical_plan/file_formats.rs | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index c6b638f875732..4dd6b7b4dd051 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -426,6 +426,16 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), + content_defined_chunking: global_options.global.enable_content_defined_chunking, + cdc_min_chunk_size_opt: global_options.global.enable_content_defined_chunking.then(|| { + parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(global_options.global.cdc_min_chunk_size as u64) + }), + cdc_max_chunk_size_opt: global_options.global.enable_content_defined_chunking.then(|| { + parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(global_options.global.cdc_max_chunk_size as u64) + }), + cdc_norm_level_opt: global_options.global.enable_content_defined_chunking.then(|| { + parquet_options::CdcNormLevelOpt::CdcNormLevel(global_options.global.cdc_norm_level as i32) + }), }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -525,10 +535,16 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), - enable_content_defined_chunking: Default::default(), - cdc_min_chunk_size: Default::default(), - cdc_max_chunk_size: Default::default(), - cdc_norm_level: Default::default(), + enable_content_defined_chunking: proto.content_defined_chunking, + cdc_min_chunk_size: proto.cdc_min_chunk_size_opt.as_ref().map(|opt| match opt { + parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => *v as usize, + }).unwrap_or(ParquetOptions::default().cdc_min_chunk_size), + cdc_max_chunk_size: proto.cdc_max_chunk_size_opt.as_ref().map(|opt| match opt { + parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => *v as usize, + }).unwrap_or(ParquetOptions::default().cdc_max_chunk_size), + cdc_norm_level: proto.cdc_norm_level_opt.as_ref().map(|opt| match opt { + parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => *v as i64, + }).unwrap_or(ParquetOptions::default().cdc_norm_level), } } } From 62f1ae8eb0c53c7b34f55d93663e7459b4548b78 Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Wed, 25 Mar 2026 14:57:06 +0100 Subject: [PATCH 3/6] chore: use the released arrow-rs 58.1 dependency --- Cargo.toml | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a8aafb36995ef..7844f8d41a7d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -282,21 +282,3 @@ incremental = false inherits = "release" debug = true strip = false - -[patch.crates-io] -arrow = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-arith = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-array = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-buffer = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-cast = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-csv = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-data = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-flight = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-ipc = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-json = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-ord = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-row = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-schema = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-select = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -arrow-string = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } -parquet = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" } From 248e9cd8a755a0eda3738531715004ca999ae403 Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Wed, 25 Mar 2026 17:26:27 +0100 Subject: [PATCH 4/6] chore: update parquet CDC options and proto serialization for arrow-rs 58.1 --- Cargo.lock | 1 + datafusion/common/src/config.rs | 41 +-- .../common/src/file_options/parquet_writer.rs | 70 +++-- .../datasource-parquet/src/file_format.rs | 2 +- .../proto/datafusion_common.proto | 18 +- datafusion/proto-common/src/from_proto/mod.rs | 81 +++--- .../proto-common/src/generated/pbjson.rs | 244 +++++++++++------- .../proto-common/src/generated/prost.rs | 41 +-- datafusion/proto-common/src/to_proto/mod.rs | 15 +- .../src/generated/datafusion_proto_common.rs | 39 +-- .../proto/src/logical_plan/file_formats.rs | 39 ++- .../test_files/information_schema.slt | 10 +- 12 files changed, 302 insertions(+), 299 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1a311bf0941fc..54099fbe172fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2532,6 +2532,7 @@ dependencies = [ "arrow", "datafusion-common", "doc-comment", + "parquet", "pbjson 0.9.0", "prost", "serde", diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0815686d9b0c4..7dd433cdc7001 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -687,6 +687,24 @@ config_namespace! { } } +config_namespace! { + /// Options for content-defined chunking (CDC) when writing parquet files. + /// See [`ParquetOptions::use_content_defined_chunking`]. + pub struct CdcOptions { + /// Minimum chunk size in bytes. The rolling hash will not trigger a split + /// until this many bytes have been accumulated. Default is 256 KiB. + pub min_chunk_size: usize, default = 256 * 1024 + + /// Maximum chunk size in bytes. A split is forced when the accumulated + /// size exceeds this value. Default is 1 MiB. + pub max_chunk_size: usize, default = 1024 * 1024 + + /// Normalization level. Increasing this improves deduplication ratio + /// but increases fragmentation. Recommended range is [-3, 3], default is 0. + pub norm_level: i64, default = 0 + } +} + config_namespace! { /// Options for reading and writing parquet files /// @@ -875,25 +893,10 @@ config_namespace! { pub maximum_buffered_record_batches_per_stream: usize, default = 2 /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing - /// parquet files. When true, the other `cdc_*` options control the chunking - /// behavior. When CDC is enabled, parallel writing is automatically disabled - /// since the chunker state must persist across row groups. - pub enable_content_defined_chunking: bool, default = false - - /// (writing) Minimum chunk size in bytes for content-defined chunking. - /// The rolling hash will not be updated until this size is reached for each chunk. - /// Default is 256 KiB. Only used when `enable_content_defined_chunking` is true. - pub cdc_min_chunk_size: usize, default = 256 * 1024 - - /// (writing) Maximum chunk size in bytes for content-defined chunking. - /// The chunker will create a new chunk whenever the chunk size exceeds this value. - /// Default is 1 MiB. Only used when `enable_content_defined_chunking` is true. - pub cdc_max_chunk_size: usize, default = 1024 * 1024 - - /// (writing) Normalization level for content-defined chunking. - /// Increasing this improves deduplication ratio but increases fragmentation. - /// Recommended range is [-3, 3], default is 0. Only used when `enable_content_defined_chunking` is true. - pub cdc_norm_level: i64, default = 0 + /// parquet files. When `Some`, CDC is enabled with the given options; when `None` + /// (the default), CDC is disabled. When CDC is enabled, parallel writing is + /// automatically disabled since the chunker state must persist across row groups. + pub use_content_defined_chunking: Option, default = None } } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index d9d16892917d1..103719b21315d 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -191,10 +191,7 @@ impl ParquetOptions { bloom_filter_on_write, bloom_filter_fpp, bloom_filter_ndv, - enable_content_defined_chunking, - cdc_min_chunk_size, - cdc_max_chunk_size, - cdc_norm_level, + use_content_defined_chunking, // not in WriterProperties enable_page_index: _, @@ -251,12 +248,12 @@ impl ParquetOptions { if let Some(encoding) = encoding { builder = builder.set_encoding(parse_encoding_string(encoding)?); } - if *enable_content_defined_chunking { + if let Some(cdc) = use_content_defined_chunking { builder = builder.set_content_defined_chunking(Some( parquet::file::properties::CdcOptions { - min_chunk_size: *cdc_min_chunk_size, - max_chunk_size: *cdc_max_chunk_size, - norm_level: *cdc_norm_level as i32, + min_chunk_size: cdc.min_chunk_size, + max_chunk_size: cdc.max_chunk_size, + norm_level: cdc.norm_level as i32, }, )); } @@ -401,7 +398,9 @@ mod tests { use super::*; #[cfg(feature = "parquet_encryption")] use crate::config::ConfigFileEncryptionProperties; - use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions}; + use crate::config::{ + CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions, + }; use crate::parquet_config::DFParquetWriterVersion; use parquet::basic::Compression; use parquet::file::properties::{ @@ -473,10 +472,7 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, - enable_content_defined_chunking: defaults.enable_content_defined_chunking, - cdc_min_chunk_size: defaults.cdc_min_chunk_size, - cdc_max_chunk_size: defaults.cdc_max_chunk_size, - cdc_norm_level: defaults.cdc_norm_level, + use_content_defined_chunking: defaults.use_content_defined_chunking.clone(), } } @@ -593,21 +589,13 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, - enable_content_defined_chunking: props - .content_defined_chunking() - .is_some(), - cdc_min_chunk_size: props - .content_defined_chunking() - .map(|c| c.min_chunk_size) - .unwrap_or(global_options_defaults.cdc_min_chunk_size), - cdc_max_chunk_size: props - .content_defined_chunking() - .map(|c| c.max_chunk_size) - .unwrap_or(global_options_defaults.cdc_max_chunk_size), - cdc_norm_level: props - .content_defined_chunking() - .map(|c| c.norm_level as i64) - .unwrap_or(global_options_defaults.cdc_norm_level), + use_content_defined_chunking: props.content_defined_chunking().map(|c| { + CdcOptions { + min_chunk_size: c.min_chunk_size, + max_chunk_size: c.max_chunk_size, + norm_level: c.norm_level as i64, + } + }), }, column_specific_options, key_value_metadata, @@ -821,10 +809,11 @@ mod tests { #[test] fn test_cdc_enabled_with_custom_options() { let mut opts = TableParquetOptions::default(); - opts.global.enable_content_defined_chunking = true; - opts.global.cdc_min_chunk_size = 128 * 1024; - opts.global.cdc_max_chunk_size = 512 * 1024; - opts.global.cdc_norm_level = 2; + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 128 * 1024, + max_chunk_size: 512 * 1024, + norm_level: 2, + }); opts.arrow_schema(&Arc::new(Schema::empty())); let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); @@ -846,19 +835,20 @@ mod tests { #[test] fn test_cdc_round_trip_through_writer_props() { let mut opts = TableParquetOptions::default(); - opts.global.enable_content_defined_chunking = true; - opts.global.cdc_min_chunk_size = 64 * 1024; - opts.global.cdc_max_chunk_size = 2 * 1024 * 1024; - opts.global.cdc_norm_level = -1; + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 64 * 1024, + max_chunk_size: 2 * 1024 * 1024, + norm_level: -1, + }); opts.arrow_schema(&Arc::new(Schema::empty())); let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); let recovered = session_config_from_writer_props(&props); - assert_eq!(recovered.global.enable_content_defined_chunking, true); - assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024); - assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024); - assert_eq!(recovered.global.cdc_norm_level, -1); + let cdc = recovered.global.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 64 * 1024); + assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024); + assert_eq!(cdc.norm_level, -1); } #[test] diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 42dc01eb9f017..83b719d576b02 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1371,7 +1371,7 @@ impl FileSink for ParquetSink { // CDC requires the sequential writer: the chunker state lives in ArrowWriter // and persists across row groups. The parallel path bypasses ArrowWriter entirely. if !parquet_opts.global.allow_single_file_parallelism - || parquet_opts.global.enable_content_defined_chunking + || parquet_opts.global.use_content_defined_chunking.is_some() { let mut writer = self .create_async_arrow_writer( diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 388bac272536f..31ece63577b4f 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -604,19 +604,13 @@ message ParquetOptions { uint64 max_predicate_cache_size = 33; } - bool content_defined_chunking = 35; // default = false - - oneof cdc_min_chunk_size_opt { - uint64 cdc_min_chunk_size = 36; - } - - oneof cdc_max_chunk_size_opt { - uint64 cdc_max_chunk_size = 37; - } + CdcOptions content_defined_chunking = 35; +} - oneof cdc_norm_level_opt { - int32 cdc_norm_level = 38; - } +message CdcOptions { + uint64 min_chunk_size = 1; + uint64 max_chunk_size = 2; + int32 norm_level = 3; } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 7459dfc831603..493bd359a62b5 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -40,7 +40,7 @@ use datafusion_common::{ DataFusionError, JoinSide, ScalarValue, Statistics, TableReference, arrow_datafusion_err, config::{ - CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions, + CdcOptions, CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions, }, file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions}, @@ -1090,16 +1090,14 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), - enable_content_defined_chunking: value.content_defined_chunking, - cdc_min_chunk_size: value.cdc_min_chunk_size_opt.map(|opt| match opt { - protobuf::parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => v as usize, - }).unwrap_or(ParquetOptions::default().cdc_min_chunk_size), - cdc_max_chunk_size: value.cdc_max_chunk_size_opt.map(|opt| match opt { - protobuf::parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => v as usize, - }).unwrap_or(ParquetOptions::default().cdc_max_chunk_size), - cdc_norm_level: value.cdc_norm_level_opt.map(|opt| match opt { - protobuf::parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => v as i64, - }).unwrap_or(ParquetOptions::default().cdc_norm_level), + use_content_defined_chunking: value.content_defined_chunking.map(|cdc| { + let defaults = CdcOptions::default(); + CdcOptions { + min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, + max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, + norm_level: cdc.norm_level as i64, + } + }), }) } } @@ -1276,7 +1274,7 @@ pub(crate) fn csv_writer_options_from_proto( #[cfg(test)] mod tests { use super::*; - use datafusion_common::config::{ParquetOptions, TableParquetOptions}; + use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions}; fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions { let proto: crate::protobuf_common::ParquetOptions = @@ -1295,7 +1293,7 @@ mod tests { #[test] fn test_parquet_options_cdc_disabled_round_trip() { let opts = ParquetOptions::default(); - assert!(!opts.enable_content_defined_chunking); + assert!(opts.use_content_defined_chunking.is_none()); let recovered = parquet_options_proto_round_trip(opts.clone()); assert_eq!(opts, recovered); } @@ -1303,58 +1301,57 @@ mod tests { #[test] fn test_parquet_options_cdc_enabled_round_trip() { let opts = ParquetOptions { - enable_content_defined_chunking: true, - cdc_min_chunk_size: 128 * 1024, - cdc_max_chunk_size: 512 * 1024, - cdc_norm_level: 2, + use_content_defined_chunking: Some(CdcOptions { + min_chunk_size: 128 * 1024, + max_chunk_size: 512 * 1024, + norm_level: 2, + }), ..ParquetOptions::default() }; let recovered = parquet_options_proto_round_trip(opts.clone()); - assert_eq!(recovered.enable_content_defined_chunking, true); - assert_eq!(recovered.cdc_min_chunk_size, 128 * 1024); - assert_eq!(recovered.cdc_max_chunk_size, 512 * 1024); - assert_eq!(recovered.cdc_norm_level, 2); + let cdc = recovered.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 128 * 1024); + assert_eq!(cdc.max_chunk_size, 512 * 1024); + assert_eq!(cdc.norm_level, 2); } #[test] fn test_parquet_options_cdc_negative_norm_level_round_trip() { let opts = ParquetOptions { - enable_content_defined_chunking: true, - cdc_norm_level: -3, + use_content_defined_chunking: Some(CdcOptions { + norm_level: -3, + ..CdcOptions::default() + }), ..ParquetOptions::default() }; let recovered = parquet_options_proto_round_trip(opts); - assert_eq!(recovered.cdc_norm_level, -3); + assert_eq!( + recovered.use_content_defined_chunking.unwrap().norm_level, + -3 + ); } #[test] fn test_table_parquet_options_cdc_round_trip() { let mut opts = TableParquetOptions::default(); - opts.global.enable_content_defined_chunking = true; - opts.global.cdc_min_chunk_size = 64 * 1024; - opts.global.cdc_max_chunk_size = 2 * 1024 * 1024; - opts.global.cdc_norm_level = -1; + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 64 * 1024, + max_chunk_size: 2 * 1024 * 1024, + norm_level: -1, + }); let recovered = table_parquet_options_proto_round_trip(opts.clone()); - assert_eq!(recovered.global.enable_content_defined_chunking, true); - assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024); - assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024); - assert_eq!(recovered.global.cdc_norm_level, -1); + let cdc = recovered.global.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 64 * 1024); + assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024); + assert_eq!(cdc.norm_level, -1); } #[test] fn test_table_parquet_options_cdc_disabled_round_trip() { let opts = TableParquetOptions::default(); - assert!(!opts.global.enable_content_defined_chunking); + assert!(opts.global.use_content_defined_chunking.is_none()); let recovered = table_parquet_options_proto_round_trip(opts.clone()); - assert_eq!(recovered.global.enable_content_defined_chunking, false); - assert_eq!( - recovered.global.cdc_min_chunk_size, - ParquetOptions::default().cdc_min_chunk_size - ); - assert_eq!( - recovered.global.cdc_max_chunk_size, - ParquetOptions::default().cdc_max_chunk_size - ); + assert!(recovered.global.use_content_defined_chunking.is_none()); } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index c639baf94cb5c..77a3b71488ece 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -883,6 +883,144 @@ impl<'de> serde::Deserialize<'de> for AvroOptions { deserializer.deserialize_struct("datafusion_common.AvroOptions", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for CdcOptions { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.min_chunk_size != 0 { + len += 1; + } + if self.max_chunk_size != 0 { + len += 1; + } + if self.norm_level != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion_common.CdcOptions", len)?; + if self.min_chunk_size != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("minChunkSize", ToString::to_string(&self.min_chunk_size).as_str())?; + } + if self.max_chunk_size != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("maxChunkSize", ToString::to_string(&self.max_chunk_size).as_str())?; + } + if self.norm_level != 0 { + struct_ser.serialize_field("normLevel", &self.norm_level)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for CdcOptions { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "min_chunk_size", + "minChunkSize", + "max_chunk_size", + "maxChunkSize", + "norm_level", + "normLevel", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + MinChunkSize, + MaxChunkSize, + NormLevel, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "minChunkSize" | "min_chunk_size" => Ok(GeneratedField::MinChunkSize), + "maxChunkSize" | "max_chunk_size" => Ok(GeneratedField::MaxChunkSize), + "normLevel" | "norm_level" => Ok(GeneratedField::NormLevel), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = CdcOptions; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion_common.CdcOptions") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut min_chunk_size__ = None; + let mut max_chunk_size__ = None; + let mut norm_level__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::MinChunkSize => { + if min_chunk_size__.is_some() { + return Err(serde::de::Error::duplicate_field("minChunkSize")); + } + min_chunk_size__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::MaxChunkSize => { + if max_chunk_size__.is_some() { + return Err(serde::de::Error::duplicate_field("maxChunkSize")); + } + max_chunk_size__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::NormLevel => { + if norm_level__.is_some() { + return Err(serde::de::Error::duplicate_field("normLevel")); + } + norm_level__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(CdcOptions { + min_chunk_size: min_chunk_size__.unwrap_or_default(), + max_chunk_size: max_chunk_size__.unwrap_or_default(), + norm_level: norm_level__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion_common.CdcOptions", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for Column { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -5695,6 +5833,9 @@ impl serde::Serialize for ParquetOptions { if !self.created_by.is_empty() { len += 1; } + if self.content_defined_chunking.is_some() { + len += 1; + } if self.metadata_size_hint_opt.is_some() { len += 1; } @@ -5728,18 +5869,6 @@ impl serde::Serialize for ParquetOptions { if self.max_predicate_cache_size_opt.is_some() { len += 1; } - if self.content_defined_chunking { - len += 1; - } - if self.cdc_min_chunk_size_opt.is_some() { - len += 1; - } - if self.cdc_max_chunk_size_opt.is_some() { - len += 1; - } - if self.cdc_norm_level_opt.is_some() { - len += 1; - } let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetOptions", len)?; if self.enable_page_index { struct_ser.serialize_field("enablePageIndex", &self.enable_page_index)?; @@ -5818,6 +5947,9 @@ impl serde::Serialize for ParquetOptions { if !self.created_by.is_empty() { struct_ser.serialize_field("createdBy", &self.created_by)?; } + if let Some(v) = self.content_defined_chunking.as_ref() { + struct_ser.serialize_field("contentDefinedChunking", v)?; + } if let Some(v) = self.metadata_size_hint_opt.as_ref() { match v { parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => { @@ -5905,34 +6037,6 @@ impl serde::Serialize for ParquetOptions { } } } - if self.content_defined_chunking { - struct_ser.serialize_field("contentDefinedChunking", &self.content_defined_chunking)?; - } - if let Some(v) = self.cdc_min_chunk_size_opt.as_ref() { - match v { - parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => { - #[allow(clippy::needless_borrow)] - #[allow(clippy::needless_borrows_for_generic_args)] - struct_ser.serialize_field("cdcMinChunkSize", ToString::to_string(&v).as_str())?; - } - } - } - if let Some(v) = self.cdc_max_chunk_size_opt.as_ref() { - match v { - parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => { - #[allow(clippy::needless_borrow)] - #[allow(clippy::needless_borrows_for_generic_args)] - struct_ser.serialize_field("cdcMaxChunkSize", ToString::to_string(&v).as_str())?; - } - } - } - if let Some(v) = self.cdc_norm_level_opt.as_ref() { - match v { - parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => { - struct_ser.serialize_field("cdcNormLevel", v)?; - } - } - } struct_ser.end() } } @@ -5984,6 +6088,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "maxRowGroupSize", "created_by", "createdBy", + "content_defined_chunking", + "contentDefinedChunking", "metadata_size_hint", "metadataSizeHint", "compression", @@ -6004,14 +6110,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "coerceInt96", "max_predicate_cache_size", "maxPredicateCacheSize", - "content_defined_chunking", - "contentDefinedChunking", - "cdc_min_chunk_size", - "cdcMinChunkSize", - "cdc_max_chunk_size", - "cdcMaxChunkSize", - "cdc_norm_level", - "cdcNormLevel", ]; #[allow(clippy::enum_variant_names)] @@ -6037,6 +6135,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { DataPageRowCountLimit, MaxRowGroupSize, CreatedBy, + ContentDefinedChunking, MetadataSizeHint, Compression, DictionaryEnabled, @@ -6048,10 +6147,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterNdv, CoerceInt96, MaxPredicateCacheSize, - ContentDefinedChunking, - CdcMinChunkSize, - CdcMaxChunkSize, - CdcNormLevel, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6094,6 +6189,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), "createdBy" | "created_by" => Ok(GeneratedField::CreatedBy), + "contentDefinedChunking" | "content_defined_chunking" => Ok(GeneratedField::ContentDefinedChunking), "metadataSizeHint" | "metadata_size_hint" => Ok(GeneratedField::MetadataSizeHint), "compression" => Ok(GeneratedField::Compression), "dictionaryEnabled" | "dictionary_enabled" => Ok(GeneratedField::DictionaryEnabled), @@ -6105,10 +6201,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96), "maxPredicateCacheSize" | "max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize), - "contentDefinedChunking" | "content_defined_chunking" => Ok(GeneratedField::ContentDefinedChunking), - "cdcMinChunkSize" | "cdc_min_chunk_size" => Ok(GeneratedField::CdcMinChunkSize), - "cdcMaxChunkSize" | "cdc_max_chunk_size" => Ok(GeneratedField::CdcMaxChunkSize), - "cdcNormLevel" | "cdc_norm_level" => Ok(GeneratedField::CdcNormLevel), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6149,6 +6241,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; let mut created_by__ = None; + let mut content_defined_chunking__ = None; let mut metadata_size_hint_opt__ = None; let mut compression_opt__ = None; let mut dictionary_enabled_opt__ = None; @@ -6160,10 +6253,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_ndv_opt__ = None; let mut coerce_int96_opt__ = None; let mut max_predicate_cache_size_opt__ = None; - let mut content_defined_chunking__ = None; - let mut cdc_min_chunk_size_opt__ = None; - let mut cdc_max_chunk_size_opt__ = None; - let mut cdc_norm_level_opt__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::EnablePageIndex => { @@ -6306,6 +6395,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } created_by__ = Some(map_.next_value()?); } + GeneratedField::ContentDefinedChunking => { + if content_defined_chunking__.is_some() { + return Err(serde::de::Error::duplicate_field("contentDefinedChunking")); + } + content_defined_chunking__ = map_.next_value()?; + } GeneratedField::MetadataSizeHint => { if metadata_size_hint_opt__.is_some() { return Err(serde::de::Error::duplicate_field("metadataSizeHint")); @@ -6372,30 +6467,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } max_predicate_cache_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0)); } - GeneratedField::ContentDefinedChunking => { - if content_defined_chunking__.is_some() { - return Err(serde::de::Error::duplicate_field("contentDefinedChunking")); - } - content_defined_chunking__ = Some(map_.next_value()?); - } - GeneratedField::CdcMinChunkSize => { - if cdc_min_chunk_size_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("cdcMinChunkSize")); - } - cdc_min_chunk_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(x.0)); - } - GeneratedField::CdcMaxChunkSize => { - if cdc_max_chunk_size_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("cdcMaxChunkSize")); - } - cdc_max_chunk_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(x.0)); - } - GeneratedField::CdcNormLevel => { - if cdc_norm_level_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("cdcNormLevel")); - } - cdc_norm_level_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::CdcNormLevelOpt::CdcNormLevel(x.0)); - } } } Ok(ParquetOptions { @@ -6420,6 +6491,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), created_by: created_by__.unwrap_or_default(), + content_defined_chunking: content_defined_chunking__, metadata_size_hint_opt: metadata_size_hint_opt__, compression_opt: compression_opt__, dictionary_enabled_opt: dictionary_enabled_opt__, @@ -6431,10 +6503,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_ndv_opt: bloom_filter_ndv_opt__, coerce_int96_opt: coerce_int96_opt__, max_predicate_cache_size_opt: max_predicate_cache_size_opt__, - content_defined_chunking: content_defined_chunking__.unwrap_or_default(), - cdc_min_chunk_size_opt: cdc_min_chunk_size_opt__, - cdc_max_chunk_size_opt: cdc_max_chunk_size_opt__, - cdc_norm_level_opt: cdc_norm_level_opt__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 1ba7fe702665a..1251a51ab0983 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -838,6 +838,8 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, + #[prost(message, optional, tag = "35")] + pub content_defined_chunking: ::core::option::Option, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, @@ -872,21 +874,6 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, - /// default = false - #[prost(bool, tag = "35")] - pub content_defined_chunking: bool, - #[prost(oneof = "parquet_options::CdcMinChunkSizeOpt", tags = "36")] - pub cdc_min_chunk_size_opt: ::core::option::Option< - parquet_options::CdcMinChunkSizeOpt, - >, - #[prost(oneof = "parquet_options::CdcMaxChunkSizeOpt", tags = "37")] - pub cdc_max_chunk_size_opt: ::core::option::Option< - parquet_options::CdcMaxChunkSizeOpt, - >, - #[prost(oneof = "parquet_options::CdcNormLevelOpt", tags = "38")] - pub cdc_norm_level_opt: ::core::option::Option< - parquet_options::CdcNormLevelOpt, - >, } /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { @@ -945,21 +932,15 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] - pub enum CdcMinChunkSizeOpt { - #[prost(uint64, tag = "36")] - CdcMinChunkSize(u64), - } - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] - pub enum CdcMaxChunkSizeOpt { - #[prost(uint64, tag = "37")] - CdcMaxChunkSize(u64), - } - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] - pub enum CdcNormLevelOpt { - #[prost(int32, tag = "38")] - CdcNormLevel(i32), - } +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct CdcOptions { + #[prost(uint64, tag = "1")] + pub min_chunk_size: u64, + #[prost(uint64, tag = "2")] + pub max_chunk_size: u64, + #[prost(int32, tag = "3")] + pub norm_level: i32, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 87c39e205ce65..dbe3add2bdb89 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -904,15 +904,12 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), - content_defined_chunking: value.enable_content_defined_chunking, - cdc_min_chunk_size_opt: value.enable_content_defined_chunking.then(|| - protobuf::parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(value.cdc_min_chunk_size as u64) - ), - cdc_max_chunk_size_opt: value.enable_content_defined_chunking.then(|| - protobuf::parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(value.cdc_max_chunk_size as u64) - ), - cdc_norm_level_opt: value.enable_content_defined_chunking.then(|| - protobuf::parquet_options::CdcNormLevelOpt::CdcNormLevel(value.cdc_norm_level as i32) + content_defined_chunking: value.use_content_defined_chunking.as_ref().map(|cdc| + protobuf::CdcOptions { + min_chunk_size: cdc.min_chunk_size as u64, + max_chunk_size: cdc.max_chunk_size as u64, + norm_level: cdc.norm_level as i32, + } ), }) } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 8c9af5ec75584..1251a51ab0983 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -838,9 +838,8 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, - /// default = false - #[prost(bool, tag = "35")] - pub content_defined_chunking: bool, + #[prost(message, optional, tag = "35")] + pub content_defined_chunking: ::core::option::Option, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, @@ -875,16 +874,6 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, - #[prost(oneof = "parquet_options::CdcMinChunkSizeOpt", tags = "36")] - pub cdc_min_chunk_size_opt: ::core::option::Option< - parquet_options::CdcMinChunkSizeOpt, - >, - #[prost(oneof = "parquet_options::CdcMaxChunkSizeOpt", tags = "37")] - pub cdc_max_chunk_size_opt: ::core::option::Option< - parquet_options::CdcMaxChunkSizeOpt, - >, - #[prost(oneof = "parquet_options::CdcNormLevelOpt", tags = "38")] - pub cdc_norm_level_opt: ::core::option::Option, } /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { @@ -943,21 +932,15 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] - pub enum CdcMinChunkSizeOpt { - #[prost(uint64, tag = "36")] - CdcMinChunkSize(u64), - } - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] - pub enum CdcMaxChunkSizeOpt { - #[prost(uint64, tag = "37")] - CdcMaxChunkSize(u64), - } - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] - pub enum CdcNormLevelOpt { - #[prost(int32, tag = "38")] - CdcNormLevel(i32), - } +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct CdcOptions { + #[prost(uint64, tag = "1")] + pub min_chunk_size: u64, + #[prost(uint64, tag = "2")] + pub max_chunk_size: u64, + #[prost(int32, tag = "3")] + pub norm_level: i32, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 4dd6b7b4dd051..4fc8212a73621 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -351,13 +351,13 @@ mod parquet { use super::*; use crate::protobuf::{ - ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions, - ParquetOptions as ParquetOptionsProto, + CdcOptions as CdcOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto, + ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto, TableParquetOptions as TableParquetOptionsProto, parquet_column_options, parquet_options, }; use datafusion_common::config::{ - ParquetColumnOptions, ParquetOptions, TableParquetOptions, + CdcOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions, }; use datafusion_datasource_parquet::file_format::ParquetFormatFactory; @@ -426,15 +426,12 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), - content_defined_chunking: global_options.global.enable_content_defined_chunking, - cdc_min_chunk_size_opt: global_options.global.enable_content_defined_chunking.then(|| { - parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(global_options.global.cdc_min_chunk_size as u64) - }), - cdc_max_chunk_size_opt: global_options.global.enable_content_defined_chunking.then(|| { - parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(global_options.global.cdc_max_chunk_size as u64) - }), - cdc_norm_level_opt: global_options.global.enable_content_defined_chunking.then(|| { - parquet_options::CdcNormLevelOpt::CdcNormLevel(global_options.global.cdc_norm_level as i32) + content_defined_chunking: global_options.global.use_content_defined_chunking.as_ref().map(|cdc| { + CdcOptionsProto { + min_chunk_size: cdc.min_chunk_size as u64, + max_chunk_size: cdc.max_chunk_size as u64, + norm_level: cdc.norm_level as i32, + } }), }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { @@ -535,16 +532,14 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), - enable_content_defined_chunking: proto.content_defined_chunking, - cdc_min_chunk_size: proto.cdc_min_chunk_size_opt.as_ref().map(|opt| match opt { - parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => *v as usize, - }).unwrap_or(ParquetOptions::default().cdc_min_chunk_size), - cdc_max_chunk_size: proto.cdc_max_chunk_size_opt.as_ref().map(|opt| match opt { - parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => *v as usize, - }).unwrap_or(ParquetOptions::default().cdc_max_chunk_size), - cdc_norm_level: proto.cdc_norm_level_opt.as_ref().map(|opt| match opt { - parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => *v as i64, - }).unwrap_or(ParquetOptions::default().cdc_norm_level), + use_content_defined_chunking: proto.content_defined_chunking.map(|cdc| { + let defaults = CdcOptions::default(); + CdcOptions { + min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, + max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, + norm_level: cdc.norm_level as i64, + } + }), } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index e36f049da3ba0..1d02ddecec2c5 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -235,9 +235,6 @@ datafusion.execution.parquet.bloom_filter_fpp NULL datafusion.execution.parquet.bloom_filter_ndv NULL datafusion.execution.parquet.bloom_filter_on_read true datafusion.execution.parquet.bloom_filter_on_write false -datafusion.execution.parquet.cdc_max_chunk_size 1048576 -datafusion.execution.parquet.cdc_min_chunk_size 262144 -datafusion.execution.parquet.cdc_norm_level 0 datafusion.execution.parquet.coerce_int96 NULL datafusion.execution.parquet.column_index_truncate_length 64 datafusion.execution.parquet.compression zstd(3) @@ -246,7 +243,6 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 datafusion.execution.parquet.data_pagesize_limit 1048576 datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 -datafusion.execution.parquet.enable_content_defined_chunking false datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL datafusion.execution.parquet.force_filter_selections false @@ -263,6 +259,7 @@ datafusion.execution.parquet.skip_arrow_metadata false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.statistics_truncate_length 64 +datafusion.execution.parquet.use_content_defined_chunking NULL datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 datafusion.execution.perfect_hash_join_min_key_density 0.15 @@ -380,9 +377,6 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting datafusion.execution.parquet.bloom_filter_on_read true (reading) Use any available bloom filters when reading parquet files datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files -datafusion.execution.parquet.cdc_max_chunk_size 1048576 (writing) Maximum chunk size in bytes for content-defined chunking. The chunker will create a new chunk whenever the chunk size exceeds this value. Default is 1 MiB. Only used when `enable_content_defined_chunking` is true. -datafusion.execution.parquet.cdc_min_chunk_size 262144 (writing) Minimum chunk size in bytes for content-defined chunking. The rolling hash will not be updated until this size is reached for each chunk. Default is 256 KiB. Only used when `enable_content_defined_chunking` is true. -datafusion.execution.parquet.cdc_norm_level 0 (writing) Normalization level for content-defined chunking. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0. Only used when `enable_content_defined_chunking` is true. datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. @@ -391,7 +385,6 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 (writing) Sets best datafusion.execution.parquet.data_pagesize_limit 1048576 (writing) Sets best effort maximum size of data page in bytes datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes -datafusion.execution.parquet.enable_content_defined_chunking false (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When true, the other `cdc_*` options control the chunking behavior. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. @@ -408,6 +401,7 @@ datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding t datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting +datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When Some, CDC is enabled with the given options; when None (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. From 08ede0f36f57eb11d69cf17cdc9f2d23a7c62a85 Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Wed, 25 Mar 2026 21:19:51 +0100 Subject: [PATCH 5/6] fix: remove unused parquet dep from proto-common, regenerate configs.md --- Cargo.lock | 1 - datafusion/proto-common/Cargo.toml | 3 +-- docs/source/user-guide/configs.md | 1 + 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 54099fbe172fe..1a311bf0941fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2532,7 +2532,6 @@ dependencies = [ "arrow", "datafusion-common", "doc-comment", - "parquet", "pbjson 0.9.0", "prost", "serde", diff --git a/datafusion/proto-common/Cargo.toml b/datafusion/proto-common/Cargo.toml index 9de3494ec1e21..ca5ee1305c57f 100644 --- a/datafusion/proto-common/Cargo.toml +++ b/datafusion/proto-common/Cargo.toml @@ -37,12 +37,11 @@ name = "datafusion_proto_common" [features] default = [] json = ["serde", "pbjson"] -parquet = ["datafusion-common/parquet", "dep:parquet"] +parquet = ["datafusion-common/parquet"] [dependencies] arrow = { workspace = true } datafusion-common = { workspace = true } -parquet = { workspace = true, optional = true } pbjson = { workspace = true, optional = true } prost = { workspace = true } serde = { version = "1.0", optional = true } diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 56ab4d1539f92..5b855f051a6b8 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -112,6 +112,7 @@ The following configuration settings are available: | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | +| datafusion.execution.parquet.use_content_defined_chunking | NULL | (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. | From 337b76ba06a5a2483cdf6aa2b42c623a4c05d0fe Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Thu, 26 Mar 2026 09:11:38 +0100 Subject: [PATCH 6/6] fix: use backtick formatting for Some/None in SHOW ALL VERBOSE expected output --- datafusion/sqllogictest/test_files/information_schema.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 1d02ddecec2c5..2491debfdd6f4 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -401,7 +401,7 @@ datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding t datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting -datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When Some, CDC is enabled with the given options; when None (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. +datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future.