Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
374 changes: 373 additions & 1 deletion crates/iceberg/src/arrow/record_batch_transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ impl RecordBatchTransformer {
.iter()
.zip(target_schema.fields().iter())
{
// DataType equality (==) is recursive and includes nested field names
// and metadata, so e.g. List(Field("item", Int32)) != List(Field("element", Int32)).
// This correctly routes nested-field-name mismatches to SchemaComparison::Different,
// which triggers generate_transform_operations where they are handled via cast.
if source_field.data_type() != target_field.data_type()
|| source_field.is_nullable() != target_field.is_nullable()
{
Expand Down Expand Up @@ -527,11 +531,17 @@ impl RecordBatchTransformer {
// No conflict detection needed - schema resolution happened in reader.rs.
let field_by_id = field_id_to_source_schema_map.get(field_id).map(
|(source_field, source_index)| {
if source_field.data_type().equals_datatype(target_type) {
if source_field.data_type() == target_type {
// Exact match: data type including nested field names are identical,
// so the column can be used as-is.
ColumnSource::PassThrough {
source_index: *source_index,
}
} else {
// Covers both field-name normalization (e.g., Parquet's List inner
// field "item" → Iceberg's "element", or Map's "entries" →
// "key_value") and actual type promotion (e.g., Int32 → Int64).
// Arrow's cast() handles both correctly.
ColumnSource::Promote {
target_type: target_type.clone(),
source_index: *source_index,
Expand Down Expand Up @@ -1675,4 +1685,366 @@ mod test {
assert!(data_col.is_null(1));
assert!(data_col.is_null(2));
}

// -----------------------------------------------------------------------
// Helpers for List / Map field-name normalization tests
// -----------------------------------------------------------------------

/// Build an Iceberg schema containing a required `id: int` column and an
/// optional `tags: list<int>` column whose elements are nullable.
fn iceberg_schema_with_list() -> Schema {
    // Element field id 3; `false` marks the elements as optional (nullable).
    let element_field = NestedField::list_element(3, Type::Primitive(PrimitiveType::Int), false);
    let tags_type = Type::List(crate::spec::ListType {
        element_field: element_field.into(),
    });

    Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(2, "tags", tags_type).into(),
        ])
        .build()
        .unwrap()
}

/// Build a Parquet-style Arrow schema + `RecordBatch` with two columns:
/// a required `id: Int32` and a nullable `tags: List<Int32>` whose inner
/// (element) field is named `inner_name` (e.g. `"item"` for Parquet,
/// `"element"` for Iceberg).
///
/// `row_lengths` gives the number of list elements for each row; rows
/// consume `list_values` in order, so `row_lengths` must sum to
/// `list_values.len()` and have the same length as `id_values`.
fn parquet_batch_with_list_field_name(
    inner_name: &str,
    id_values: Vec<i32>,
    list_values: Vec<i32>,
    row_lengths: Vec<usize>,
) -> RecordBatch {
    use arrow_array::ListArray;
    use arrow_buffer::OffsetBuffer;

    // The inner field carries Iceberg field id 3 so the transformer can match
    // it against the element field of the snapshot schema's `tags` list.
    let inner_field = Arc::new(Field::new(inner_name, DataType::Int32, true).with_metadata(
        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
    ));
    let schema = Arc::new(ArrowSchema::new(vec![
        simple_field("id", DataType::Int32, false, "1"),
        Field::new("tags", DataType::List(inner_field.clone()), true).with_metadata(
            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
        ),
    ]));

    let values = Int32Array::from(list_values);
    // `from_lengths` converts per-row lengths into cumulative list offsets.
    let offset_buf = OffsetBuffer::from_lengths(row_lengths);
    let list_array = ListArray::new(inner_field, offset_buf, Arc::new(values), None);

    RecordBatch::try_new(schema, vec![
        Arc::new(Int32Array::from(id_values)),
        Arc::new(list_array),
    ])
    .unwrap()
}

/// Assert that every column in `batch` has a data type that exactly matches
/// the corresponding schema field (strict `==`, the same check that
/// `RecordBatch::try_new` / `concat_batches` performs).
fn assert_strict_schema_match(batch: &RecordBatch) {
    let schema = batch.schema();
    // Walk columns and schema fields in lockstep; a RecordBatch guarantees
    // they have the same length.
    for (i, (column, field)) in batch.columns().iter().zip(schema.fields().iter()).enumerate() {
        assert_eq!(
            column.data_type(),
            field.data_type(),
            "column {} data type does not strictly match schema field '{}'",
            i,
            field.name(),
        );
    }
}

// -----------------------------------------------------------------------
// Tests
// -----------------------------------------------------------------------

/// Parquet files store list inner fields as `"item"` while Iceberg uses
/// `"element"`. The transformer must cast the column so that the output
/// data type exactly matches the target schema, including nested field
/// names. Without this, downstream consumers that use strict schema
/// validation (e.g. DataFusion's `concat_batches` → `RecordBatch::try_new`)
/// fail with "column types must match schema types".
#[test]
fn list_field_name_normalized_from_item_to_element() {
    let snapshot_schema = Arc::new(iceberg_schema_with_list());
    // Project both columns: id (field 1) and tags (field 2).
    let projected = [1, 2];
    let mut transformer =
        RecordBatchTransformerBuilder::new(snapshot_schema, &projected).build();

    // Two rows: id=1 → [10, 20], id=2 → [30]; inner list field named "item".
    let file_batch =
        parquet_batch_with_list_field_name("item", vec![1, 2], vec![10, 20, 30], vec![2, 1]);

    let result = transformer.process_record_batch(file_batch).unwrap();

    assert_eq!(result.num_columns(), 2);
    assert_eq!(result.num_rows(), 2);
    assert_strict_schema_match(&result);

    // Schema-level check: the output batch's schema advertises "element".
    let list_dt = result.schema().field(1).data_type().clone();
    match &list_dt {
        DataType::List(f) => assert_eq!(f.name(), "element"),
        other => panic!("expected List, got {other:?}"),
    }

    // Column-level check (the actual array, not just the schema wrapper).
    match result.column(1).data_type() {
        DataType::List(f) => assert_eq!(f.name(), "element"),
        other => panic!("expected List, got {other:?}"),
    }

    // Must survive strict RecordBatch::try_new (same validation concat_batches uses).
    RecordBatch::try_new(result.schema(), result.columns().to_vec())
        .expect("result batch must pass strict schema validation");
}

/// When the Parquet file already uses `"element"` (matching Iceberg), the
/// transformer must take the `PassThrough` path — no cast needed. We verify
/// by checking the output is identical to the input.
#[test]
fn list_field_name_already_matching_uses_passthrough() {
    let snapshot_schema = Arc::new(iceberg_schema_with_list());
    // Project both columns: id (field 1) and tags (field 2).
    let projected = [1, 2];
    let mut transformer =
        RecordBatchTransformerBuilder::new(snapshot_schema, &projected).build();

    // Inner list field already named "element" — matches the Iceberg schema.
    let file_batch =
        parquet_batch_with_list_field_name("element", vec![1, 2], vec![10, 20, 30], vec![2, 1]);

    let result = transformer.process_record_batch(file_batch).unwrap();

    assert_eq!(result.num_columns(), 2);
    assert_eq!(result.num_rows(), 2);
    assert_strict_schema_match(&result);

    // Data must be identical — no unnecessary cast
    assert_eq!(
        result
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap()
            .values(),
        &[1, 2],
    );
    let list_col = result
        .column(1)
        .as_any()
        .downcast_ref::<arrow_array::ListArray>()
        .unwrap();
    // first row: [10,20], second row: [30]
    assert_eq!(list_col.value(0).len(), 2);
    assert_eq!(list_col.value(1).len(), 1);
}

/// Verify that list data values survive the cast-based normalization
/// intact, including element-level nulls, empty lists, and a null list row.
#[test]
fn list_field_name_normalization_preserves_data() {
    use arrow_array::ListArray;
    use arrow_buffer::OffsetBuffer;

    let snapshot_schema = Arc::new(iceberg_schema_with_list());
    let projected = [1, 2];
    let mut transformer =
        RecordBatchTransformerBuilder::new(snapshot_schema, &projected).build();

    // File schema uses Parquet's "item" name for the list element field.
    let inner_field = Arc::new(Field::new("item", DataType::Int32, true).with_metadata(
        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
    ));
    let schema = Arc::new(ArrowSchema::new(vec![
        simple_field("id", DataType::Int32, false, "1"),
        Field::new("tags", DataType::List(inner_field.clone()), true).with_metadata(
            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
        ),
    ]));

    // Row 0: [10, null, 30]   Row 1: []   Row 2: null (whole list null)
    let values = Int32Array::from(vec![Some(10), None, Some(30)]);
    let offsets = OffsetBuffer::from_lengths([3, 0, 0]);
    // Validity for the list rows themselves: row 2 is null.
    let nulls = arrow_buffer::NullBuffer::from(vec![true, true, false]);
    let list_array = ListArray::new(inner_field, offsets, Arc::new(values), Some(nulls));

    let file_batch = RecordBatch::try_new(schema, vec![
        Arc::new(Int32Array::from(vec![1, 2, 3])),
        Arc::new(list_array),
    ])
    .unwrap();

    let result = transformer.process_record_batch(file_batch).unwrap();

    assert_eq!(result.num_rows(), 3);
    assert_strict_schema_match(&result);

    let list_col = result
        .column(1)
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();

    // Row 0: [10, null, 30]
    assert!(!list_col.is_null(0));
    let row0 = list_col.value(0);
    let row0_ints = row0.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(row0_ints.len(), 3);
    assert_eq!(row0_ints.value(0), 10);
    assert!(row0_ints.is_null(1));
    assert_eq!(row0_ints.value(2), 30);

    // Row 1: []
    assert!(!list_col.is_null(1));
    assert_eq!(list_col.value(1).len(), 0);

    // Row 2: null
    assert!(list_col.is_null(2));
}

/// Processing multiple batches through the same transformer must work.
/// The `BatchTransform` is lazily initialized on the first batch, so
/// subsequent batches reuse the same transform. Both must produce
/// normalized output.
#[test]
fn list_field_name_normalization_multiple_batches() {
    let snapshot_schema = Arc::new(iceberg_schema_with_list());
    let projected = [1, 2];
    let mut transformer =
        RecordBatchTransformerBuilder::new(snapshot_schema, &projected).build();

    // Both batches use Parquet's "item" name; the second exercises the
    // already-initialized transform path.
    let batch1 =
        parquet_batch_with_list_field_name("item", vec![1, 2], vec![10, 20, 30], vec![2, 1]);
    let batch2 = parquet_batch_with_list_field_name("item", vec![3], vec![40, 50], vec![2]);

    let result1 = transformer.process_record_batch(batch1).unwrap();
    let result2 = transformer.process_record_batch(batch2).unwrap();

    // Every output batch must be normalized to Iceberg's "element" name.
    for (i, result) in [&result1, &result2].iter().enumerate() {
        assert_strict_schema_match(result);
        match result.column(1).data_type() {
            DataType::List(f) => assert_eq!(
                f.name(),
                "element",
                "batch {i}: inner field should be 'element'"
            ),
            other => panic!("batch {i}: expected List, got {other:?}"),
        }
    }

    // Verify concat_batches would work (it uses RecordBatch::try_new internally)
    let schema = result1.schema();
    let concat = arrow_select::concat::concat_batches(&schema, [&result1, &result2]);
    assert!(
        concat.is_ok(),
        "concat_batches must succeed after normalization: {:?}",
        concat.err()
    );
    assert_eq!(concat.unwrap().num_rows(), 3);
}

/// Map type: Parquet may store the map entries struct field with a
/// different name than Iceberg's `"key_value"`. Verify that the
/// transformer normalizes the struct field names inside the map.
#[test]
fn map_field_name_normalized() {
    use arrow_array::MapArray;
    use arrow_buffer::OffsetBuffer;

    // Iceberg schema: required id:int (field 1), optional
    // props:map<string, string> (field 2; key id 3, value id 4).
    let snapshot_schema = Arc::new(
        Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::optional(
                    2,
                    "props",
                    Type::Map(crate::spec::MapType::new(
                        NestedField::map_key_element(3, Type::Primitive(PrimitiveType::String))
                            .into(),
                        NestedField::map_value_element(
                            4,
                            Type::Primitive(PrimitiveType::String),
                            true,
                        )
                        .into(),
                    )),
                )
                .into(),
            ])
            .build()
            .unwrap(),
    );
    let projected = [1, 2];
    let mut transformer =
        RecordBatchTransformerBuilder::new(snapshot_schema, &projected).build();

    // Parquet uses "entries" for the struct field and "key"/"value" for children.
    // Iceberg's schema_to_arrow_schema uses "key_value" and "key"/"value".
    let key_field = Arc::new(Field::new("key", DataType::Utf8, false).with_metadata(
        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
    ));
    let value_field = Arc::new(Field::new("value", DataType::Utf8, true).with_metadata(
        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
    ));
    // Parquet standard uses "entries"; Iceberg uses "key_value"
    let entries_field = Arc::new(Field::new(
        "entries",
        DataType::Struct(vec![key_field.clone(), value_field.clone()].into()),
        false,
    ));
    let file_schema = Arc::new(ArrowSchema::new(vec![
        simple_field("id", DataType::Int32, false, "1"),
        Field::new("props", DataType::Map(entries_field.clone(), false), true).with_metadata(
            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
        ),
    ]));

    // Build a MapArray directly: row 0 → {"a":"1"}, row 1 → {} (empty)
    let keys = StringArray::from(vec!["a"]);
    let values = StringArray::from(vec![Some("1")]);
    let struct_array = arrow_array::StructArray::new(
        vec![key_field.clone(), value_field.clone()].into(),
        vec![Arc::new(keys) as _, Arc::new(values) as _],
        None,
    );
    let offsets = OffsetBuffer::from_lengths([1, 0]); // row0: 1 entry, row1: 0 entries
    let map_array = MapArray::new(entries_field, offsets, struct_array, None, false);

    let file_batch = RecordBatch::try_new(file_schema, vec![
        Arc::new(Int32Array::from(vec![1, 2])),
        Arc::new(map_array),
    ])
    .unwrap();

    let result = transformer.process_record_batch(file_batch).unwrap();

    assert_eq!(result.num_columns(), 2);
    assert_eq!(result.num_rows(), 2);
    assert_strict_schema_match(&result);

    // Verify the map struct field is normalized to "key_value"
    match result.column(1).data_type() {
        DataType::Map(entries, _) => {
            assert_eq!(
                entries.name(),
                "key_value",
                "Map entries field should be 'key_value', got '{}'",
                entries.name(),
            );
        }
        other => panic!("expected Map, got {other:?}"),
    }

    // Must survive strict RecordBatch::try_new
    RecordBatch::try_new(result.schema(), result.columns().to_vec())
        .expect("result batch must pass strict schema validation");
}
}
Loading