Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions crates/iceberg/src/catalog/metadata_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ impl MetadataLocation {
))?;

// Check for compression suffix (e.g., .gz)
let gzip_suffix = CompressionCodec::Gzip.suffix()?;
let gzip_suffix = CompressionCodec::gzip_default().suffix()?;
let (stripped, compression_codec) = if let Some(s) = stripped.strip_suffix(gzip_suffix) {
(s, CompressionCodec::Gzip)
(s, CompressionCodec::gzip_default())
} else {
(stripped, CompressionCodec::None)
};
Expand Down Expand Up @@ -261,7 +261,7 @@ mod test {
table_location: "/abc".to_string(),
version: 1234567,
id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
compression_codec: CompressionCodec::Gzip,
compression_codec: CompressionCodec::gzip_default(),
}),
),
// Negative version
Expand Down Expand Up @@ -345,10 +345,16 @@ mod test {
"/test/table/metadata/00005-81056704-ce5b-41c4-bb83-eb6408081af6.gz.metadata.json",
)
.unwrap();
assert_eq!(location_gzip.compression_codec, CompressionCodec::Gzip);
assert_eq!(
location_gzip.compression_codec,
CompressionCodec::gzip_default()
);

let next_gzip = location_gzip.with_next_version();
assert_eq!(next_gzip.compression_codec, CompressionCodec::Gzip);
assert_eq!(
next_gzip.compression_codec,
CompressionCodec::gzip_default()
);
assert_eq!(next_gzip.version, 6);
}

Expand All @@ -369,7 +375,10 @@ mod test {
);
let metadata_gzip = create_test_metadata(props_gzip);
let updated_gzip = location.with_new_metadata(&metadata_gzip);
assert_eq!(updated_gzip.compression_codec, CompressionCodec::Gzip);
assert_eq!(
updated_gzip.compression_codec,
CompressionCodec::gzip_default()
);
assert_eq!(updated_gzip.version, 0);
assert_eq!(
updated_gzip.to_string(),
Expand Down
150 changes: 128 additions & 22 deletions crates/iceberg/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,101 @@

//! Compression codec support for data compression and decompression.

use std::fmt;
use std::io::{Read, Write};

use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize, Serializer};

use crate::{Error, ErrorKind, Result};

/// Default compression level for Zstandard (zstd).
const ZSTD_DEFAULT_LEVEL: u8 = 3;
/// Default compression level for Gzip.
const GZIP_DEFAULT_LEVEL: u8 = 6;
/// Maximum compression level for Gzip.
const GZIP_MAX_LEVEL: u8 = 9;

/// Data compression formats
#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
pub enum CompressionCodec {
#[default]
/// No compression
None,
/// LZ4 single compression frame with content size present
Lz4,
/// Zstandard single compression frame with content size present
Zstd,
/// Gzip compression
Gzip,
/// Zstandard single compression frame with content size present.
/// Level range is 0–22, where 0 means default compression level (not no compression).
/// Use [`CompressionCodec::zstd_default`] to construct with the default level.
Zstd(u8),
/// Gzip compression. Level range is 0–9, where 0 means no compression.
/// Use [`CompressionCodec::gzip_default`] to construct with the default level.
Gzip(u8),
/// Snappy compression
Snappy,
}

impl CompressionCodec {
    /// Builds a [`CompressionCodec::Zstd`] carrying `ZSTD_DEFAULT_LEVEL`.
    pub const fn zstd_default() -> Self {
        Self::Zstd(ZSTD_DEFAULT_LEVEL)
    }

    /// Builds a [`CompressionCodec::Gzip`] carrying `GZIP_DEFAULT_LEVEL`.
    pub const fn gzip_default() -> Self {
        Self::Gzip(GZIP_DEFAULT_LEVEL)
    }

    /// Lowercase name of the codec, as used in serialization and error messages.
    ///
    /// The name depends only on the variant: any compression level stored in
    /// `Zstd` or `Gzip` is ignored.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::None => "none",
            Self::Lz4 => "lz4",
            Self::Zstd(_) => "zstd",
            Self::Gzip(_) => "gzip",
            Self::Snappy => "snappy",
        }
    }
}

// Note: serialize/deserialize do not round-trip the compression level. Iceberg configuration
// stores only the codec name (e.g. "zstd"), not the level, so deserialization always
// produces the default level: a `Zstd(5)` written to metadata is read back as `Zstd(3)`.
// Some compression configuration (e.g. Avro metadata) has a separate level field alongside
// the codec name.
impl Serialize for CompressionCodec {
    /// Serializes the codec as its bare name string (see `name()`).
    ///
    /// Only the name is written; a level held by the variant is not emitted.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let codec_name = self.name();
        serializer.serialize_str(codec_name)
    }
}

impl<'de> Deserialize<'de> for CompressionCodec {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> std::result::Result<Self, D::Error> {
let s = String::deserialize(deserializer)?;
match s.to_lowercase().as_str() {
"none" => Ok(CompressionCodec::None),
"lz4" => Ok(CompressionCodec::Lz4),
"zstd" => Ok(CompressionCodec::zstd_default()),
"gzip" => Ok(CompressionCodec::gzip_default()),
"snappy" => Ok(CompressionCodec::Snappy),
other => Err(serde::de::Error::unknown_variant(other, &[
"none", "lz4", "zstd", "gzip", "snappy",
])),
}
}
}

impl fmt::Display for CompressionCodec {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CompressionCodec::None => write!(f, "None"),
CompressionCodec::Lz4 => write!(f, "Lz4"),
CompressionCodec::Zstd(level) => write!(f, "Zstd(level={level})"),
CompressionCodec::Gzip(level) => write!(f, "Gzip(level={level})"),
CompressionCodec::Snappy => write!(f, "Snappy"),
}
}
}

impl CompressionCodec {
Expand All @@ -49,13 +122,17 @@ impl CompressionCodec {
ErrorKind::FeatureUnsupported,
"LZ4 decompression is not supported currently",
)),
CompressionCodec::Zstd => Ok(zstd::stream::decode_all(&bytes[..])?),
CompressionCodec::Gzip => {
CompressionCodec::Zstd(_) => Ok(zstd::stream::decode_all(&bytes[..])?),
CompressionCodec::Gzip(_) => {
let mut decoder = GzDecoder::new(&bytes[..]);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)?;
Ok(decompressed)
}
CompressionCodec::Snappy => Err(Error::new(
ErrorKind::FeatureUnsupported,
"Snappy decompression is not supported currently",
)),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we plan to address this in a follow up PR? If so, could you create a tracking issue?

Copy link
Copy Markdown
Contributor Author

@emkornfield emkornfield Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I know the only place Snappy is used is within Avro, and Avro has its own code for doing the compression.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense

}
}

Expand All @@ -66,19 +143,24 @@ impl CompressionCodec {
ErrorKind::FeatureUnsupported,
"LZ4 compression is not supported currently",
)),
CompressionCodec::Zstd => {
CompressionCodec::Zstd(level) => {
let writer = Vec::<u8>::new();
let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
let mut encoder = zstd::stream::Encoder::new(writer, *level as i32)?;
encoder.include_checksum(true)?;
encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
std::io::copy(&mut &bytes[..], &mut encoder)?;
Ok(encoder.finish()?)
}
CompressionCodec::Gzip => {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
CompressionCodec::Gzip(level) => {
let compression = Compression::new((*level).min(GZIP_MAX_LEVEL) as u32);
let mut encoder = GzEncoder::new(Vec::new(), compression);
encoder.write_all(&bytes)?;
Ok(encoder.finish()?)
}
CompressionCodec::Snappy => Err(Error::new(
ErrorKind::FeatureUnsupported,
"Snappy compression is not supported currently",
)),
}
}

Expand All @@ -95,8 +177,10 @@ impl CompressionCodec {
pub fn suffix(&self) -> Result<&'static str> {
match self {
CompressionCodec::None => Ok(""),
CompressionCodec::Gzip => Ok(".gz"),
codec @ (CompressionCodec::Lz4 | CompressionCodec::Zstd) => Err(Error::new(
CompressionCodec::Gzip(_) => Ok(".gz"),
codec @ (CompressionCodec::Lz4
| CompressionCodec::Zstd(_)
| CompressionCodec::Snappy) => Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("suffix not defined for {codec:?}"),
)),
Expand All @@ -123,7 +207,10 @@ mod tests {
async fn test_compression_codec_compress() {
let bytes_vec = [0_u8; 100].to_vec();

let compression_codecs = [CompressionCodec::Zstd, CompressionCodec::Gzip];
let compression_codecs = [
CompressionCodec::zstd_default(),
CompressionCodec::gzip_default(),
];

for codec in compression_codecs {
let compressed = codec.compress(bytes_vec.clone()).unwrap();
Expand All @@ -135,7 +222,10 @@ mod tests {

#[tokio::test]
async fn test_compression_codec_unsupported() {
let unsupported_codecs = [(CompressionCodec::Lz4, "LZ4")];
let unsupported_codecs = [
(CompressionCodec::Lz4, "LZ4"),
(CompressionCodec::Snappy, "Snappy"),
];
let bytes_vec = [0_u8; 100].to_vec();

for (codec, name) in unsupported_codecs {
Expand All @@ -153,18 +243,34 @@ mod tests {

#[test]
fn test_suffix() {
// Test supported codecs
assert_eq!(CompressionCodec::None.suffix().unwrap(), "");
assert_eq!(CompressionCodec::Gzip.suffix().unwrap(), ".gz");
assert_eq!(CompressionCodec::gzip_default().suffix().unwrap(), ".gz");

// Test unsupported codecs return errors
assert!(CompressionCodec::Lz4.suffix().is_err());
assert!(CompressionCodec::Zstd.suffix().is_err());
assert!(CompressionCodec::zstd_default().suffix().is_err());
assert!(CompressionCodec::Snappy.suffix().is_err());

let lz4_err = CompressionCodec::Lz4.suffix().unwrap_err();
assert!(lz4_err.to_string().contains("suffix not defined for Lz4"));

let zstd_err = CompressionCodec::Zstd.suffix().unwrap_err();
let zstd_err = CompressionCodec::zstd_default().suffix().unwrap_err();
assert!(zstd_err.to_string().contains("suffix not defined for Zstd"));
}

#[test]
fn test_display() {
assert_eq!(CompressionCodec::None.to_string(), "None");
assert_eq!(CompressionCodec::Lz4.to_string(), "Lz4");
assert_eq!(
CompressionCodec::zstd_default().to_string(),
"Zstd(level=3)"
);
assert_eq!(CompressionCodec::Zstd(5).to_string(), "Zstd(level=5)");
assert_eq!(
CompressionCodec::gzip_default().to_string(),
"Gzip(level=6)"
);
assert_eq!(CompressionCodec::Gzip(9).to_string(), "Gzip(level=9)");
assert_eq!(CompressionCodec::Snappy.to_string(), "Snappy");
}
}
5 changes: 4 additions & 1 deletion crates/iceberg/src/puffin/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,9 @@ mod tests {
assert!(result.is_ok());
let metadata = result.unwrap();
assert_eq!(metadata.blobs.len(), 1);
assert_eq!(metadata.blobs[0].compression_codec, CompressionCodec::Gzip);
assert_eq!(
metadata.blobs[0].compression_codec,
CompressionCodec::gzip_default()
);
}
}
39 changes: 16 additions & 23 deletions crates/iceberg/src/puffin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,22 @@ pub use blob::{APACHE_DATASKETCHES_THETA_V1, Blob, DELETION_VECTOR_V1};

pub use crate::compression::CompressionCodec;

/// Compression codecs supported by the Puffin spec.
const SUPPORTED_PUFFIN_CODECS: &[CompressionCodec] = &[
CompressionCodec::None,
CompressionCodec::Lz4,
CompressionCodec::Zstd,
];

/// Validates that the compression codec is supported for Puffin files.
/// Returns an error if the codec is not supported.
fn validate_puffin_compression(codec: CompressionCodec) -> Result<()> {
if !SUPPORTED_PUFFIN_CODECS.contains(&codec) {
let supported_names: Vec<String> = SUPPORTED_PUFFIN_CODECS
.iter()
.map(|c| format!("{c:?}"))
.collect();
return Err(Error::new(
match codec {
CompressionCodec::None | CompressionCodec::Lz4 | CompressionCodec::Zstd(_) => Ok(()),
other => Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Compression codec {codec:?} is not supported for Puffin files. Only {} are supported.",
supported_names.join(", ")
"Compression codec {} is not supported for Puffin files. Only {}, {}, and {} are supported.",
other.name(),
CompressionCodec::None.name(),
CompressionCodec::Lz4.name(),
CompressionCodec::zstd_default().name()
),
));
)),
}
Ok(())
}

mod metadata;
Expand All @@ -70,12 +62,13 @@ mod tests {

#[test]
fn test_puffin_codec_validation() {
// All codecs in SUPPORTED_PUFFIN_CODECS should be valid
for codec in SUPPORTED_PUFFIN_CODECS {
assert!(validate_puffin_compression(*codec).is_ok());
}
// Supported codecs
assert!(validate_puffin_compression(CompressionCodec::None).is_ok());
assert!(validate_puffin_compression(CompressionCodec::Lz4).is_ok());
assert!(validate_puffin_compression(CompressionCodec::zstd_default()).is_ok());
assert!(validate_puffin_compression(CompressionCodec::Zstd(5)).is_ok());

// Gzip should not be supported for Puffin files
assert!(validate_puffin_compression(CompressionCodec::Gzip).is_err());
// Unsupported codecs
assert!(validate_puffin_compression(CompressionCodec::gzip_default()).is_err());
}
}
4 changes: 2 additions & 2 deletions crates/iceberg/src/puffin/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ mod tests {
sequence_number: 1,
offset: 4,
length: 10,
compression_codec: CompressionCodec::Gzip,
compression_codec: CompressionCodec::gzip_default(),
properties: HashMap::new(),
};

Expand All @@ -153,7 +153,7 @@ mod tests {
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind(), ErrorKind::DataInvalid);
assert!(err.to_string().contains("Gzip"));
assert!(err.to_string().contains("gzip"));
assert!(
err.to_string()
.contains("is not supported for Puffin files")
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/puffin/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata {
sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER,
offset: 4,
length: 22,
compression_codec: CompressionCodec::Zstd,
compression_codec: CompressionCodec::zstd_default(),
properties: HashMap::new(),
}
}
Expand Down Expand Up @@ -134,7 +134,7 @@ pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata {
sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER,
offset: 26,
length: 77,
compression_codec: CompressionCodec::Zstd,
compression_codec: CompressionCodec::zstd_default(),
properties: HashMap::new(),
}
}
Expand Down
Loading
Loading