Merged
@@ -21,7 +21,10 @@
import org.apache.iceberg.avro.AvroFormatModel;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.parquet.ParquetFormatModel;

public class GenericFormatModels {
public static void register() {
@@ -34,6 +37,17 @@ public static void register() {
PlannedDataReader.create(icebergSchema, idToConstant)));

FormatModelRegistry.register(AvroFormatModel.forPositionDeletes());

FormatModelRegistry.register(
ParquetFormatModel.create(
Record.class,
Void.class,
Contributor:
I found Void.class for the schema type weird when reviewing the ORC PR. I see Ryan has a previous comment suggesting Void.class.

I checked the uber PR #12298. For Spark InternalRow, the schema type is Spark StructType. For Flink RowData, the schema type is RowType.

For Iceberg generic Record, shouldn't the schema type be Iceberg Schema or StructType?

Contributor (Author):
I initially used Schema, but I don’t have a strong preference since it isn’t used yet. Once we start using it, we can adjust the type as needed. The FormatModelRegistry expects the caller to supply the generic parameters, so until this is actually used, the specific type doesn’t make a difference.

Contributor:
Iceberg GenericRecord requires an Iceberg Schema or StructType to create the object. Hence, I thought the schema type shouldn't be Void.class.

I do agree that this is an implementation code that we can easily update later. There is no interface change here.

Contributor:
The problem with Schema is that it is almost certainly going to be null when passed to the read and write functions, because there isn't a separate engine schema. I'd prefer using Void in that case so that people don't actually pass something through. But this gets erased anyway, so it doesn't really matter; callers can do whatever they like, I guess. That's the weird thing about having types that are erased and then re-added using @SuppressWarnings("unchecked").
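The erasure point above can be illustrated with a toy registry. TinyRegistry and Model here are hypothetical stand-ins, not the actual FormatModelRegistry API; the sketch only shows why the schema type parameter has no runtime effect:

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of generic type erasure in a registry; hypothetical names,
// not the real FormatModelRegistry API from the PR.
public class TinyRegistry {
  interface Model<D, S> {}

  private static final Map<String, Model<?, ?>> MODELS = new HashMap<>();

  static <D, S> void register(String name, Model<D, S> model) {
    MODELS.put(name, model);
  }

  // The schema type S is erased at runtime; this cast re-adds it on faith,
  // which is exactly where @SuppressWarnings("unchecked") shows up.
  @SuppressWarnings("unchecked")
  static <D, S> Model<D, S> lookup(String name) {
    return (Model<D, S>) MODELS.get(name);
  }

  public static void main(String[] args) {
    // registered with Void as the schema type...
    register("generic", new Model<Object, Void>() {});
    // ...but a caller can ask for a different schema type and nothing
    // fails at runtime, because S no longer exists after compilation
    Model<Object, String> model = lookup("generic");
    System.out.println(model != null); // prints true
  }
}
```

This is why the choice between Void and Schema is purely documentation for callers rather than an enforced contract.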

Contributor:
Actually, should we mandate that engineSchema is not passed for this? That would make this easier because it would be rejected by the Parquet/Record builders. I'm in favor, but not strongly.

Contributor (Author):
I see your point, but introducing a new flag across all format models solely to disable engine schema feels like unnecessary complexity to me. And if we later decide to support shredding in the generic models as well, Void would need to become Schema, and we’d have to undo all of these changes anyway.

Let me merge this as it is, and cycle back to it later if we see value in adding these validations.

(icebergSchema, fileSchema, engineSchema) ->
GenericParquetWriter.create(icebergSchema, fileSchema),
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));

FormatModelRegistry.register(ParquetFormatModel.forPositionDeletes());
}

private GenericFormatModels() {}
@@ -54,7 +54,8 @@ public class TestGenericFormatModels {
private static final List<Record> TEST_RECORDS =
RandomGenericData.generate(TestBase.SCHEMA, 10, 1L);

-  private static final FileFormat[] FILE_FORMATS = new FileFormat[] {FileFormat.AVRO};
+  private static final FileFormat[] FILE_FORMATS =
+      new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET};

@TempDir protected Path temp;

47 changes: 40 additions & 7 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -302,8 +302,7 @@ WriteBuilder withWriterVersion(WriterVersion version) {
}

// supposed to always be a private method used strictly by data and delete write builders
-    private WriteBuilder createContextFunc(
-        Function<Map<String, String>, Context> newCreateContextFunc) {
+    WriteBuilder createContextFunc(Function<Map<String, String>, Context> newCreateContextFunc) {
this.createContextFunc = newCreateContextFunc;
return this;
}
@@ -498,7 +497,7 @@ public <D> FileAppender<D> build() throws IOException {
}
}

-  private static class Context {
+  static class Context {
private final int rowGroupSize;
private final int pageSize;
private final int pageRowLimit;
@@ -1176,6 +1175,7 @@ public static class ReadBuilder implements InternalData.ReadBuilder {
private Expression filter = null;
private ReadSupport<?> readSupport = null;
private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
private BiFunction<Schema, MessageType, VectorizedReader<?>> batchedReaderFuncWithSchema = null;
private ReaderFunction readerFunction = null;
private boolean filterRecords = true;
private boolean caseSensitive = true;
@@ -1298,6 +1298,9 @@ public ReadBuilder createReaderFunc(
Preconditions.checkArgument(
this.batchedReaderFunc == null,
"Cannot set reader function: batched reader function already set");
Preconditions.checkArgument(
this.batchedReaderFuncWithSchema == null,
"Cannot set reader function: batched reader function with schema already set");
Preconditions.checkArgument(
this.readerFunction == null, "Cannot set reader function: reader function already set");
this.readerFunction = new UnaryReaderFunction(newReaderFunction);
@@ -1309,6 +1312,9 @@ public ReadBuilder createReaderFunc(
Preconditions.checkArgument(
this.batchedReaderFunc == null,
"Cannot set reader function: batched reader function already set");
Preconditions.checkArgument(
this.batchedReaderFuncWithSchema == null,
"Cannot set reader function: batched reader function with schema already set");
Preconditions.checkArgument(
this.readerFunction == null, "Cannot set reader function: reader function already set");
this.readerFunction = new BinaryReaderFunction(newReaderFunction);
@@ -1319,17 +1325,38 @@ public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReade
Preconditions.checkArgument(
this.batchedReaderFunc == null,
"Cannot set batched reader function: batched reader function already set");
Preconditions.checkArgument(
this.batchedReaderFuncWithSchema == null,
"Cannot set reader function: batched reader function with schema already set");
Preconditions.checkArgument(
this.readerFunction == null,
"Cannot set batched reader function: ReaderFunction already set");
this.batchedReaderFunc = func;
return this;
}

public ReadBuilder createBatchedReaderFunc(
BiFunction<Schema, MessageType, VectorizedReader<?>> func) {
Preconditions.checkArgument(
this.batchedReaderFunc == null,
"Cannot set batched reader function: batched reader function already set");
Preconditions.checkArgument(
this.batchedReaderFuncWithSchema == null,
"Cannot set reader function: batched reader function with schema already set");
Preconditions.checkArgument(
this.readerFunction == null,
"Cannot set batched reader function: ReaderFunction already set");
this.batchedReaderFuncWithSchema = func;
return this;
}
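The repeated Preconditions checks in the setters above all implement one pattern: the reader-function variants are mutually exclusive, and any attempt to set a second one fails fast. A stripped-down sketch of the same guard, using a hypothetical builder and a plain IllegalArgumentException in place of Guava's Preconditions.checkArgument:

```java
import java.util.function.Function;

// Minimal sketch of the mutually-exclusive-setter guard used by ReadBuilder
// above. ToyReadBuilder is hypothetical; checkArgument mirrors what Guava's
// Preconditions.checkArgument does.
public class ToyReadBuilder {
  private Function<String, String> readerFunc = null;
  private Function<String, String> batchedReaderFunc = null;

  private static void checkArgument(boolean ok, String message) {
    if (!ok) {
      throw new IllegalArgumentException(message);
    }
  }

  public ToyReadBuilder createReaderFunc(Function<String, String> func) {
    checkArgument(batchedReaderFunc == null,
        "Cannot set reader function: batched reader function already set");
    checkArgument(readerFunc == null,
        "Cannot set reader function: reader function already set");
    this.readerFunc = func;
    return this;
  }

  public ToyReadBuilder createBatchedReaderFunc(Function<String, String> func) {
    checkArgument(readerFunc == null,
        "Cannot set batched reader function: reader function already set");
    checkArgument(batchedReaderFunc == null,
        "Cannot set batched reader function: batched reader function already set");
    this.batchedReaderFunc = func;
    return this;
  }

  public static void main(String[] args) {
    ToyReadBuilder builder = new ToyReadBuilder().createReaderFunc(s -> s);
    try {
      builder.createBatchedReaderFunc(s -> s);
    } catch (IllegalArgumentException e) {
      // the second, conflicting setter is rejected
      System.out.println(e.getMessage());
    }
  }
}
```

Each new variant (here, the schema-aware batched reader function) has to be added to every existing setter's guard, which is why the diff touches so many methods.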

public ReadBuilder createReaderFunc(ReaderFunction reader) {
Preconditions.checkArgument(
this.batchedReaderFunc == null,
"Cannot set reader function: batched reader function already set");
Preconditions.checkArgument(
this.batchedReaderFuncWithSchema == null,
"Cannot set reader function: batched reader function with schema already set");
Preconditions.checkArgument(
this.readerFunction == null, "Cannot set reader function: reader function already set");
this.readerFunction = reader;
@@ -1389,7 +1416,7 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) {
}

@Override
-  @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
+  @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "checkstyle:MethodLength"})
public <D> CloseableIterable<D> build() {
FileDecryptionProperties fileDecryptionProperties = null;
if (fileEncryptionKey != null) {
@@ -1404,7 +1431,9 @@ public <D> CloseableIterable<D> build() {
Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
}

-      if (batchedReaderFunc != null || readerFunction != null) {
+      if (batchedReaderFunc != null
+          || batchedReaderFuncWithSchema != null
+          || readerFunction != null) {
ParquetReadOptions.Builder optionsBuilder;
if (file instanceof HadoopInputFile) {
// remove read properties already set that may conflict with this read
@@ -1441,12 +1470,16 @@ public <D> CloseableIterable<D> build() {
mapping = NameMapping.empty();
}

-      if (batchedReaderFunc != null) {
+      Function<MessageType, VectorizedReader<?>> batchedFunc =
+          batchedReaderFuncWithSchema != null
+              ? messageType -> batchedReaderFuncWithSchema.apply(schema, messageType)
+              : batchedReaderFunc;
+      if (batchedFunc != null) {
return new VectorizedParquetReader<>(
file,
schema,
options,
-            batchedReaderFunc,
+            batchedFunc,
mapping,
filter,
reuseContainers,
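The batchedFunc adaptation in build() is plain partial application: the two-argument factory that also takes the schema is narrowed to the one-argument Function shape the rest of the code path expects, by capturing the schema in a lambda. A sketch of the same move in isolation, with illustrative placeholder types rather than the Iceberg ones:

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Sketch of the adaptation in build() above: a BiFunction that also takes
// the schema is reduced to a Function by capturing the schema. String stands
// in for both Schema and MessageType here; the types are illustrative only.
public class CapturedSchemaSketch {

  // Same shape as: messageType -> batchedReaderFuncWithSchema.apply(schema, messageType)
  static Function<String, String> withCapturedSchema(
      BiFunction<String, String, String> readerWithSchema, String schema) {
    return messageType -> readerWithSchema.apply(schema, messageType);
  }

  public static void main(String[] args) {
    BiFunction<String, String, String> readerWithSchema =
        (schema, messageType) -> "reader(" + schema + ", " + messageType + ")";

    Function<String, String> reader = withCapturedSchema(readerWithSchema, "icebergSchema");

    System.out.println(reader.apply("messageType")); // reader(icebergSchema, messageType)
  }
}
```

This keeps VectorizedParquetReader unchanged: it still receives a Function, regardless of which of the two batched reader setters the caller used.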