diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
index 439358435c..a80547aa01 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -450,6 +450,10 @@ impl RecordBatchTransformer {
             .iter()
             .zip(target_schema.fields().iter())
         {
+            // DataType equality (==) is recursive and includes nested field names
+            // and metadata, so e.g. List(Field("item", Int32)) != List(Field("element", Int32)).
+            // This correctly routes nested-field-name mismatches to SchemaComparison::Different,
+            // which triggers generate_transform_operations where they are handled via cast.
             if source_field.data_type() != target_field.data_type()
                 || source_field.is_nullable() != target_field.is_nullable()
             {
@@ -527,11 +531,17 @@ impl RecordBatchTransformer {
         // No conflict detection needed - schema resolution happened in reader.rs.
         let field_by_id = field_id_to_source_schema_map.get(field_id).map(
             |(source_field, source_index)| {
-                if source_field.data_type().equals_datatype(target_type) {
+                if source_field.data_type() == target_type {
+                    // Exact match: data types, including nested field names, are identical,
+                    // so the column can be used as-is.
                     ColumnSource::PassThrough {
                         source_index: *source_index,
                     }
                 } else {
+                    // Covers both field-name normalization (e.g., Parquet's List inner
+                    // field "item" → Iceberg's "element", or Map's "entries" →
+                    // "key_value") and actual type promotion (e.g., Int32 → Int64).
+                    // Arrow's cast() handles both correctly.
ColumnSource::Promote { target_type: target_type.clone(), source_index: *source_index, @@ -1675,4 +1685,366 @@ mod test { assert!(data_col.is_null(1)); assert!(data_col.is_null(2)); } + + // ----------------------------------------------------------------------- + // Helpers for List / Map field-name normalization tests + // ----------------------------------------------------------------------- + + /// Build an Iceberg schema that contains a single `list` column + /// with nullable elements. + fn iceberg_schema_with_list() -> Schema { + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional( + 2, + "tags", + Type::List(crate::spec::ListType { + element_field: NestedField::list_element( + 3, + Type::Primitive(PrimitiveType::Int), + false, // optional (nullable) elements + ) + .into(), + }), + ) + .into(), + ]) + .build() + .unwrap() + } + + /// Build a Parquet-style Arrow schema + `RecordBatch` where the list inner + /// field is named `inner_name` (e.g. `"item"` for Parquet, `"element"` for + /// Iceberg). 
+    fn parquet_batch_with_list_field_name(
+        inner_name: &str,
+        id_values: Vec<i32>,
+        list_values: Vec<i32>,
+        offsets: Vec<usize>,
+    ) -> RecordBatch {
+        use arrow_array::ListArray;
+        use arrow_buffer::OffsetBuffer;
+
+        let inner_field = Arc::new(Field::new(inner_name, DataType::Int32, true).with_metadata(
+            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
+        ));
+        let schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("id", DataType::Int32, false, "1"),
+            Field::new("tags", DataType::List(inner_field.clone()), true).with_metadata(
+                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
+            ),
+        ]));
+
+        let values = Int32Array::from(list_values);
+        let offset_buf = OffsetBuffer::from_lengths(offsets);
+        let list_array = ListArray::new(inner_field, offset_buf, Arc::new(values), None);
+
+        RecordBatch::try_new(schema, vec![
+            Arc::new(Int32Array::from(id_values)),
+            Arc::new(list_array),
+        ])
+        .unwrap()
+    }
+
+    /// Assert that every column in `batch` has a data type that exactly matches
+    /// the corresponding schema field (strict `==`, the same check that
+    /// `RecordBatch::try_new` / `concat_batches` performs).
+    fn assert_strict_schema_match(batch: &RecordBatch) {
+        let schema = batch.schema();
+        for (i, field) in schema.fields().iter().enumerate() {
+            assert_eq!(
+                batch.column(i).data_type(),
+                field.data_type(),
+                "column {} data type does not strictly match schema field '{}'",
+                i,
+                field.name(),
+            );
+        }
+    }
+
+    // -----------------------------------------------------------------------
+    // Tests
+    // -----------------------------------------------------------------------
+
+    /// Parquet files store list inner fields as `"item"` while Iceberg uses
+    /// `"element"`. The transformer must cast the column so that the output
+    /// data type exactly matches the target schema, including nested field
+    /// names. Without this, downstream consumers that use strict schema
+    /// validation (e.g.
DataFusion's `concat_batches` → `RecordBatch::try_new`) + /// fail with "column types must match schema types". + #[test] + fn list_field_name_normalized_from_item_to_element() { + let snapshot_schema = Arc::new(iceberg_schema_with_list()); + let projected = [1, 2]; + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected).build(); + + let file_batch = + parquet_batch_with_list_field_name("item", vec![1, 2], vec![10, 20, 30], vec![2, 1]); + + let result = transformer.process_record_batch(file_batch).unwrap(); + + assert_eq!(result.num_columns(), 2); + assert_eq!(result.num_rows(), 2); + assert_strict_schema_match(&result); + + // Schema-level check + let list_dt = result.schema().field(1).data_type().clone(); + match &list_dt { + DataType::List(f) => assert_eq!(f.name(), "element"), + other => panic!("expected List, got {other:?}"), + } + + // Column-level check (the actual array, not just the schema wrapper) + match result.column(1).data_type() { + DataType::List(f) => assert_eq!(f.name(), "element"), + other => panic!("expected List, got {other:?}"), + } + + // Must survive strict RecordBatch::try_new (same validation concat_batches uses) + RecordBatch::try_new(result.schema(), result.columns().to_vec()) + .expect("result batch must pass strict schema validation"); + } + + /// When the Parquet file already uses `"element"` (matching Iceberg), the + /// transformer must take the `PassThrough` path — no cast needed. We verify + /// by checking the output is identical to the input. 
+    #[test]
+    fn list_field_name_already_matching_uses_passthrough() {
+        let snapshot_schema = Arc::new(iceberg_schema_with_list());
+        let projected = [1, 2];
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, &projected).build();
+
+        let file_batch =
+            parquet_batch_with_list_field_name("element", vec![1, 2], vec![10, 20, 30], vec![2, 1]);
+
+        let result = transformer.process_record_batch(file_batch).unwrap();
+
+        assert_eq!(result.num_columns(), 2);
+        assert_eq!(result.num_rows(), 2);
+        assert_strict_schema_match(&result);
+
+        // Data must be identical — no unnecessary cast
+        assert_eq!(
+            result
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .values(),
+            &[1, 2],
+        );
+        let list_col = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<arrow_array::ListArray>()
+            .unwrap();
+        // first row: [10,20], second row: [30]
+        assert_eq!(list_col.value(0).len(), 2);
+        assert_eq!(list_col.value(1).len(), 1);
+    }
+
+    /// Verify that list data values survive the cast-based normalization
+    /// intact, including nulls.
+    #[test]
+    fn list_field_name_normalization_preserves_data() {
+        use arrow_array::ListArray;
+        use arrow_buffer::OffsetBuffer;
+
+        let snapshot_schema = Arc::new(iceberg_schema_with_list());
+        let projected = [1, 2];
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, &projected).build();
+
+        let inner_field = Arc::new(Field::new("item", DataType::Int32, true).with_metadata(
+            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
+        ));
+        let schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("id", DataType::Int32, false, "1"),
+            Field::new("tags", DataType::List(inner_field.clone()), true).with_metadata(
+                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
+            ),
+        ]));
+
+        // Row 0: [10, null, 30]   Row 1: []   Row 2: null (whole list null)
+        let values = Int32Array::from(vec![Some(10), None, Some(30)]);
+        let offsets = OffsetBuffer::from_lengths([3, 0, 0]);
+        let nulls = arrow_buffer::NullBuffer::from(vec![true, true, false]);
+        let list_array = ListArray::new(inner_field, offsets, Arc::new(values), Some(nulls));
+
+        let file_batch = RecordBatch::try_new(schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(list_array),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(file_batch).unwrap();
+
+        assert_eq!(result.num_rows(), 3);
+        assert_strict_schema_match(&result);
+
+        let list_col = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .unwrap();
+
+        // Row 0: [10, null, 30]
+        assert!(!list_col.is_null(0));
+        let row0 = list_col.value(0);
+        let row0_ints = row0.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(row0_ints.len(), 3);
+        assert_eq!(row0_ints.value(0), 10);
+        assert!(row0_ints.is_null(1));
+        assert_eq!(row0_ints.value(2), 30);
+
+        // Row 1: []
+        assert!(!list_col.is_null(1));
+        assert_eq!(list_col.value(1).len(), 0);
+
+        // Row 2: null
+        assert!(list_col.is_null(2));
+    }
+
+    /// Processing multiple batches through the same transformer must work.
+ /// The `BatchTransform` is lazily initialized on the first batch, so + /// subsequent batches reuse the same transform. Both must produce + /// normalized output. + #[test] + fn list_field_name_normalization_multiple_batches() { + let snapshot_schema = Arc::new(iceberg_schema_with_list()); + let projected = [1, 2]; + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected).build(); + + let batch1 = + parquet_batch_with_list_field_name("item", vec![1, 2], vec![10, 20, 30], vec![2, 1]); + let batch2 = parquet_batch_with_list_field_name("item", vec![3], vec![40, 50], vec![2]); + + let result1 = transformer.process_record_batch(batch1).unwrap(); + let result2 = transformer.process_record_batch(batch2).unwrap(); + + for (i, result) in [&result1, &result2].iter().enumerate() { + assert_strict_schema_match(result); + match result.column(1).data_type() { + DataType::List(f) => assert_eq!( + f.name(), + "element", + "batch {i}: inner field should be 'element'" + ), + other => panic!("batch {i}: expected List, got {other:?}"), + } + } + + // Verify concat_batches would work (it uses RecordBatch::try_new internally) + let schema = result1.schema(); + let concat = arrow_select::concat::concat_batches(&schema, [&result1, &result2]); + assert!( + concat.is_ok(), + "concat_batches must succeed after normalization: {:?}", + concat.err() + ); + assert_eq!(concat.unwrap().num_rows(), 3); + } + + /// Map type: Parquet may store the map entries struct field with a + /// different name than Iceberg's `"key_value"`. Verify that the + /// transformer normalizes the struct field names inside the map. 
+ #[test] + fn map_field_name_normalized() { + use arrow_array::MapArray; + use arrow_buffer::OffsetBuffer; + + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional( + 2, + "props", + Type::Map(crate::spec::MapType::new( + NestedField::map_key_element(3, Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::map_value_element( + 4, + Type::Primitive(PrimitiveType::String), + true, + ) + .into(), + )), + ) + .into(), + ]) + .build() + .unwrap(), + ); + let projected = [1, 2]; + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected).build(); + + // Parquet uses "entries" for the struct field and "key"/"value" for children. + // Iceberg's schema_to_arrow_schema uses "key_value" and "key"/"value". + let key_field = Arc::new(Field::new("key", DataType::Utf8, false).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]), + )); + let value_field = Arc::new(Field::new("value", DataType::Utf8, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]), + )); + // Parquet standard uses "entries"; Iceberg uses "key_value" + let entries_field = Arc::new(Field::new( + "entries", + DataType::Struct(vec![key_field.clone(), value_field.clone()].into()), + false, + )); + let file_schema = Arc::new(ArrowSchema::new(vec![ + simple_field("id", DataType::Int32, false, "1"), + Field::new("props", DataType::Map(entries_field.clone(), false), true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), + ), + ])); + + // Build a MapArray directly: row 0 → {"a":"1"}, row 1 → {} (empty) + let keys = StringArray::from(vec!["a"]); + let values = StringArray::from(vec![Some("1")]); + let struct_array = arrow_array::StructArray::new( + vec![key_field.clone(), value_field.clone()].into(), + 
vec![Arc::new(keys) as _, Arc::new(values) as _], + None, + ); + let offsets = OffsetBuffer::from_lengths([1, 0]); // row0: 1 entry, row1: 0 entries + let map_array = MapArray::new(entries_field, offsets, struct_array, None, false); + + let file_batch = RecordBatch::try_new(file_schema, vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(map_array), + ]) + .unwrap(); + + let result = transformer.process_record_batch(file_batch).unwrap(); + + assert_eq!(result.num_columns(), 2); + assert_eq!(result.num_rows(), 2); + assert_strict_schema_match(&result); + + // Verify the map struct field is normalized to "key_value" + match result.column(1).data_type() { + DataType::Map(entries, _) => { + assert_eq!( + entries.name(), + "key_value", + "Map entries field should be 'key_value', got '{}'", + entries.name(), + ); + } + other => panic!("expected Map, got {other:?}"), + } + + // Must survive strict RecordBatch::try_new + RecordBatch::try_new(result.schema(), result.columns().to_vec()) + .expect("result batch must pass strict schema validation"); + } }