@@ -144,16 +144,13 @@ public ParquetValueReader<?> list(
return null;
}

String[] repeatedPath = currentPath();

int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;

Type elementType = ParquetSchemaUtil.determineListElementType(array);
int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
ParquetValueReaders.ResolvedList<?> resolved =
ParquetValueReaders.resolveList(
type, array, currentPath(), path(elementType.getName()), elementReader);

return new ArrayReader<>(
repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
}

@Override
@@ -221,6 +221,63 @@ public void testTwoLevelList() throws IOException {
}
}

@Test
public void testTwoLevelListWithEmptyLists() throws IOException {
Schema schema =
new Schema(
optional(1, "names", Types.ListType.ofRequired(3, Types.StringType.get())),
optional(2, "label", Types.StringType.get()));
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());

File testFile = File.createTempFile("junit", null, temp.toFile());
assertThat(testFile.delete()).isTrue();

ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
.withDataModel(GenericData.get())
.withSchema(avroSchema)
.config("parquet.avro.add-list-element-records", "true")
.config("parquet.avro.write-old-list-structure", "true")
.build();

GenericRecordBuilder rb = new GenericRecordBuilder(avroSchema);
writer.write(rb.set("names", java.util.Arrays.asList("alice")).set("label", "row0").build());
writer.write(rb.set("names", java.util.Collections.emptyList()).set("label", "row1").build());
writer.write(rb.set("names", java.util.Arrays.asList("bob")).set("label", "row2").build());
writer.write(
rb.set("names", java.util.Arrays.asList("carol", "dave")).set("label", "row3").build());
writer.close();

try (CloseableIterable<RowData> reader =
Parquet.read(Files.localInput(testFile))
.project(schema)
.createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
.build()) {
Iterator<RowData> rows = reader.iterator();

RowData row0 = rows.next();
assertThat(row0.getArray(0).getString(0).toString()).isEqualTo("alice");
assertThat(row0.getString(1).toString()).isEqualTo("row0");

RowData row1 = rows.next();
assertThat(row1.isNullAt(0)).isFalse();
assertThat(row1.getArray(0).size()).isEqualTo(0);
assertThat(row1.getString(1).toString()).isEqualTo("row1");

RowData row2 = rows.next();
assertThat(row2.getArray(0).getString(0).toString()).isEqualTo("bob");
assertThat(row2.getString(1).toString()).isEqualTo("row2");

RowData row3 = rows.next();
assertThat(row3.getArray(0).size()).isEqualTo(2);
assertThat(row3.getArray(0).getString(0).toString()).isEqualTo("carol");
assertThat(row3.getArray(0).getString(1).toString()).isEqualTo("dave");
assertThat(row3.getString(1).toString()).isEqualTo("row3");

assertThat(rows).isExhausted();
}
}

private void writeAndValidate(
Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {

@@ -144,16 +144,13 @@ public ParquetValueReader<?> list(
return null;
}

String[] repeatedPath = currentPath();

int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;

Type elementType = ParquetSchemaUtil.determineListElementType(array);
int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
ParquetValueReaders.ResolvedList<?> resolved =
ParquetValueReaders.resolveList(
type, array, currentPath(), path(elementType.getName()), elementReader);

return new ArrayReader<>(
repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
}

@Override
@@ -221,6 +221,63 @@ public void testTwoLevelList() throws IOException {
}
}

@Test
public void testTwoLevelListWithEmptyLists() throws IOException {
Schema schema =
new Schema(
optional(1, "names", Types.ListType.ofRequired(3, Types.StringType.get())),
optional(2, "label", Types.StringType.get()));
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());

File testFile = File.createTempFile("junit", null, temp.toFile());
assertThat(testFile.delete()).isTrue();

ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
.withDataModel(GenericData.get())
.withSchema(avroSchema)
.config("parquet.avro.add-list-element-records", "true")
.config("parquet.avro.write-old-list-structure", "true")
.build();

GenericRecordBuilder rb = new GenericRecordBuilder(avroSchema);
writer.write(rb.set("names", java.util.Arrays.asList("alice")).set("label", "row0").build());
writer.write(rb.set("names", java.util.Collections.emptyList()).set("label", "row1").build());
writer.write(rb.set("names", java.util.Arrays.asList("bob")).set("label", "row2").build());
writer.write(
rb.set("names", java.util.Arrays.asList("carol", "dave")).set("label", "row3").build());
writer.close();

try (CloseableIterable<RowData> reader =
Parquet.read(Files.localInput(testFile))
.project(schema)
.createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
.build()) {
Iterator<RowData> rows = reader.iterator();

RowData row0 = rows.next();
assertThat(row0.getArray(0).getString(0).toString()).isEqualTo("alice");
assertThat(row0.getString(1).toString()).isEqualTo("row0");

RowData row1 = rows.next();
assertThat(row1.isNullAt(0)).isFalse();
assertThat(row1.getArray(0).size()).isEqualTo(0);
assertThat(row1.getString(1).toString()).isEqualTo("row1");

RowData row2 = rows.next();
assertThat(row2.getArray(0).getString(0).toString()).isEqualTo("bob");
assertThat(row2.getString(1).toString()).isEqualTo("row2");

RowData row3 = rows.next();
assertThat(row3.getArray(0).size()).isEqualTo(2);
assertThat(row3.getArray(0).getString(0).toString()).isEqualTo("carol");
assertThat(row3.getArray(0).getString(1).toString()).isEqualTo("dave");
assertThat(row3.getString(1).toString()).isEqualTo("row3");

assertThat(rows).isExhausted();
}
}

private void writeAndValidate(
Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {

@@ -161,16 +161,13 @@ public ParquetValueReader<?> list(
return null;
}

String[] repeatedPath = currentPath();

int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;

Type elementType = ParquetSchemaUtil.determineListElementType(array);
int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
ParquetValueReaders.ResolvedList<?> resolved =
ParquetValueReaders.resolveList(
type, array, currentPath(), path(elementType.getName()), elementReader);

return new ArrayReader<>(
repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
}

@Override
@@ -32,6 +32,7 @@
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
@@ -51,7 +52,6 @@
import org.apache.iceberg.types.Types;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.io.LocalOutputFile;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
@@ -188,7 +188,7 @@ public void testTwoLevelList() throws IOException {
assertThat(testFile.delete()).isTrue();

ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(new LocalOutputFile(testFile.toPath()))
AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
.withDataModel(GenericData.get())
.withSchema(avroSchema)
.config("parquet.avro.add-list-element-records", "true")
@@ -221,6 +221,63 @@ public void testTwoLevelList() throws IOException {
}
}

@Test
public void testTwoLevelListWithEmptyLists() throws IOException {
Schema schema =
new Schema(
optional(1, "names", Types.ListType.ofRequired(3, Types.StringType.get())),
optional(2, "label", Types.StringType.get()));
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());

File testFile = File.createTempFile("junit", null, temp.toFile());
assertThat(testFile.delete()).isTrue();

ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
.withDataModel(GenericData.get())
.withSchema(avroSchema)
.config("parquet.avro.add-list-element-records", "true")
.config("parquet.avro.write-old-list-structure", "true")
.build();

GenericRecordBuilder rb = new GenericRecordBuilder(avroSchema);
writer.write(rb.set("names", java.util.Arrays.asList("alice")).set("label", "row0").build());
writer.write(rb.set("names", java.util.Collections.emptyList()).set("label", "row1").build());
writer.write(rb.set("names", java.util.Arrays.asList("bob")).set("label", "row2").build());
writer.write(
rb.set("names", java.util.Arrays.asList("carol", "dave")).set("label", "row3").build());
writer.close();

try (CloseableIterable<RowData> reader =
Parquet.read(Files.localInput(testFile))
.project(schema)
.createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
.build()) {
Iterator<RowData> rows = reader.iterator();

RowData row0 = rows.next();
assertThat(row0.getArray(0).getString(0).toString()).isEqualTo("alice");
assertThat(row0.getString(1).toString()).isEqualTo("row0");

RowData row1 = rows.next();
assertThat(row1.isNullAt(0)).isFalse();
assertThat(row1.getArray(0).size()).isEqualTo(0);
assertThat(row1.getString(1).toString()).isEqualTo("row1");

RowData row2 = rows.next();
assertThat(row2.getArray(0).getString(0).toString()).isEqualTo("bob");
assertThat(row2.getString(1).toString()).isEqualTo("row2");

RowData row3 = rows.next();
assertThat(row3.getArray(0).size()).isEqualTo(2);
assertThat(row3.getArray(0).getString(0).toString()).isEqualTo("carol");
assertThat(row3.getArray(0).getString(1).toString()).isEqualTo("dave");
assertThat(row3.getString(1).toString()).isEqualTo("row3");

assertThat(rows).isExhausted();
}
}

private void writeAndValidate(
Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {

@@ -303,16 +303,13 @@ public ParquetValueReader<?> list(
return null;
}

String[] repeatedPath = currentPath();

int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;

Type elementType = ParquetSchemaUtil.determineListElementType(array);
int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
ParquetValueReaders.ResolvedList<?> resolved =
ParquetValueReaders.resolveList(
type, array, currentPath(), path(elementType.getName()), elementReader);

return new ParquetValueReaders.ListReader<>(
repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
}

@Override
@@ -116,16 +116,13 @@ public ParquetValueReader<?> struct(
@Override
public ParquetValueReader<?> list(
Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
String[] repeatedPath = currentPath();

int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;

Type elementType = ParquetSchemaUtil.determineListElementType(array);
int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
ParquetValueReaders.ResolvedList<?> resolved =
ParquetValueReaders.resolveList(
type, array, currentPath(), path(elementType.getName()), elementReader);

return new ListReader<>(
repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
resolved.definitionLevel(), resolved.repetitionLevel(), resolved.reader());
}

@Override
@@ -233,7 +233,7 @@ public static Type determineListElementType(GroupType array) {

// Parquet LIST backwards-compatibility rules.
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
static boolean isOldListElementType(GroupType list) {
public static boolean isOldListElementType(GroupType list) {
Type repeatedType = list.getFields().get(0);
String parentName = list.getName();

@@ -43,11 +43,13 @@
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

@@ -62,6 +64,54 @@ public static <T> ParquetValueReader<T> option(
return reader;
}

/**
* Resolves the definition/repetition levels and the element reader for a list column,
* handling both the legacy 2-level and the standard 3-level list encoding.
*/
public static <E> ResolvedList<E> resolveList(
MessageType fileType,
GroupType array,
String[] repeatedPath,
String[] elementPath,
ParquetValueReader<E> elementReader) {
Type elementType = ParquetSchemaUtil.determineListElementType(array);
int elementD = fileType.getMaxDefinitionLevel(elementPath) - 1;

if (ParquetSchemaUtil.isOldListElementType(array)) {
// 2-level list: use element path for levels and skip OptionReader (elements are non-null)
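// e.g. parquet-avro with parquet.avro.write-old-list-structure=true writes
//   optional group names (LIST) { repeated binary array (STRING); }
// so the repeated field itself is the element and there is no inner group to take levels from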
int elementR = fileType.getMaxRepetitionLevel(elementPath) - 1;
return new ResolvedList<>(elementD, elementR, elementReader);
} else {
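// 3-level list: the repeated "list" group wraps the element, e.g.
//   optional group names (LIST) { repeated group list { optional binary element (STRING); } }
// so levels come from the repeated group and the element reader is wrapped in option()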
int repeatedD = fileType.getMaxDefinitionLevel(repeatedPath) - 1;
int repeatedR = fileType.getMaxRepetitionLevel(repeatedPath) - 1;
return new ResolvedList<>(repeatedD, repeatedR, option(elementType, elementD, elementReader));
}
}

/** Holds the resolved definition level, repetition level, and reader for a list column. */
public static class ResolvedList<E> {
private final int definitionLevel;
private final int repetitionLevel;
private final ParquetValueReader<E> reader;

ResolvedList(int definitionLevel, int repetitionLevel, ParquetValueReader<E> reader) {
this.definitionLevel = definitionLevel;
this.repetitionLevel = repetitionLevel;
this.reader = reader;
}

public int definitionLevel() {
return definitionLevel;
}

public int repetitionLevel() {
return repetitionLevel;
}

public ParquetValueReader<E> reader() {
return reader;
}
}

public static ParquetValueReader<Integer> unboxed(ColumnDescriptor desc) {
return new UnboxedReader<>(desc);
}
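For context, the two list layouts that the new resolveList helper distinguishes differ in where a list's definition and repetition levels live. The following is a minimal standalone sketch, not taken from this change, that builds both layouts with the parquet-column Types builder and prints the resulting levels; the ListLevelSketch class name and the "names"/"array"/"element" field names are illustrative only, and it assumes only org.apache.parquet:parquet-column on the classpath.

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class ListLevelSketch {
  public static void main(String[] args) {
    // Standard 3-level layout: optional list group -> repeated "list" group -> optional "element"
    MessageType threeLevel =
        Types.buildMessage()
            .optionalGroup()
            .as(LogicalTypeAnnotation.listType())
            .repeatedGroup()
            .optional(PrimitiveTypeName.BINARY)
            .as(LogicalTypeAnnotation.stringType())
            .named("element")
            .named("list")
            .named("names")
            .named("schema");

    // Legacy 2-level layout (what parquet.avro.write-old-list-structure=true produces):
    // the repeated primitive itself is the element
    MessageType twoLevel =
        Types.buildMessage()
            .optionalGroup()
            .as(LogicalTypeAnnotation.listType())
            .repeated(PrimitiveTypeName.BINARY)
            .as(LogicalTypeAnnotation.stringType())
            .named("array")
            .named("names")
            .named("schema");

    // The repeated "list" group carries the list's levels in the 3-level layout; in the
    // 2-level layout there is no such group, so the levels must come from the element
    // column itself, which is the distinction resolveList() makes.
    System.out.println(threeLevel.getMaxDefinitionLevel("names", "list", "element")); // 3
    System.out.println(threeLevel.getMaxRepetitionLevel("names", "list", "element")); // 1
    System.out.println(twoLevel.getMaxDefinitionLevel("names", "array")); // 2
    System.out.println(twoLevel.getMaxRepetitionLevel("names", "array")); // 1
  }
}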