diff --git a/crates/iceberg/src/catalog/metadata_location.rs b/crates/iceberg/src/catalog/metadata_location.rs index ed28118879..acd041d5e1 100644 --- a/crates/iceberg/src/catalog/metadata_location.rs +++ b/crates/iceberg/src/catalog/metadata_location.rs @@ -114,9 +114,9 @@ impl MetadataLocation { ))?; // Check for compression suffix (e.g., .gz) - let gzip_suffix = CompressionCodec::Gzip.suffix()?; + let gzip_suffix = CompressionCodec::gzip_default().suffix()?; let (stripped, compression_codec) = if let Some(s) = stripped.strip_suffix(gzip_suffix) { - (s, CompressionCodec::Gzip) + (s, CompressionCodec::gzip_default()) } else { (stripped, CompressionCodec::None) }; @@ -261,7 +261,7 @@ mod test { table_location: "/abc".to_string(), version: 1234567, id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(), - compression_codec: CompressionCodec::Gzip, + compression_codec: CompressionCodec::gzip_default(), }), ), // Negative version @@ -345,10 +345,16 @@ mod test { "/test/table/metadata/00005-81056704-ce5b-41c4-bb83-eb6408081af6.gz.metadata.json", ) .unwrap(); - assert_eq!(location_gzip.compression_codec, CompressionCodec::Gzip); + assert_eq!( + location_gzip.compression_codec, + CompressionCodec::gzip_default() + ); let next_gzip = location_gzip.with_next_version(); - assert_eq!(next_gzip.compression_codec, CompressionCodec::Gzip); + assert_eq!( + next_gzip.compression_codec, + CompressionCodec::gzip_default() + ); assert_eq!(next_gzip.version, 6); } @@ -369,7 +375,10 @@ mod test { ); let metadata_gzip = create_test_metadata(props_gzip); let updated_gzip = location.with_new_metadata(&metadata_gzip); - assert_eq!(updated_gzip.compression_codec, CompressionCodec::Gzip); + assert_eq!( + updated_gzip.compression_codec, + CompressionCodec::gzip_default() + ); assert_eq!(updated_gzip.version, 0); assert_eq!( updated_gzip.to_string(), diff --git a/crates/iceberg/src/compression.rs b/crates/iceberg/src/compression.rs index 42f5298437..929d9226e7 100644 --- 
a/crates/iceberg/src/compression.rs +++ b/crates/iceberg/src/compression.rs @@ -17,28 +17,101 @@ //! Compression codec support for data compression and decompression. +use std::fmt; use std::io::{Read, Write}; use flate2::Compression; use flate2::read::GzDecoder; use flate2::write::GzEncoder; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::{Error, ErrorKind, Result}; +/// Default compression level for Zstandard (zstd). +const ZSTD_DEFAULT_LEVEL: u8 = 3; +/// Default compression level for Gzip. +const GZIP_DEFAULT_LEVEL: u8 = 6; +/// Maximum compression level for Gzip. +const GZIP_MAX_LEVEL: u8 = 9; + /// Data compression formats -#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)] pub enum CompressionCodec { #[default] /// No compression None, /// LZ4 single compression frame with content size present Lz4, - /// Zstandard single compression frame with content size present - Zstd, - /// Gzip compression - Gzip, + /// Zstandard single compression frame with content size present. + /// Level range is 0–22, where 0 means default compression level (not no compression). + /// Use [`CompressionCodec::zstd_default`] to construct with the default level. + Zstd(u8), + /// Gzip compression. Level range is 0–9, where 0 means no compression. + /// Use [`CompressionCodec::gzip_default`] to construct with the default level. + Gzip(u8), + /// Snappy compression + Snappy, +} + +impl CompressionCodec { + /// Returns a Zstd codec with the default compression level. + pub const fn zstd_default() -> Self { + CompressionCodec::Zstd(ZSTD_DEFAULT_LEVEL) + } + + /// Returns a Gzip codec with the default compression level. + pub const fn gzip_default() -> Self { + CompressionCodec::Gzip(GZIP_DEFAULT_LEVEL) + } + + /// Returns the codec name as used in serialization and error messages. 
+ pub fn name(&self) -> &'static str { + match self { + CompressionCodec::None => "none", + CompressionCodec::Lz4 => "lz4", + CompressionCodec::Zstd(_) => "zstd", + CompressionCodec::Gzip(_) => "gzip", + CompressionCodec::Snappy => "snappy", + } + } +} + +// Note: serialize/deserialize do not round-trip the compression level. Iceberg configuration +// stores only the codec name (e.g. "zstd"), not the level, so deserialization always produces the +// default level. A `Zstd(5)` written to metadata will be read back as `Zstd(3)`. Some +// compression configuration (e.g. Avro metadata) has a separate level field alongside the codec name. +impl Serialize for CompressionCodec { + fn serialize(&self, serializer: S) -> std::result::Result { + serializer.serialize_str(self.name()) + } +} + +impl<'de> Deserialize<'de> for CompressionCodec { + fn deserialize>(deserializer: D) -> std::result::Result { + let s = String::deserialize(deserializer)?; + match s.to_lowercase().as_str() { + "none" => Ok(CompressionCodec::None), + "lz4" => Ok(CompressionCodec::Lz4), + "zstd" => Ok(CompressionCodec::zstd_default()), + "gzip" => Ok(CompressionCodec::gzip_default()), + "snappy" => Ok(CompressionCodec::Snappy), + other => Err(serde::de::Error::unknown_variant(other, &[ + "none", "lz4", "zstd", "gzip", "snappy", + ])), + } + } +} + +impl fmt::Display for CompressionCodec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + CompressionCodec::None => write!(f, "None"), + CompressionCodec::Lz4 => write!(f, "Lz4"), + CompressionCodec::Zstd(level) => write!(f, "Zstd(level={level})"), + CompressionCodec::Gzip(level) => write!(f, "Gzip(level={level})"), + CompressionCodec::Snappy => write!(f, "Snappy"), + } + } } impl CompressionCodec { @@ -49,13 +122,17 @@ impl CompressionCodec { ErrorKind::FeatureUnsupported, "LZ4 decompression is not supported currently", )), - CompressionCodec::Zstd => Ok(zstd::stream::decode_all(&bytes[..])?), - CompressionCodec::Gzip => { +
CompressionCodec::Zstd(_) => Ok(zstd::stream::decode_all(&bytes[..])?), + CompressionCodec::Gzip(_) => { let mut decoder = GzDecoder::new(&bytes[..]); let mut decompressed = Vec::new(); decoder.read_to_end(&mut decompressed)?; Ok(decompressed) } + CompressionCodec::Snappy => Err(Error::new( + ErrorKind::FeatureUnsupported, + "Snappy decompression is not supported currently", + )), } } @@ -66,19 +143,24 @@ impl CompressionCodec { ErrorKind::FeatureUnsupported, "LZ4 compression is not supported currently", )), - CompressionCodec::Zstd => { + CompressionCodec::Zstd(level) => { let writer = Vec::::new(); - let mut encoder = zstd::stream::Encoder::new(writer, 3)?; + let mut encoder = zstd::stream::Encoder::new(writer, *level as i32)?; encoder.include_checksum(true)?; encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?; std::io::copy(&mut &bytes[..], &mut encoder)?; Ok(encoder.finish()?) } - CompressionCodec::Gzip => { - let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + CompressionCodec::Gzip(level) => { + let compression = Compression::new((*level).min(GZIP_MAX_LEVEL) as u32); + let mut encoder = GzEncoder::new(Vec::new(), compression); encoder.write_all(&bytes)?; Ok(encoder.finish()?) 
} + CompressionCodec::Snappy => Err(Error::new( + ErrorKind::FeatureUnsupported, + "Snappy compression is not supported currently", + )), } } @@ -95,8 +177,10 @@ impl CompressionCodec { pub fn suffix(&self) -> Result<&'static str> { match self { CompressionCodec::None => Ok(""), - CompressionCodec::Gzip => Ok(".gz"), - codec @ (CompressionCodec::Lz4 | CompressionCodec::Zstd) => Err(Error::new( + CompressionCodec::Gzip(_) => Ok(".gz"), + codec @ (CompressionCodec::Lz4 + | CompressionCodec::Zstd(_) + | CompressionCodec::Snappy) => Err(Error::new( ErrorKind::FeatureUnsupported, format!("suffix not defined for {codec:?}"), )), @@ -123,7 +207,10 @@ mod tests { async fn test_compression_codec_compress() { let bytes_vec = [0_u8; 100].to_vec(); - let compression_codecs = [CompressionCodec::Zstd, CompressionCodec::Gzip]; + let compression_codecs = [ + CompressionCodec::zstd_default(), + CompressionCodec::gzip_default(), + ]; for codec in compression_codecs { let compressed = codec.compress(bytes_vec.clone()).unwrap(); @@ -135,7 +222,10 @@ mod tests { #[tokio::test] async fn test_compression_codec_unsupported() { - let unsupported_codecs = [(CompressionCodec::Lz4, "LZ4")]; + let unsupported_codecs = [ + (CompressionCodec::Lz4, "LZ4"), + (CompressionCodec::Snappy, "Snappy"), + ]; let bytes_vec = [0_u8; 100].to_vec(); for (codec, name) in unsupported_codecs { @@ -153,18 +243,34 @@ mod tests { #[test] fn test_suffix() { - // Test supported codecs assert_eq!(CompressionCodec::None.suffix().unwrap(), ""); - assert_eq!(CompressionCodec::Gzip.suffix().unwrap(), ".gz"); + assert_eq!(CompressionCodec::gzip_default().suffix().unwrap(), ".gz"); - // Test unsupported codecs return errors assert!(CompressionCodec::Lz4.suffix().is_err()); - assert!(CompressionCodec::Zstd.suffix().is_err()); + assert!(CompressionCodec::zstd_default().suffix().is_err()); + assert!(CompressionCodec::Snappy.suffix().is_err()); let lz4_err = CompressionCodec::Lz4.suffix().unwrap_err(); 
assert!(lz4_err.to_string().contains("suffix not defined for Lz4")); - let zstd_err = CompressionCodec::Zstd.suffix().unwrap_err(); + let zstd_err = CompressionCodec::zstd_default().suffix().unwrap_err(); assert!(zstd_err.to_string().contains("suffix not defined for Zstd")); } + + #[test] + fn test_display() { + assert_eq!(CompressionCodec::None.to_string(), "None"); + assert_eq!(CompressionCodec::Lz4.to_string(), "Lz4"); + assert_eq!( + CompressionCodec::zstd_default().to_string(), + "Zstd(level=3)" + ); + assert_eq!(CompressionCodec::Zstd(5).to_string(), "Zstd(level=5)"); + assert_eq!( + CompressionCodec::gzip_default().to_string(), + "Gzip(level=6)" + ); + assert_eq!(CompressionCodec::Gzip(9).to_string(), "Gzip(level=9)"); + assert_eq!(CompressionCodec::Snappy.to_string(), "Snappy"); + } } diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index 1d39cf249b..e2dfc10c23 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -985,6 +985,9 @@ mod tests { assert!(result.is_ok()); let metadata = result.unwrap(); assert_eq!(metadata.blobs.len(), 1); - assert_eq!(metadata.blobs[0].compression_codec, CompressionCodec::Gzip); + assert_eq!( + metadata.blobs[0].compression_codec, + CompressionCodec::gzip_default() + ); } } diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs index 854d4070ff..0e054cac51 100644 --- a/crates/iceberg/src/puffin/mod.rs +++ b/crates/iceberg/src/puffin/mod.rs @@ -26,30 +26,22 @@ pub use blob::{APACHE_DATASKETCHES_THETA_V1, Blob, DELETION_VECTOR_V1}; pub use crate::compression::CompressionCodec; -/// Compression codecs supported by the Puffin spec. -const SUPPORTED_PUFFIN_CODECS: &[CompressionCodec] = &[ - CompressionCodec::None, - CompressionCodec::Lz4, - CompressionCodec::Zstd, -]; - /// Validates that the compression codec is supported for Puffin files. /// Returns an error if the codec is not supported. 
fn validate_puffin_compression(codec: CompressionCodec) -> Result<()> { - if !SUPPORTED_PUFFIN_CODECS.contains(&codec) { - let supported_names: Vec = SUPPORTED_PUFFIN_CODECS - .iter() - .map(|c| format!("{c:?}")) - .collect(); - return Err(Error::new( + match codec { + CompressionCodec::None | CompressionCodec::Lz4 | CompressionCodec::Zstd(_) => Ok(()), + other => Err(Error::new( ErrorKind::DataInvalid, format!( - "Compression codec {codec:?} is not supported for Puffin files. Only {} are supported.", - supported_names.join(", ") + "Compression codec {} is not supported for Puffin files. Only {}, {}, and {} are supported.", + other.name(), + CompressionCodec::None.name(), + CompressionCodec::Lz4.name(), + CompressionCodec::zstd_default().name() ), - )); + )), } - Ok(()) } mod metadata; @@ -70,12 +62,13 @@ mod tests { #[test] fn test_puffin_codec_validation() { - // All codecs in SUPPORTED_PUFFIN_CODECS should be valid - for codec in SUPPORTED_PUFFIN_CODECS { - assert!(validate_puffin_compression(*codec).is_ok()); - } + // Supported codecs + assert!(validate_puffin_compression(CompressionCodec::None).is_ok()); + assert!(validate_puffin_compression(CompressionCodec::Lz4).is_ok()); + assert!(validate_puffin_compression(CompressionCodec::zstd_default()).is_ok()); + assert!(validate_puffin_compression(CompressionCodec::Zstd(5)).is_ok()); - // Gzip should not be supported for Puffin files - assert!(validate_puffin_compression(CompressionCodec::Gzip).is_err()); + // Unsupported codecs + assert!(validate_puffin_compression(CompressionCodec::gzip_default()).is_err()); } } diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs index d272f02d41..0aced4186f 100644 --- a/crates/iceberg/src/puffin/reader.rs +++ b/crates/iceberg/src/puffin/reader.rs @@ -144,7 +144,7 @@ mod tests { sequence_number: 1, offset: 4, length: 10, - compression_codec: CompressionCodec::Gzip, + compression_codec: CompressionCodec::gzip_default(), properties: HashMap::new(), 
}; @@ -153,7 +153,7 @@ mod tests { assert!(result.is_err()); let err = result.unwrap_err(); assert_eq!(err.kind(), ErrorKind::DataInvalid); - assert!(err.to_string().contains("Gzip")); + assert!(err.to_string().contains("gzip")); assert!( err.to_string() .contains("is not supported for Puffin files") diff --git a/crates/iceberg/src/puffin/test_utils.rs b/crates/iceberg/src/puffin/test_utils.rs index 39fecc6f80..e0844e2002 100644 --- a/crates/iceberg/src/puffin/test_utils.rs +++ b/crates/iceberg/src/puffin/test_utils.rs @@ -77,7 +77,7 @@ pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata { sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER, offset: 4, length: 22, - compression_codec: CompressionCodec::Zstd, + compression_codec: CompressionCodec::zstd_default(), properties: HashMap::new(), } } @@ -134,7 +134,7 @@ pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata { sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER, offset: 26, length: 77, - compression_codec: CompressionCodec::Zstd, + compression_codec: CompressionCodec::zstd_default(), properties: HashMap::new(), } } diff --git a/crates/iceberg/src/puffin/writer.rs b/crates/iceberg/src/puffin/writer.rs index 30b97f09dd..4af4970b04 100644 --- a/crates/iceberg/src/puffin/writer.rs +++ b/crates/iceberg/src/puffin/writer.rs @@ -251,7 +251,8 @@ mod tests { async fn test_write_zstd_compressed_metric_data() { let temp_dir = TempDir::new().unwrap(); let blobs = vec![blob_0(), blob_1()]; - let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::Zstd); + let blobs_with_compression = + blobs_with_compression(blobs.clone(), CompressionCodec::zstd_default()); let input_file = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) .await @@ -323,7 +324,8 @@ mod tests { async fn test_zstd_compressed_metric_data_is_bit_identical_to_java_generated_file() { let temp_dir = TempDir::new().unwrap(); let blobs = vec![blob_0(), blob_1()]; - let 
blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::Zstd); + let blobs_with_compression = + blobs_with_compression(blobs, CompressionCodec::zstd_default()); assert_files_are_bit_identical( write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) @@ -338,14 +340,15 @@ mod tests { async fn test_gzip_compression_rejected() { let temp_dir = TempDir::new().unwrap(); let blobs = vec![blob_0()]; - let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::Gzip); + let blobs_with_compression = + blobs_with_compression(blobs, CompressionCodec::gzip_default()); let result = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()).await; assert!(result.is_err()); let err = result.unwrap_err(); assert_eq!(err.kind(), ErrorKind::DataInvalid); - assert!(err.to_string().contains("Gzip")); + assert!(err.to_string().contains("gzip")); assert!( err.to_string() .contains("is not supported for Puffin files") diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index b91599b74f..524139d434 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -457,7 +457,7 @@ impl TableMetadata { && metadata_content[0] == 0x1F && metadata_content[1] == 0x8B { - let decompressed_data = CompressionCodec::Gzip + let decompressed_data = CompressionCodec::gzip_default() .decompress(metadata_content.to_vec()) .map_err(|e| { Error::new( @@ -499,7 +499,7 @@ impl TableMetadata { // Apply compression based on codec let data_to_write = match codec { - CompressionCodec::Gzip => codec.compress(json_data)?, + CompressionCodec::Gzip(_) => codec.compress(json_data)?, CompressionCodec::None => json_data, _ => { return Err(Error::new( @@ -3618,7 +3618,7 @@ mod tests { let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json"); let json = serde_json::to_string(&original_metadata).unwrap(); - let compressed = 
CompressionCodec::Gzip + let compressed = CompressionCodec::gzip_default() .compress(json.into_bytes()) .expect("failed to compress metadata"); std::fs::write(&metadata_location, &compressed).expect("failed to write metadata"); diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 07c157304e..a3d4e7fdaa 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -78,18 +78,22 @@ pub(crate) fn parse_metadata_file_compression( Error::new( ErrorKind::DataInvalid, format!( - "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported." + "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported.", + CompressionCodec::None.name(), + CompressionCodec::gzip_default().name() ), ) })?; // Validate that only None and Gzip are used for metadata match codec { - CompressionCodec::None | CompressionCodec::Gzip => Ok(codec), - CompressionCodec::Lz4 | CompressionCodec::Zstd => Err(Error::new( + CompressionCodec::None | CompressionCodec::Gzip(_) => Ok(codec), + _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported for metadata files." + "Invalid metadata compression codec: {value}. 
Only '{}' and '{}' are supported for metadata files.", + CompressionCodec::None.name(), + CompressionCodec::gzip_default().name() ), )), } @@ -324,7 +328,7 @@ mod tests { let table_properties = TableProperties::try_from(&props).unwrap(); assert_eq!( table_properties.metadata_compression_codec, - CompressionCodec::Gzip + CompressionCodec::gzip_default() ); } @@ -351,7 +355,7 @@ mod tests { let table_properties = TableProperties::try_from(&props_upper).unwrap(); assert_eq!( table_properties.metadata_compression_codec, - CompressionCodec::Gzip + CompressionCodec::gzip_default() ); // Test mixed case @@ -362,7 +366,7 @@ mod tests { let table_properties = TableProperties::try_from(&props_mixed).unwrap(); assert_eq!( table_properties.metadata_compression_codec, - CompressionCodec::Gzip + CompressionCodec::gzip_default() ); // Test "NONE" should also be case-insensitive @@ -517,7 +521,7 @@ mod tests { )]); assert_eq!( parse_metadata_file_compression(&props).unwrap(), - CompressionCodec::Gzip + CompressionCodec::gzip_default() ); // Test case insensitivity - "NONE" @@ -537,7 +541,7 @@ mod tests { )]); assert_eq!( parse_metadata_file_compression(&props).unwrap(), - CompressionCodec::Gzip + CompressionCodec::gzip_default() ); // Test case insensitivity - "GzIp" @@ -547,7 +551,7 @@ mod tests { )]); assert_eq!( parse_metadata_file_compression(&props).unwrap(), - CompressionCodec::Gzip + CompressionCodec::gzip_default() ); // Test default when property is missing