diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 4e650e9574e3..05639de0d245 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -144,16 +144,13 @@ public ParquetValueReader<?> list(
         return null;
       }
 
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
       Type elementType = ParquetSchemaUtil.determineListElementType(array);
-      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+      ParquetValueReaders.ResolvedList<?> resolved =
+          ParquetValueReaders.resolveList(
+              type, array, currentPath(), path(elementType.getName()), elementReader);
 
       return new ArrayReader<>(
-          repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+          resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
     }
 
     @Override
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 4e8c9f03f84c..064ff9a0b5de 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -221,6 +221,63 @@ public void testTwoLevelList() throws IOException {
     }
   }
 
+  @Test
+  public void testTwoLevelListWithEmptyLists() throws IOException {
+    Schema schema =
+        new Schema(
+            optional(1, "names", Types.ListType.ofRequired(3, Types.StringType.get())),
+            optional(2, "label", Types.StringType.get()));
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
+
+    ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
+            .withDataModel(GenericData.get())
+            .withSchema(avroSchema)
+            .config("parquet.avro.add-list-element-records", "true")
+            .config("parquet.avro.write-old-list-structure", "true")
+            .build();
+
+    GenericRecordBuilder rb = new GenericRecordBuilder(avroSchema);
+    writer.write(rb.set("names", java.util.Arrays.asList("alice")).set("label", "row0").build());
+    writer.write(rb.set("names", java.util.Collections.emptyList()).set("label", "row1").build());
+    writer.write(rb.set("names", java.util.Arrays.asList("bob")).set("label", "row2").build());
+    writer.write(
+        rb.set("names", java.util.Arrays.asList("carol", "dave")).set("label", "row3").build());
+    writer.close();
+
+    try (CloseableIterable<RowData> reader =
+        Parquet.read(Files.localInput(testFile))
+            .project(schema)
+            .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
+            .build()) {
+      Iterator<RowData> rows = reader.iterator();
+
+      RowData row0 = rows.next();
+      assertThat(row0.getArray(0).getString(0).toString()).isEqualTo("alice");
+      assertThat(row0.getString(1).toString()).isEqualTo("row0");
+
+      RowData row1 = rows.next();
+      assertThat(row1.isNullAt(0)).isFalse();
+      assertThat(row1.getArray(0).size()).isEqualTo(0);
+      assertThat(row1.getString(1).toString()).isEqualTo("row1");
+
+      RowData row2 = rows.next();
+      assertThat(row2.getArray(0).getString(0).toString()).isEqualTo("bob");
+      assertThat(row2.getString(1).toString()).isEqualTo("row2");
+
+      RowData row3 = rows.next();
+      assertThat(row3.getArray(0).size()).isEqualTo(2);
+      assertThat(row3.getArray(0).getString(0).toString()).isEqualTo("carol");
+      assertThat(row3.getArray(0).getString(1).toString()).isEqualTo("dave");
+      assertThat(row3.getString(1).toString()).isEqualTo("row3");
+
+      assertThat(rows).isExhausted();
+    }
+  }
+
   private void writeAndValidate(
       Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 4e650e9574e3..05639de0d245 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -144,16 +144,13 @@ public ParquetValueReader<?> list(
         return null;
       }
 
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
       Type elementType = ParquetSchemaUtil.determineListElementType(array);
-      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+      ParquetValueReaders.ResolvedList<?> resolved =
+          ParquetValueReaders.resolveList(
+              type, array, currentPath(), path(elementType.getName()), elementReader);
 
       return new ArrayReader<>(
-          repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+          resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
     }
 
     @Override
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 4e8c9f03f84c..064ff9a0b5de 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -221,6 +221,63 @@ public void testTwoLevelList() throws IOException {
     }
   }
 
+  @Test
+  public void testTwoLevelListWithEmptyLists() throws IOException {
+    Schema schema =
+        new Schema(
+            optional(1, "names", Types.ListType.ofRequired(3, Types.StringType.get())),
+            optional(2, "label", Types.StringType.get()));
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
+
+    ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
+            .withDataModel(GenericData.get())
+            .withSchema(avroSchema)
+            .config("parquet.avro.add-list-element-records", "true")
+            .config("parquet.avro.write-old-list-structure", "true")
+            .build();
+
+    GenericRecordBuilder rb = new GenericRecordBuilder(avroSchema);
+    writer.write(rb.set("names", java.util.Arrays.asList("alice")).set("label", "row0").build());
+    writer.write(rb.set("names", java.util.Collections.emptyList()).set("label", "row1").build());
+    writer.write(rb.set("names", java.util.Arrays.asList("bob")).set("label", "row2").build());
+    writer.write(
+        rb.set("names", java.util.Arrays.asList("carol", "dave")).set("label", "row3").build());
+    writer.close();
+
+    try (CloseableIterable<RowData> reader =
+        Parquet.read(Files.localInput(testFile))
+            .project(schema)
+            .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
+            .build()) {
+      Iterator<RowData> rows = reader.iterator();
+
+      RowData row0 = rows.next();
+      assertThat(row0.getArray(0).getString(0).toString()).isEqualTo("alice");
+      assertThat(row0.getString(1).toString()).isEqualTo("row0");
+
+      RowData row1 = rows.next();
+      assertThat(row1.isNullAt(0)).isFalse();
+      assertThat(row1.getArray(0).size()).isEqualTo(0);
+      assertThat(row1.getString(1).toString()).isEqualTo("row1");
+
+      RowData row2 = rows.next();
+      assertThat(row2.getArray(0).getString(0).toString()).isEqualTo("bob");
+      assertThat(row2.getString(1).toString()).isEqualTo("row2");
+
+      RowData row3 = rows.next();
+      assertThat(row3.getArray(0).size()).isEqualTo(2);
+      assertThat(row3.getArray(0).getString(0).toString()).isEqualTo("carol");
+      assertThat(row3.getArray(0).getString(1).toString()).isEqualTo("dave");
+      assertThat(row3.getString(1).toString()).isEqualTo("row3");
+
+      assertThat(rows).isExhausted();
+    }
+  }
+
   private void writeAndValidate(
       Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 36b7f8805f46..2b42e3ba4d6f 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -161,16 +161,13 @@ public ParquetValueReader<?> list(
         return null;
       }
 
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
       Type elementType = ParquetSchemaUtil.determineListElementType(array);
-      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+      ParquetValueReaders.ResolvedList<?> resolved =
+          ParquetValueReaders.resolveList(
+              type, array, currentPath(), path(elementType.getName()), elementReader);
 
       return new ArrayReader<>(
-          repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+          resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
     }
 
     @Override
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 006c55d1b8a7..064ff9a0b5de 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -32,6 +32,7 @@
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
@@ -51,7 +52,6 @@
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.io.LocalOutputFile;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -188,7 +188,7 @@ public void testTwoLevelList() throws IOException {
     assertThat(testFile.delete()).isTrue();
 
     ParquetWriter<GenericRecord> writer =
-        AvroParquetWriter.<GenericRecord>builder(new LocalOutputFile(testFile.toPath()))
+        AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
            .withDataModel(GenericData.get())
            .withSchema(avroSchema)
            .config("parquet.avro.add-list-element-records", "true")
@@ -221,6 +221,63 @@ public void testTwoLevelList() throws IOException {
     }
   }
 
+  @Test
+  public void testTwoLevelListWithEmptyLists() throws IOException {
+    Schema schema =
+        new Schema(
+            optional(1, "names", Types.ListType.ofRequired(3, Types.StringType.get())),
+            optional(2, "label", Types.StringType.get()));
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
+
+    ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
+            .withDataModel(GenericData.get())
+            .withSchema(avroSchema)
+            .config("parquet.avro.add-list-element-records", "true")
+            .config("parquet.avro.write-old-list-structure", "true")
+            .build();
+
+    GenericRecordBuilder rb = new GenericRecordBuilder(avroSchema);
+    writer.write(rb.set("names", java.util.Arrays.asList("alice")).set("label", "row0").build());
+    writer.write(rb.set("names", java.util.Collections.emptyList()).set("label", "row1").build());
+    writer.write(rb.set("names", java.util.Arrays.asList("bob")).set("label", "row2").build());
+    writer.write(
+        rb.set("names", java.util.Arrays.asList("carol", "dave")).set("label", "row3").build());
+    writer.close();
+
+    try (CloseableIterable<RowData> reader =
+        Parquet.read(Files.localInput(testFile))
+            .project(schema)
+            .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
+            .build()) {
+      Iterator<RowData> rows = reader.iterator();
+
+      RowData row0 = rows.next();
+      assertThat(row0.getArray(0).getString(0).toString()).isEqualTo("alice");
+      assertThat(row0.getString(1).toString()).isEqualTo("row0");
+
+      RowData row1 = rows.next();
+      assertThat(row1.isNullAt(0)).isFalse();
+      assertThat(row1.getArray(0).size()).isEqualTo(0);
+      assertThat(row1.getString(1).toString()).isEqualTo("row1");
+
+      RowData row2 = rows.next();
+      assertThat(row2.getArray(0).getString(0).toString()).isEqualTo("bob");
+      assertThat(row2.getString(1).toString()).isEqualTo("row2");
+
+      RowData row3 = rows.next();
+      assertThat(row3.getArray(0).size()).isEqualTo(2);
+      assertThat(row3.getArray(0).getString(0).toString()).isEqualTo("carol");
+      assertThat(row3.getArray(0).getString(1).toString()).isEqualTo("dave");
+      assertThat(row3.getString(1).toString()).isEqualTo("row3");
+
+      assertThat(rows).isExhausted();
+    }
+  }
+
   private void writeAndValidate(
       Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {
diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index b1fd8f43a578..f5c59e6652bf 100644
--- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -303,16 +303,13 @@ public ParquetValueReader<?> list(
         return null;
       }
 
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
       Type elementType = ParquetSchemaUtil.determineListElementType(array);
-      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+      ParquetValueReaders.ResolvedList<?> resolved =
+          ParquetValueReaders.resolveList(
+              type, array, currentPath(), path(elementType.getName()), elementReader);
 
       return new ParquetValueReaders.ListReader<>(
-          repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+          resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
     }
 
     @Override
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java
index 98cc480d10c4..f5f21793a016 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java
@@ -116,16 +116,13 @@ public ParquetValueReader<?> struct(
     @Override
     public ParquetValueReader<?> list(
         Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
       Type elementType = ParquetSchemaUtil.determineListElementType(array);
-      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+      ParquetValueReaders.ResolvedList<?> resolved =
+          ParquetValueReaders.resolveList(
+              type, array, currentPath(), path(elementType.getName()), elementReader);
 
       return new ListReader<>(
-          repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+          resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
     }
 
     @Override
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
index 9a81626827c6..f73125e89621 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
@@ -233,7 +233,7 @@ public static Type determineListElementType(GroupType array) {
 
   // Parquet LIST backwards-compatibility rules.
   // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
-  static boolean isOldListElementType(GroupType list) {
+  public static boolean isOldListElementType(GroupType list) {
     Type repeatedType = list.getFields().get(0);
     String parentName = list.getName();
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index 8aa9aa4779d9..f6aca6839ff8 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -43,11 +43,13 @@
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
 import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -62,6 +64,54 @@ public static <T> ParquetValueReader<T> option(
     return reader;
   }
 
+  /**
+   * Resolves def/rep levels and element reader for a list, handling 2-level and 3-level encodings.
+   */
+  public static <T> ResolvedList<T> resolveList(
+      MessageType fileType,
+      GroupType array,
+      String[] repeatedPath,
+      String[] elementPath,
+      ParquetValueReader<T> elementReader) {
+    Type elementType = ParquetSchemaUtil.determineListElementType(array);
+    int elementD = fileType.getMaxDefinitionLevel(elementPath) - 1;
+
+    if (ParquetSchemaUtil.isOldListElementType(array)) {
+      // 2-level list: use element path for levels and skip OptionReader (elements are non-null)
+      int elementR = fileType.getMaxRepetitionLevel(elementPath) - 1;
+      return new ResolvedList<>(elementD, elementR, elementReader);
+    } else {
+      int repeatedD = fileType.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = fileType.getMaxRepetitionLevel(repeatedPath) - 1;
+      return new ResolvedList<>(repeatedD, repeatedR, option(elementType, elementD, elementReader));
+    }
+  }
+
+  /** Holds the resolved definition level, repetition level, and reader for a list column. */
+  public static class ResolvedList<T> {
+    private final int definitionLevel;
+    private final int repetitionLevel;
+    private final ParquetValueReader<T> reader;
+
+    ResolvedList(int definitionLevel, int repetitionLevel, ParquetValueReader<T> reader) {
+      this.definitionLevel = definitionLevel;
+      this.repetitionLevel = repetitionLevel;
+      this.reader = reader;
+    }
+
+    public int definitionLevel() {
+      return definitionLevel;
+    }
+
+    public int repetitionLevel() {
+      return repetitionLevel;
+    }
+
+    public ParquetValueReader<T> reader() {
+      return reader;
+    }
+  }
+
   public static <T> ParquetValueReader<T> unboxed(ColumnDescriptor desc) {
     return new UnboxedReader<>(desc);
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index f9300099fe25..4afe8edcfe57 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -185,16 +185,13 @@ private ParquetValueReader<?> defaultReader(
     @Override
     public ParquetValueReader<?> list(
         Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
       Type elementType = ParquetSchemaUtil.determineListElementType(array);
-      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+      ParquetValueReaders.ResolvedList<?> resolved =
+          ParquetValueReaders.resolveList(
+              type, array, currentPath(), path(elementType.getName()), elementReader);
 
       return new ArrayReader<>(
-          repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+          resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
     }
 
     @Override
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 87c97cc7a663..81a5a4352d9c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -183,16 +183,13 @@ private ParquetValueReader<?> defaultReader(
     @Override
     public ParquetValueReader<?> list(
         Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
       Type elementType = ParquetSchemaUtil.determineListElementType(array);
-      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+      ParquetValueReaders.ResolvedList<?> resolved =
+          ParquetValueReaders.resolveList(
+              type, array, currentPath(), path(elementType.getName()), elementReader);
 
       return new ArrayReader<>(
-          repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+          resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
     }
 
     @Override
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 28a9a31c6a6e..3a77d506633e 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -189,16 +189,13 @@ private ParquetValueReader<?> defaultReader(
     @Override
     public ParquetValueReader<?> list(
         Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
       Type elementType = ParquetSchemaUtil.determineListElementType(array);
-      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+      ParquetValueReaders.ResolvedList<?> resolved =
+          ParquetValueReaders.resolveList(
+              type, array, currentPath(), path(elementType.getName()), elementReader);
 
       return new ArrayReader<>(
-          repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+          resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
     }
 
     @Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index a19ed8060737..384f2b03db62 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -191,16 +191,13 @@ private ParquetValueReader<?> defaultReader(
     @Override
     public ParquetValueReader<?> list(
         Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
       Type elementType = ParquetSchemaUtil.determineListElementType(array);
-      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+      ParquetValueReaders.ResolvedList<?> resolved =
+          ParquetValueReaders.resolveList(
+              type, array, currentPath(), path(elementType.getName()), elementReader);
 
       return new ArrayReader<>(
-          repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+          resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
     }
 
     @Override
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index f42c37f5e41d..4217f8af5394 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.data;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -50,9 +51,14 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
@@ -261,4 +267,107 @@ public void testUnknownMapType() {
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessageStartingWith("Cannot convert value Parquet: unknown");
   }
+
+  /** Verifies reading 2-level (Thrift) encoded lists with empty lists interspersed */
+  @Test
+  public void testTwoLevelThriftListWithStrings() throws IOException {
+    // Parquet schema: 2-level Thrift-style list with _tuple naming convention
+    MessageType parquetSchema =
+        MessageTypeParser.parseMessageType(
+            "message root {\n"
+                + "  optional group names (LIST) {\n"
+                + "    repeated binary names_tuple (UTF8);\n"
+                + "  }\n"
+                + "  optional binary label (UTF8);\n"
+                + "}\n");
+
+    // Iceberg schema matching the Parquet structure
+    Schema icebergSchema =
+        new Schema(
+            optional(1, "names", Types.ListType.ofRequired(3, Types.StringType.get())),
+            optional(2, "label", Types.StringType.get()));
+
+    // Write a 2-level Parquet file using the example writer
+    java.io.File testFile = temp.resolve("test-two-level-thrift-" + System.nanoTime()).toFile();
+    SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema);
+
+    try (ParquetWriter<Group> writer =
+        ExampleParquetWriter.builder(new org.apache.hadoop.fs.Path(testFile.toURI()))
+            .withType(parquetSchema)
+            .build()) {
+
+      // Row 0: single element list
+      Group row0 = factory.newGroup();
+      row0.addGroup("names").append("names_tuple", "alice");
+      row0.append("label", "row0");
+      writer.write(row0);
+
+      // Row 1: empty list — critical for triggering the bug
+      Group row1 = factory.newGroup();
+      row1.addGroup("names"); // group present, no repeated elements = empty list
+      row1.append("label", "row1");
+      writer.write(row1);
+
+      // Row 2: single element
+      Group row2 = factory.newGroup();
+      row2.addGroup("names").append("names_tuple", "bob");
+      row2.append("label", "row2");
+      writer.write(row2);
+
+      // Row 3: multi-element list
+      Group row3 = factory.newGroup();
+      Group names3 = row3.addGroup("names");
+      names3.append("names_tuple", "carol");
+      names3.append("names_tuple", "dave");
+      row3.append("label", "row3");
+      writer.write(row3);
+
+      // Row 4: empty list again
+      Group row4 = factory.newGroup();
+      row4.addGroup("names");
+      row4.append("label", "row4");
+      writer.write(row4);
+
+      // Row 5: single element after empty
+      Group row5 = factory.newGroup();
+      row5.addGroup("names").append("names_tuple", "eve");
+      row5.append("label", "row5");
+      writer.write(row5);
+    }
+
+    // Read through the Iceberg Spark reader
+    List<InternalRow> rows = rowsFromFile(Files.localInput(testFile), icebergSchema);
+    assertThat(rows).hasSize(6);
+
+    // Row 0: ["alice"]
+    assertThat(rows.get(0).getArray(0).numElements()).isEqualTo(1);
+    assertThat(rows.get(0).getArray(0).getUTF8String(0).toString()).isEqualTo("alice");
+    assertThat(rows.get(0).getString(1)).isEqualTo("row0");
+
+    // Row 1: [] — empty list, not null
+    assertThat(rows.get(1).isNullAt(0)).isFalse();
+    assertThat(rows.get(1).getArray(0).numElements()).isEqualTo(0);
+    assertThat(rows.get(1).getString(1)).isEqualTo("row1");
+
+    // Row 2: ["bob"]
+    assertThat(rows.get(2).getArray(0).numElements()).isEqualTo(1);
+    assertThat(rows.get(2).getArray(0).getUTF8String(0).toString()).isEqualTo("bob");
+    assertThat(rows.get(2).getString(1)).isEqualTo("row2");
+
+    // Row 3: ["carol", "dave"]
+    assertThat(rows.get(3).getArray(0).numElements()).isEqualTo(2);
+    assertThat(rows.get(3).getArray(0).getUTF8String(0).toString()).isEqualTo("carol");
+    assertThat(rows.get(3).getArray(0).getUTF8String(1).toString()).isEqualTo("dave");
+    assertThat(rows.get(3).getString(1)).isEqualTo("row3");
+
+    // Row 4: [] — empty list
+    assertThat(rows.get(4).isNullAt(0)).isFalse();
+    assertThat(rows.get(4).getArray(0).numElements()).isEqualTo(0);
+    assertThat(rows.get(4).getString(1)).isEqualTo("row4");
+
+    // Row 5: ["eve"]
+    assertThat(rows.get(5).getArray(0).numElements()).isEqualTo(1);
+    assertThat(rows.get(5).getArray(0).getUTF8String(0).toString()).isEqualTo("eve");
+    assertThat(rows.get(5).getString(1)).isEqualTo("row5");
+  }
 }