diff --git a/python/python/lance/blob.py b/python/python/lance/blob.py index a87c9302736..ee3b149d9a4 100644 --- a/python/python/lance/blob.py +++ b/python/python/lance/blob.py @@ -14,6 +14,9 @@ _BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY = ( b"lance-encoding:blob-dedicated-size-threshold" ) +_BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY = ( + b"lance-encoding:blob-pack-file-size-threshold" +) _MAX_RUST_USIZE = ctypes.c_size_t(-1).value @@ -217,6 +220,7 @@ def blob_field( nullable: bool = True, inline_size_threshold: Optional[int] = None, dedicated_size_threshold: Optional[int] = None, + pack_file_size_threshold: Optional[int] = None, ) -> pa.Field: """ Construct an Arrow field for a Lance blob column. @@ -234,14 +238,24 @@ def blob_field( Maximum payload size in bytes to store in packed blob storage before using dedicated blob storage. This threshold is checked before ``inline_size_threshold``. + pack_file_size_threshold : optional, int + Maximum size in bytes of a single packed blob sidecar (``.pack``) file. + Once a sidecar reaches this size a new one is started. """ _validate_threshold("inline_size_threshold", inline_size_threshold, allow_zero=True) _validate_threshold( "dedicated_size_threshold", dedicated_size_threshold, allow_zero=False ) + _validate_threshold( + "pack_file_size_threshold", pack_file_size_threshold, allow_zero=False + ) field = pa.field(name, BlobType(), nullable=nullable) - if inline_size_threshold is None and dedicated_size_threshold is None: + if ( + inline_size_threshold is None + and dedicated_size_threshold is None + and pack_file_size_threshold is None + ): return field metadata = dict(field.metadata or {}) @@ -253,6 +267,10 @@ def blob_field( metadata[_BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY] = str( dedicated_size_threshold ).encode() + if pack_file_size_threshold is not None: + metadata[_BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY] = str( + pack_file_size_threshold + ).encode() return field.with_metadata(metadata) diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index fc879c9cbaa..89f2d2197ea 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -7,6 +7,7 @@ import sys import tarfile import textwrap +from pathlib import Path import lance import pandas as pd @@ -588,10 +589,14 @@ def test_blob_field_threshold_metadata(): "blob", inline_size_threshold=16 * 1024, dedicated_size_threshold=2 * 1024 * 1024, + pack_file_size_threshold=512 * 1024 * 1024, ) assert field.metadata[b"lance-encoding:blob-inline-size-threshold"] == b"16384" assert field.metadata[b"lance-encoding:blob-dedicated-size-threshold"] == b"2097152" + assert ( + field.metadata[b"lance-encoding:blob-pack-file-size-threshold"] == b"536870912" + ) @pytest.mark.parametrize( @@ -645,6 +650,30 @@ def test_blob_field_threshold_metadata(): "dedicated_size_threshold must fit in a Rust usize", id="overflow_dedicated", ), + pytest.param( + {"pack_file_size_threshold": 0}, + ValueError, + "pack_file_size_threshold must be positive", + id="zero_pack_file", + ), + pytest.param( + {"pack_file_size_threshold": -1}, + ValueError, + "pack_file_size_threshold must be positive", + id="negative_pack_file", + ), + pytest.param( + {"pack_file_size_threshold": True}, + TypeError, + "pack_file_size_threshold must be an int", + id="bool_pack_file", + ), + pytest.param( + {"pack_file_size_threshold": 2**100}, + OverflowError, + "pack_file_size_threshold must fit in a Rust usize", + id="overflow_pack_file", + ), ], ) def test_blob_field_rejects_invalid_thresholds(kwargs, error, message): @@ -715,6 +744,50 @@ def test_blob_extension_append_rejects_explicit_threshold_mismatch(tmp_path): lance.write_dataset(append, dataset_path, mode="append") +def test_blob_extension_pack_file_threshold_metadata_persists_after_reopen( + tmp_path: Path, +): + dataset_path = tmp_path / "test_ds_v2_pack_file_threshold_persists" + threshold = 512 * 1024 * 1024 + schema = pa.schema([lance.blob_field("blob", pack_file_size_threshold=threshold)]) + table = pa.table({"blob": lance.blob_array([b"x"])}, schema=schema) + + lance.write_dataset(table, dataset_path, data_storage_version="2.2") + reopened = lance.dataset(dataset_path) + + assert ( + reopened.schema.field("blob").metadata[ + b"lance-encoding:blob-pack-file-size-threshold" + ] + == str(threshold).encode() + ) + + +def test_blob_extension_append_rejects_pack_file_threshold_mismatch(tmp_path: Path): + dataset_path = tmp_path / "test_ds_v2_append_pack_file_mismatch" + initial_schema = pa.schema( + [lance.blob_field("blob", pack_file_size_threshold=512 * 1024 * 1024)] + ) + initial = pa.table( + {"blob": lance.blob_array([b"x" * 2048])}, + schema=initial_schema, + ) + lance.write_dataset(initial, dataset_path, data_storage_version="2.2") + + append_schema = pa.schema( + [lance.blob_field("blob", pack_file_size_threshold=256 * 1024 * 1024)] + ) + append = pa.table( + {"blob": lance.blob_array([b"x" * 2048])}, + schema=append_schema, + ) + + with pytest.raises( + OSError, match="Cannot append data with blob threshold metadata" + ): + lance.write_dataset(append, dataset_path, mode="append") + + def test_blob_extension_dedicated_threshold_precedes_inline_threshold(tmp_path): payload = b"x" * 2048 schema = pa.schema( diff --git a/rust/lance-arrow/src/lib.rs b/rust/lance-arrow/src/lib.rs index 34a67600543..46d5a855f1f 100644 --- a/rust/lance-arrow/src/lib.rs +++ b/rust/lance-arrow/src/lib.rs @@ -54,6 +54,9 @@ pub const BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY: &str = "lance-encoding:blob-dedicated-size-threshold"; /// Metadata key for overriding the inline blob size threshold (in bytes) pub const BLOB_INLINE_SIZE_THRESHOLD_META_KEY: &str = "lance-encoding:blob-inline-size-threshold"; +/// Metadata key for overriding the maximum size (in bytes) of a packed blob sidecar file +pub const BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY: &str = + "lance-encoding:blob-pack-file-size-threshold"; type Result = std::result::Result; diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 8cdde543e4e..5c976801f63 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -21,8 +21,8 @@ use futures::future::BoxFuture; use futures::stream::BoxStream; use futures::{FutureExt, StreamExt, TryStreamExt, stream}; use lance_arrow::{ - BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY, BLOB_INLINE_SIZE_THRESHOLD_META_KEY, FieldExt, - r#struct::StructArrayExt, + BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY, BLOB_INLINE_SIZE_THRESHOLD_META_KEY, + BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY, FieldExt, r#struct::StructArrayExt, }; use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; use lance_io::scheduler::{FileScheduler, ScanScheduler, SchedulerConfig}; @@ -71,6 +71,19 @@ pub(super) fn blob_dedicated_threshold_from_metadata( ) } +pub(super) fn blob_pack_file_threshold_from_metadata( + metadata: &HashMap, + field_name: &str, +) -> Result { + blob_threshold_from_metadata( + metadata, + field_name, + BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY, + PACK_FILE_MAX_SIZE, + false, + ) +} + fn blob_threshold_from_metadata( metadata: &HashMap, field_name: &str, @@ -178,10 +191,10 @@ struct PackWriter { object_store: ObjectStore, data_dir: Path, data_file_key: String, - max_pack_size: usize, current_blob_id: Option, writer: Option>, current_size: usize, + current_max_pack_size: Option, } impl PackWriter { @@ -190,24 +203,33 @@ impl PackWriter { object_store, data_dir, data_file_key, - max_pack_size: PACK_FILE_MAX_SIZE, current_blob_id: None, writer: None, current_size: 0, + current_max_pack_size: None, } } - async fn start_new_pack(&mut self, blob_id: u32) -> Result<()> { + async fn start_new_pack(&mut self, blob_id: u32, max_pack_size: usize) -> Result<()> { let path = blob_path(&self.data_dir, &self.data_file_key, blob_id); let writer = self.object_store.create(&path).await?; self.writer = Some(writer); self.current_blob_id = Some(blob_id); self.current_size = 0; + self.current_max_pack_size = Some(max_pack_size); Ok(()) } /// Append `data` to the current `.blob` file, rolling to a new file when - /// `max_pack_size` would be exceeded. + /// `max_pack_size` would be exceeded or when it differs from the threshold the + /// current file was opened under. + /// + /// max_pack_size: the maximum size of a pack file, already resolved by the + /// caller (write-param override or the blob field's metadata threshold). Fields + /// with different thresholds share this rolling writer, so each pack file is + /// kept to a single threshold — a payload whose `max_pack_size` differs from the + /// open file's threshold rolls a new file rather than extending the existing one + /// past its (possibly smaller) bound. /// /// alloc_blob_id: called only when a new pack file is opened; returns the /// blob_id used as the file name. @@ -216,6 +238,7 @@ impl PackWriter { /// position is the start offset of this payload in that pack file. async fn write_with_allocator( &mut self, + max_pack_size: usize, alloc_blob_id: &mut F, source: BlobWriteSource<'_>, ) -> Result<(u32, u64)> @@ -223,14 +246,19 @@ impl PackWriter { F: FnMut() -> u32, { let len = source.size(); - if self - .current_blob_id - .map(|_| self.current_size + len > self.max_pack_size) - .unwrap_or(true) - { + // Roll to a new file when the current one was opened under a different + // threshold, or when appending this payload would exceed it. + let needs_new_pack = match self.current_max_pack_size { + Some(current_max_pack_size) => { + current_max_pack_size != max_pack_size + || self.current_size + len > current_max_pack_size + } + None => true, + }; + if needs_new_pack { let blob_id = alloc_blob_id(); self.finish().await?; - self.start_new_pack(blob_id).await?; + self.start_new_pack(blob_id, max_pack_size).await?; } let writer = self.writer.as_mut().expect("pack writer is initialized"); @@ -246,6 +274,7 @@ impl PackWriter { } self.current_blob_id = None; self.current_size = 0; + self.current_max_pack_size = None; Ok(()) } } @@ -261,6 +290,10 @@ pub struct BlobPreprocessor { data_file_key: String, local_counter: u32, pack_writer: PackWriter, + /// Write-param override for the pack-file roll size. When set, it takes + /// precedence over each field's `blob-pack-file-size-threshold` metadata for + /// this write job only; it is not persisted into the dataset schema. + pack_file_size_override: Option, field_processors: Vec, external_base_resolver: Option>, allow_external_blob_outside_bases: bool, @@ -296,6 +329,7 @@ enum BlobPreprocessFieldKind { BlobV2 { inline_threshold: usize, dedicated_threshold: usize, + pack_file_threshold: usize, writer_metadata: HashMap, }, Struct { @@ -317,6 +351,10 @@ impl BlobPreprocessField { field.metadata(), field.name(), )?, + pack_file_threshold: blob_pack_file_threshold_from_metadata( + field.metadata(), + field.name(), + )?, writer_metadata: field.metadata().clone(), }, }); @@ -424,16 +462,13 @@ impl BlobPreprocessor { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, - pack_file_size_threshold: Option, + pack_file_size_override: Option, ) -> Result { - let mut pack_writer = PackWriter::new( + let pack_writer = PackWriter::new( object_store.clone(), data_dir.clone(), data_file_key.clone(), ); - if let Some(max_bytes) = pack_file_size_threshold { - pack_writer.max_pack_size = max_bytes; - } let arrow_schema = arrow_schema::Schema::from(schema); let field_processors = arrow_schema .fields() @@ -447,6 +482,7 @@ impl BlobPreprocessor { // Start at 1 to avoid a potential all-zero blob_id value. local_counter: 1, pack_writer, + pack_file_size_override, field_processors, external_base_resolver, allow_external_blob_outside_bases, @@ -470,10 +506,20 @@ impl BlobPreprocessor { Ok(path) } - async fn write_packed(&mut self, source: BlobWriteSource<'_>) -> Result<(u32, u64)> { + async fn write_packed( + &mut self, + field_pack_file_threshold: usize, + source: BlobWriteSource<'_>, + ) -> Result<(u32, u64)> { + // The write-param override, when present, wins over the field's metadata + // threshold for this write job (write-param > field metadata > default). + let max_pack_size = self + .pack_file_size_override + .unwrap_or(field_pack_file_threshold); let (counter, pack_writer) = (&mut self.local_counter, &mut self.pack_writer); pack_writer .write_with_allocator( + max_pack_size, &mut || { let id = *counter; *counter += 1; @@ -600,6 +646,7 @@ impl BlobPreprocessor { BlobPreprocessFieldKind::BlobV2 { inline_threshold, dedicated_threshold, + pack_file_threshold, writer_metadata, } => { self.preprocess_blob_array( @@ -607,6 +654,7 @@ impl BlobPreprocessor { field.as_ref(), *inline_threshold, *dedicated_threshold, + *pack_file_threshold, writer_metadata, ) .await @@ -678,6 +726,7 @@ impl BlobPreprocessor { field: &ArrowField, inline_threshold: usize, dedicated_threshold: usize, + pack_file_threshold: usize, writer_metadata: &HashMap, ) -> Result<(ArrayRef, Arc)> { let struct_arr = array @@ -751,7 +800,10 @@ impl BlobPreprocessor { if has_data && data_len > inline_threshold { let (pack_blob_id, position) = self - .write_packed(BlobWriteSource::Bytes(data_col.value(i))) + .write_packed( + pack_file_threshold, + BlobWriteSource::Bytes(data_col.value(i)), + ) .await?; kind_builder.append_value(BlobKind::Packed as u8); @@ -800,7 +852,7 @@ impl BlobPreprocessor { if data_len > inline_threshold as u64 { let (pack_blob_id, position) = self - .write_packed(BlobWriteSource::External(&source)) + .write_packed(pack_file_threshold, BlobWriteSource::External(&source)) .await?; kind_builder.append_value(BlobKind::Packed as u8); @@ -2302,7 +2354,8 @@ mod tests { use futures::{StreamExt, TryStreamExt, future::try_join_all}; use lance_arrow::{ ARROW_EXT_NAME_KEY, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY, - BLOB_INLINE_SIZE_THRESHOLD_META_KEY, BLOB_V2_EXT_NAME, DataTypeExt, + BLOB_INLINE_SIZE_THRESHOLD_META_KEY, BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY, + BLOB_V2_EXT_NAME, DataTypeExt, }; use lance_core::datatypes::BlobKind; use lance_io::object_store::{ @@ -4082,10 +4135,12 @@ mod tests { ); } - async fn try_preprocess_kind_with_blob_metadata( + async fn try_preprocess_blobs_with_blob_metadata( metadata_entries: Vec<(&'static str, String)>, - data_len: usize, - ) -> Result { + pack_file_size_override: Option, + blob_len: usize, + num_blobs: usize, + ) -> Result { let (object_store, base_path) = ObjectStore::from_uri_and_params( Arc::new(ObjectStoreRegistry::default()), "memory://blob_preprocessor", @@ -4116,11 +4171,13 @@ mod tests { ExternalBlobMode::Reference, Arc::new(ObjectStoreRegistry::default()), ObjectStoreParams::default(), - None, + pack_file_size_override, )?; - let mut blob_builder = BlobArrayBuilder::new(1); - blob_builder.push_bytes(vec![0u8; data_len]).unwrap(); + let mut blob_builder = BlobArrayBuilder::new(num_blobs); + for _ in 0..num_blobs { + blob_builder.push_bytes(vec![0u8; blob_len]).unwrap(); + } let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap(); let field_without_metadata = @@ -4129,11 +4186,20 @@ mod tests { let batch = RecordBatch::try_new(batch_schema, vec![blob_array]).unwrap(); let out = preprocessor.preprocess_batch(&batch).await?; - let struct_arr = out + Ok(out .column(0) .as_any() .downcast_ref::() - .unwrap(); + .unwrap() + .clone()) + } + + async fn try_preprocess_kind_with_blob_metadata( + metadata_entries: Vec<(&'static str, String)>, + data_len: usize, + ) -> Result { + let struct_arr = + try_preprocess_blobs_with_blob_metadata(metadata_entries, None, data_len, 1).await?; Ok(struct_arr .column_by_name("kind") .unwrap() @@ -4150,6 +4216,27 @@ mod tests { .unwrap() } + async fn packed_blobs_with_blob_metadata( + metadata_entries: Vec<(&'static str, String)>, + pack_file_size_override: Option, + blob_len: usize, + num_blobs: usize, + ) -> Vec { + let struct_arr = try_preprocess_blobs_with_blob_metadata( + metadata_entries, + pack_file_size_override, + blob_len, + num_blobs, + ) + .await + .unwrap(); + let blob_ids = struct_arr + .column_by_name("blob_id") + .unwrap() + .as_primitive::(); + (0..struct_arr.len()).map(|i| blob_ids.value(i)).collect() + } + #[tokio::test] async fn test_blob_v2_dedicated_threshold_rejects_non_positive_metadata() { let err = try_preprocess_kind_with_blob_metadata( @@ -4240,6 +4327,61 @@ mod tests { assert_eq!(kind, lance_core::datatypes::BlobKind::Packed as u8); } + #[tokio::test] + async fn test_blob_v2_pack_file_threshold_rolls_at_metadata_value() { + // Blobs must exceed the inline cutoff to be packed at all. With a pack-file + // threshold equal to a single blob's size, each of the three blobs rolls to its + // own pack file (a distinct blob_id). + let blob_len = super::INLINE_MAX + 1024; + let blob_ids = packed_blobs_with_blob_metadata( + vec![(BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY, blob_len.to_string())], + None, + blob_len, + 3, + ) + .await; + let distinct: std::collections::HashSet = blob_ids.iter().copied().collect(); + assert_eq!( + distinct.len(), + 3, + "expected one pack file per blob: {blob_ids:?}" + ); + } + + #[tokio::test] + async fn test_blob_v2_pack_file_threshold_packs_within_metadata_value() { + // A pack-file threshold large enough for all three blobs keeps them in a single + // pack file (one shared blob_id). + let blob_len = super::INLINE_MAX + 1024; + let blob_ids = packed_blobs_with_blob_metadata( + vec![( + BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY, + (blob_len * 3).to_string(), + )], + None, + blob_len, + 3, + ) + .await; + let distinct: std::collections::HashSet = blob_ids.iter().copied().collect(); + assert_eq!( + distinct.len(), + 1, + "expected a single shared pack file: {blob_ids:?}" + ); + } + + #[tokio::test] + async fn test_blob_v2_pack_file_threshold_rejects_non_positive_metadata() { + let err = try_preprocess_kind_with_blob_metadata( + vec![(BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY, "0".to_string())], + 1024, + ) + .await + .expect_err("zero pack-file threshold should be rejected"); + assert!(err.to_string().contains("expected a positive integer")); + } + #[tokio::test] async fn test_blob_v2_inline_threshold_respects_smaller_metadata() { let kind = preprocess_kind_with_blob_metadata( @@ -4566,4 +4708,25 @@ mod tests { .unwrap(); assert_eq!(dataset.count_rows(None).await.unwrap(), 2); } + + #[tokio::test] + async fn test_blob_v2_pack_file_threshold_write_param_overrides_metadata() { + let blob_len = super::INLINE_MAX + 1024; + let blob_ids = packed_blobs_with_blob_metadata( + vec![( + BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY, + (blob_len * 3).to_string(), + )], + Some(blob_len), + blob_len, + 3, + ) + .await; + let distinct: std::collections::HashSet = blob_ids.iter().copied().collect(); + assert_eq!( + distinct.len(), + 3, + "write-param override should force one pack file per blob: {blob_ids:?}" + ); + } } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index ff0a119158c..1881a0c627e 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -8,7 +8,8 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use futures::{Stream, StreamExt, TryStreamExt}; use lance_arrow::{ ARROW_EXT_NAME_KEY, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY, - BLOB_INLINE_SIZE_THRESHOLD_META_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME, + BLOB_INLINE_SIZE_THRESHOLD_META_KEY, BLOB_META_KEY, BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY, + BLOB_V2_EXT_NAME, }; use lance_core::datatypes::{ NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions, @@ -40,7 +41,7 @@ use crate::Dataset; use crate::dataset::blob::{ BlobPreprocessor, ExternalBaseCandidate, ExternalBaseResolver, blob_dedicated_threshold_from_metadata, blob_inline_threshold_from_metadata, - preprocess_blob_batches, + blob_pack_file_threshold_from_metadata, preprocess_blob_batches, }; use crate::session::Session; @@ -183,66 +184,63 @@ fn validate_blob_threshold_metadata_for_append( let Some(dataset_field) = dataset_schema.field(&input_field.name) else { continue; }; - let input_is_blob_v2 = input_field - .metadata - .get(ARROW_EXT_NAME_KEY) - .is_some_and(|extension_name| extension_name == BLOB_V2_EXT_NAME); - let dataset_is_blob_v2 = dataset_field - .metadata - .get(ARROW_EXT_NAME_KEY) - .is_some_and(|extension_name| extension_name == BLOB_V2_EXT_NAME); - if !input_is_blob_v2 && !dataset_is_blob_v2 { - continue; - } + validate_blob_threshold_metadata_for_field_recursive(input_field, dataset_field)?; + } - let has_inline_threshold = input_field - .metadata - .contains_key(BLOB_INLINE_SIZE_THRESHOLD_META_KEY); - let has_dedicated_threshold = input_field - .metadata - .contains_key(BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY); - if !has_inline_threshold && !has_dedicated_threshold { - continue; - } + Ok(()) +} - if has_inline_threshold { - let input_inline_threshold = - blob_inline_threshold_from_metadata(&input_field.metadata, &input_field.name)?; - let dataset_inline_threshold = - blob_inline_threshold_from_metadata(&dataset_field.metadata, &dataset_field.name)?; - if input_inline_threshold != dataset_inline_threshold { - return Err(Error::invalid_input(format!( - "Cannot append data with blob threshold metadata {}={} for field '{}'; \ - the dataset schema has effective value {}. Blob thresholds for existing \ - columns are stored in the dataset schema.", - BLOB_INLINE_SIZE_THRESHOLD_META_KEY, - input_inline_threshold, - input_field.name, - dataset_inline_threshold, - ))); +fn validate_blob_threshold_metadata_for_field_recursive( + input_field: &lance_core::datatypes::Field, + dataset_field: &lance_core::datatypes::Field, +) -> Result<()> { + let input_is_blob_v2 = input_field + .metadata + .get(ARROW_EXT_NAME_KEY) + .is_some_and(|extension_name| extension_name == BLOB_V2_EXT_NAME); + let dataset_is_blob_v2 = dataset_field + .metadata + .get(ARROW_EXT_NAME_KEY) + .is_some_and(|extension_name| extension_name == BLOB_V2_EXT_NAME); + if input_is_blob_v2 || dataset_is_blob_v2 { + for (key, read_threshold) in [ + ( + BLOB_INLINE_SIZE_THRESHOLD_META_KEY, + blob_inline_threshold_from_metadata + as fn(&HashMap, &str) -> Result, + ), + ( + BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY, + blob_dedicated_threshold_from_metadata, + ), + ( + BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY, + blob_pack_file_threshold_from_metadata, + ), + ] { + if !input_field.metadata.contains_key(key) { + continue; } - } - if has_dedicated_threshold { - let input_dedicated_threshold = - blob_dedicated_threshold_from_metadata(&input_field.metadata, &input_field.name)?; - let dataset_dedicated_threshold = blob_dedicated_threshold_from_metadata( - &dataset_field.metadata, - &dataset_field.name, - )?; - if input_dedicated_threshold != dataset_dedicated_threshold { + let input_value = read_threshold(&input_field.metadata, &input_field.name)?; + let dataset_value = read_threshold(&dataset_field.metadata, &dataset_field.name)?; + if input_value != dataset_value { return Err(Error::invalid_input(format!( - "Cannot append data with blob threshold metadata {}={} for field '{}'; \ - the dataset schema has effective value {}. Blob thresholds for existing \ - columns are stored in the dataset schema.", - BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY, - input_dedicated_threshold, + "Cannot append data with blob threshold metadata {key}={input_value} for \ + field '{}'; the dataset schema has effective value {dataset_value}. Blob \ + thresholds for existing columns are stored in the dataset schema.", input_field.name, - dataset_dedicated_threshold, ))); } } } + for input_child in &input_field.children { + let Some(dataset_child) = dataset_field.child(&input_child.name) else { + continue; + }; + validate_blob_threshold_metadata_for_field_recursive(input_child, dataset_child)?; + } + Ok(()) }