diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e6d5fbfc50a21..7dd433cdc7001 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -687,6 +687,24 @@ config_namespace! { } } +config_namespace! { + /// Options for content-defined chunking (CDC) when writing parquet files. + /// See [`ParquetOptions::use_content_defined_chunking`]. + pub struct CdcOptions { + /// Minimum chunk size in bytes. The rolling hash will not trigger a split + /// until this many bytes have been accumulated. Default is 256 KiB. + pub min_chunk_size: usize, default = 256 * 1024 + + /// Maximum chunk size in bytes. A split is forced when the accumulated + /// size exceeds this value. Default is 1 MiB. + pub max_chunk_size: usize, default = 1024 * 1024 + + /// Normalization level. Increasing this improves deduplication ratio + /// but increases fragmentation. Recommended range is [-3, 3], default is 0. + pub norm_level: i64, default = 0 + } +} + config_namespace! { /// Options for reading and writing parquet files /// @@ -845,6 +863,7 @@ config_namespace! { /// default parquet writer setting pub bloom_filter_ndv: Option, default = None + /// (writing) Controls whether DataFusion will attempt to speed up writing /// parquet files by serializing them in parallel. Each column /// in each row group in each output file are serialized in parallel @@ -872,6 +891,12 @@ config_namespace! { /// writing out already in-memory data, such as from a cached /// data frame. pub maximum_buffered_record_batches_per_stream: usize, default = 2 + + /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing + /// parquet files. When `Some`, CDC is enabled with the given options; when `None` + /// (the default), CDC is disabled. When CDC is enabled, parallel writing is + /// automatically disabled since the chunker state must persist across row groups. + pub use_content_defined_chunking: Option, default = None } } @@ -1820,6 +1845,7 @@ config_field!(usize); config_field!(f64); config_field!(u64); config_field!(u32); +config_field!(i64); impl ConfigField for u8 { fn visit(&self, v: &mut V, key: &str, description: &'static str) { diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index a7a1fc6d0bb66..103719b21315d 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -95,7 +95,7 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { global, column_specific_options, key_value_metadata, - crypto: _, + .. } = table_parquet_options; let mut builder = global.into_writer_properties_builder()?; @@ -191,6 +191,7 @@ impl ParquetOptions { bloom_filter_on_write, bloom_filter_fpp, bloom_filter_ndv, + use_content_defined_chunking, // not in WriterProperties enable_page_index: _, @@ -247,6 +248,15 @@ impl ParquetOptions { if let Some(encoding) = encoding { builder = builder.set_encoding(parse_encoding_string(encoding)?); } + if let Some(cdc) = use_content_defined_chunking { + builder = builder.set_content_defined_chunking(Some( + parquet::file::properties::CdcOptions { + min_chunk_size: cdc.min_chunk_size, + max_chunk_size: cdc.max_chunk_size, + norm_level: cdc.norm_level as i32, + }, + )); + } Ok(builder) } @@ -388,7 +398,9 @@ mod tests { use super::*; #[cfg(feature = "parquet_encryption")] use crate::config::ConfigFileEncryptionProperties; - use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions}; + use crate::config::{ + CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions, + }; use crate::parquet_config::DFParquetWriterVersion; use parquet::basic::Compression; use parquet::file::properties::{ @@ -460,6 +472,7 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, + use_content_defined_chunking: defaults.use_content_defined_chunking.clone(), } } @@ -576,6 +589,13 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + use_content_defined_chunking: props.content_defined_chunking().map(|c| { + CdcOptions { + min_chunk_size: c.min_chunk_size, + max_chunk_size: c.max_chunk_size, + norm_level: c.norm_level as i64, + } + }), }, column_specific_options, key_value_metadata, @@ -786,6 +806,51 @@ mod tests { ); } + #[test] + fn test_cdc_enabled_with_custom_options() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 128 * 1024, + max_chunk_size: 512 * 1024, + norm_level: 2, + }); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + let cdc = props.content_defined_chunking().expect("CDC should be set"); + assert_eq!(cdc.min_chunk_size, 128 * 1024); + assert_eq!(cdc.max_chunk_size, 512 * 1024); + assert_eq!(cdc.norm_level, 2); + } + + #[test] + fn test_cdc_disabled_by_default() { + let mut opts = TableParquetOptions::default(); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + assert!(props.content_defined_chunking().is_none()); + } + + #[test] + fn test_cdc_round_trip_through_writer_props() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 64 * 1024, + max_chunk_size: 2 * 1024 * 1024, + norm_level: -1, + }); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + let recovered = session_config_from_writer_props(&props); + + let cdc = recovered.global.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 64 * 1024); + assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024); + assert_eq!(cdc.norm_level, -1); + } + #[test] fn test_bloom_filter_set_ndv_only() { // the TableParquetOptions::default, with only ndv set diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index c391b299d4a29..83b719d576b02 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1368,7 +1368,11 @@ impl FileSink for ParquetSink { while let Some((path, mut rx)) = file_stream_rx.recv().await { let parquet_props = self.create_writer_props(&runtime, &path).await?; - if !parquet_opts.global.allow_single_file_parallelism { + // CDC requires the sequential writer: the chunker state lives in ArrowWriter + // and persists across row groups. The parallel path bypasses ArrowWriter entirely. + if !parquet_opts.global.allow_single_file_parallelism + || parquet_opts.global.use_content_defined_chunking.is_some() + { let mut writer = self .create_async_arrow_writer( &path, diff --git a/datafusion/proto-common/Cargo.toml b/datafusion/proto-common/Cargo.toml index 46dae36ba40ed..ca5ee1305c57f 100644 --- a/datafusion/proto-common/Cargo.toml +++ b/datafusion/proto-common/Cargo.toml @@ -37,6 +37,7 @@ name = "datafusion_proto_common" [features] default = [] json = ["serde", "pbjson"] +parquet = ["datafusion-common/parquet"] [dependencies] arrow = { workspace = true } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 62c6bbe85612a..31ece63577b4f 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -603,6 +603,14 @@ message ParquetOptions { oneof max_predicate_cache_size_opt { uint64 max_predicate_cache_size = 33; } + + CdcOptions content_defined_chunking = 35; +} + +message CdcOptions { + uint64 min_chunk_size = 1; + uint64 max_chunk_size = 2; + int32 norm_level = 3; } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index ca8a269958d73..493bd359a62b5 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -40,7 +40,7 @@ use datafusion_common::{ DataFusionError, JoinSide, ScalarValue, Statistics, TableReference, arrow_datafusion_err, config::{ - CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions, + CdcOptions, CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions, }, file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions}, @@ -1090,6 +1090,14 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + use_content_defined_chunking: value.content_defined_chunking.map(|cdc| { + let defaults = CdcOptions::default(); + CdcOptions { + min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, + max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, + norm_level: cdc.norm_level as i64, + } + }), }) } } @@ -1152,7 +1160,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { column_specific_options.insert(column_name.clone(), options.try_into()?); } } - Ok(TableParquetOptions { + let opts = TableParquetOptions { global: value .global .as_ref() @@ -1160,9 +1168,9 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { .unwrap() .unwrap(), column_specific_options, - key_value_metadata: Default::default(), - crypto: Default::default(), - }) + ..Default::default() + }; + Ok(opts) } } @@ -1262,3 +1270,88 @@ pub(crate) fn csv_writer_options_from_proto( .with_null(writer_options.null_value.clone()) .with_double_quote(writer_options.double_quote)) } + +#[cfg(test)] +mod tests { + use super::*; + use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions}; + + fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions { + let proto: crate::protobuf_common::ParquetOptions = + (&opts).try_into().expect("to_proto"); + ParquetOptions::try_from(&proto).expect("from_proto") + } + + fn table_parquet_options_proto_round_trip( + opts: TableParquetOptions, + ) -> TableParquetOptions { + let proto: crate::protobuf_common::TableParquetOptions = + (&opts).try_into().expect("to_proto"); + TableParquetOptions::try_from(&proto).expect("from_proto") + } + + #[test] + fn test_parquet_options_cdc_disabled_round_trip() { + let opts = ParquetOptions::default(); + assert!(opts.use_content_defined_chunking.is_none()); + let recovered = parquet_options_proto_round_trip(opts.clone()); + assert_eq!(opts, recovered); + } + + #[test] + fn test_parquet_options_cdc_enabled_round_trip() { + let opts = ParquetOptions { + use_content_defined_chunking: Some(CdcOptions { + min_chunk_size: 128 * 1024, + max_chunk_size: 512 * 1024, + norm_level: 2, + }), + ..ParquetOptions::default() + }; + let recovered = parquet_options_proto_round_trip(opts.clone()); + let cdc = recovered.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 128 * 1024); + assert_eq!(cdc.max_chunk_size, 512 * 1024); + assert_eq!(cdc.norm_level, 2); + } + + #[test] + fn test_parquet_options_cdc_negative_norm_level_round_trip() { + let opts = ParquetOptions { + use_content_defined_chunking: Some(CdcOptions { + norm_level: -3, + ..CdcOptions::default() + }), + ..ParquetOptions::default() + }; + let recovered = parquet_options_proto_round_trip(opts); + assert_eq!( + recovered.use_content_defined_chunking.unwrap().norm_level, + -3 + ); + } + + #[test] + fn test_table_parquet_options_cdc_round_trip() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 64 * 1024, + max_chunk_size: 2 * 1024 * 1024, + norm_level: -1, + }); + + let recovered = table_parquet_options_proto_round_trip(opts.clone()); + let cdc = recovered.global.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 64 * 1024); + assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024); + assert_eq!(cdc.norm_level, -1); + } + + #[test] + fn test_table_parquet_options_cdc_disabled_round_trip() { + let opts = TableParquetOptions::default(); + assert!(opts.global.use_content_defined_chunking.is_none()); + let recovered = table_parquet_options_proto_round_trip(opts.clone()); + assert!(recovered.global.use_content_defined_chunking.is_none()); + } +} diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index b00e7546bba20..77a3b71488ece 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -883,6 +883,144 @@ impl<'de> serde::Deserialize<'de> for AvroOptions { deserializer.deserialize_struct("datafusion_common.AvroOptions", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for CdcOptions { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.min_chunk_size != 0 { + len += 1; + } + if self.max_chunk_size != 0 { + len += 1; + } + if self.norm_level != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion_common.CdcOptions", len)?; + if self.min_chunk_size != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("minChunkSize", ToString::to_string(&self.min_chunk_size).as_str())?; + } + if self.max_chunk_size != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("maxChunkSize", ToString::to_string(&self.max_chunk_size).as_str())?; + } + if self.norm_level != 0 { + struct_ser.serialize_field("normLevel", &self.norm_level)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for CdcOptions { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "min_chunk_size", + "minChunkSize", + "max_chunk_size", + "maxChunkSize", + "norm_level", + "normLevel", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + MinChunkSize, + MaxChunkSize, + NormLevel, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "minChunkSize" | "min_chunk_size" => Ok(GeneratedField::MinChunkSize), + "maxChunkSize" | "max_chunk_size" => Ok(GeneratedField::MaxChunkSize), + "normLevel" | "norm_level" => Ok(GeneratedField::NormLevel), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = CdcOptions; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion_common.CdcOptions") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut min_chunk_size__ = None; + let mut max_chunk_size__ = None; + let mut norm_level__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::MinChunkSize => { + if min_chunk_size__.is_some() { + return Err(serde::de::Error::duplicate_field("minChunkSize")); + } + min_chunk_size__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::MaxChunkSize => { + if max_chunk_size__.is_some() { + return Err(serde::de::Error::duplicate_field("maxChunkSize")); + } + max_chunk_size__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::NormLevel => { + if norm_level__.is_some() { + return Err(serde::de::Error::duplicate_field("normLevel")); + } + norm_level__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(CdcOptions { + min_chunk_size: min_chunk_size__.unwrap_or_default(), + max_chunk_size: max_chunk_size__.unwrap_or_default(), + norm_level: norm_level__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion_common.CdcOptions", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for Column { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -5695,6 +5833,9 @@ impl serde::Serialize for ParquetOptions { if !self.created_by.is_empty() { len += 1; } + if self.content_defined_chunking.is_some() { + len += 1; + } if self.metadata_size_hint_opt.is_some() { len += 1; } @@ -5806,6 +5947,9 @@ impl serde::Serialize for ParquetOptions { if !self.created_by.is_empty() { struct_ser.serialize_field("createdBy", &self.created_by)?; } + if let Some(v) = self.content_defined_chunking.as_ref() { + struct_ser.serialize_field("contentDefinedChunking", v)?; + } if let Some(v) = self.metadata_size_hint_opt.as_ref() { match v { parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => { @@ -5944,6 +6088,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "maxRowGroupSize", "created_by", "createdBy", + "content_defined_chunking", + "contentDefinedChunking", "metadata_size_hint", "metadataSizeHint", "compression", @@ -5989,6 +6135,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { DataPageRowCountLimit, MaxRowGroupSize, CreatedBy, + ContentDefinedChunking, MetadataSizeHint, Compression, DictionaryEnabled, @@ -6042,6 +6189,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), "createdBy" | "created_by" => Ok(GeneratedField::CreatedBy), + "contentDefinedChunking" | "content_defined_chunking" => Ok(GeneratedField::ContentDefinedChunking), "metadataSizeHint" | "metadata_size_hint" => Ok(GeneratedField::MetadataSizeHint), "compression" => Ok(GeneratedField::Compression), "dictionaryEnabled" | "dictionary_enabled" => Ok(GeneratedField::DictionaryEnabled), @@ -6093,6 +6241,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; let mut created_by__ = None; + let mut content_defined_chunking__ = None; let mut metadata_size_hint_opt__ = None; let mut compression_opt__ = None; let mut dictionary_enabled_opt__ = None; @@ -6246,6 +6395,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } created_by__ = Some(map_.next_value()?); } + GeneratedField::ContentDefinedChunking => { + if content_defined_chunking__.is_some() { + return Err(serde::de::Error::duplicate_field("contentDefinedChunking")); + } + content_defined_chunking__ = map_.next_value()?; + } GeneratedField::MetadataSizeHint => { if metadata_size_hint_opt__.is_some() { return Err(serde::de::Error::duplicate_field("metadataSizeHint")); @@ -6336,6 +6491,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), created_by: created_by__.unwrap_or_default(), + content_defined_chunking: content_defined_chunking__, metadata_size_hint_opt: metadata_size_hint_opt__, compression_opt: compression_opt__, dictionary_enabled_opt: dictionary_enabled_opt__, diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index a09826a29be52..1251a51ab0983 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -838,6 +838,8 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, + #[prost(message, optional, tag = "35")] + pub content_defined_chunking: ::core::option::Option, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, @@ -931,6 +933,15 @@ pub mod parquet_options { MaxPredicateCacheSize(u64), } } +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct CdcOptions { + #[prost(uint64, tag = "1")] + pub min_chunk_size: u64, + #[prost(uint64, tag = "2")] + pub max_chunk_size: u64, + #[prost(int32, tag = "3")] + pub norm_level: i32, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { #[prost(enumeration = "PrecisionInfo", tag = "1")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 79e3306a4df1b..dbe3add2bdb89 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -904,6 +904,13 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + content_defined_chunking: value.use_content_defined_chunking.as_ref().map(|cdc| + protobuf::CdcOptions { + min_chunk_size: cdc.min_chunk_size as u64, + max_chunk_size: cdc.max_chunk_size as u64, + norm_level: cdc.norm_level as i32, + } + ), }) } } @@ -963,8 +970,11 @@ impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions { .iter() .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone()))) .collect::>(); + + let global: protobuf::ParquetOptions = (&value.global).try_into()?; + Ok(protobuf::TableParquetOptions { - global: Some((&value.global).try_into()?), + global: Some(global), column_specific_options, key_value_metadata, }) diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index a09826a29be52..1251a51ab0983 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -838,6 +838,8 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, + #[prost(message, optional, tag = "35")] + pub content_defined_chunking: ::core::option::Option, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, @@ -931,6 +933,15 @@ pub mod parquet_options { MaxPredicateCacheSize(u64), } } +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct CdcOptions { + #[prost(uint64, tag = "1")] + pub min_chunk_size: u64, + #[prost(uint64, tag = "2")] + pub max_chunk_size: u64, + #[prost(int32, tag = "3")] + pub norm_level: i32, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { #[prost(enumeration = "PrecisionInfo", tag = "1")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..4fc8212a73621 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -351,13 +351,13 @@ mod parquet { use super::*; use crate::protobuf::{ - ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions, - ParquetOptions as ParquetOptionsProto, + CdcOptions as CdcOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto, + ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto, TableParquetOptions as TableParquetOptionsProto, parquet_column_options, parquet_options, }; use datafusion_common::config::{ - ParquetColumnOptions, ParquetOptions, TableParquetOptions, + CdcOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions, }; use datafusion_datasource_parquet::file_format::ParquetFormatFactory; @@ -426,6 +426,13 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), + content_defined_chunking: global_options.global.use_content_defined_chunking.as_ref().map(|cdc| { + CdcOptionsProto { + min_chunk_size: cdc.min_chunk_size as u64, + max_chunk_size: cdc.max_chunk_size as u64, + norm_level: cdc.norm_level as i32, + } + }), }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -525,6 +532,14 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), + use_content_defined_chunking: proto.content_defined_chunking.map(|cdc| { + let defaults = CdcOptions::default(); + CdcOptions { + min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, + max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, + norm_level: cdc.norm_level as i64, + } + }), } } } @@ -585,7 +600,7 @@ mod parquet { .iter() .map(|(k, v)| (k.clone(), Some(v.clone()))) .collect(), - crypto: Default::default(), + ..Default::default() } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 2da823421dd76..2491debfdd6f4 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -259,6 +259,7 @@ datafusion.execution.parquet.skip_arrow_metadata false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.statistics_truncate_length 64 +datafusion.execution.parquet.use_content_defined_chunking NULL datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 datafusion.execution.perfect_hash_join_min_key_density 0.15 @@ -400,6 +401,7 @@ datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding t datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting +datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 56ab4d1539f92..5b855f051a6b8 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -112,6 +112,7 @@ The following configuration settings are available: | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | +| datafusion.execution.parquet.use_content_defined_chunking | NULL | (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. |