diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java index e2e13861ca2d..6fde8bbebaff 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java @@ -21,7 +21,10 @@ import org.apache.iceberg.avro.AvroFormatModel; import org.apache.iceberg.data.avro.DataWriter; import org.apache.iceberg.data.avro.PlannedDataReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.parquet.ParquetFormatModel; public class GenericFormatModels { public static void register() { @@ -34,6 +37,17 @@ public static void register() { PlannedDataReader.create(icebergSchema, idToConstant))); FormatModelRegistry.register(AvroFormatModel.forPositionDeletes()); + + FormatModelRegistry.register( + ParquetFormatModel.create( + Record.class, + Void.class, + (icebergSchema, fileSchema, engineSchema) -> + GenericParquetWriter.create(icebergSchema, fileSchema), + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant))); + + FormatModelRegistry.register(ParquetFormatModel.forPositionDeletes()); } private GenericFormatModels() {} diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java index ca3dda30ab5f..ab5968da8b09 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java +++ b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java @@ -54,7 +54,8 @@ public class TestGenericFormatModels { private static final List TEST_RECORDS = RandomGenericData.generate(TestBase.SCHEMA, 10, 1L); - private static final FileFormat[] FILE_FORMATS = new FileFormat[] {FileFormat.AVRO}; + private static final FileFormat[] FILE_FORMATS = + new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET}; @TempDir protected Path temp; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index ae049d087583..2387d52edf2f 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -302,8 +302,7 @@ WriteBuilder withWriterVersion(WriterVersion version) { } // supposed to always be a private method used strictly by data and delete write builders - private WriteBuilder createContextFunc( - Function, Context> newCreateContextFunc) { + WriteBuilder createContextFunc(Function, Context> newCreateContextFunc) { this.createContextFunc = newCreateContextFunc; return this; } @@ -498,7 +497,7 @@ public FileAppender build() throws IOException { } } - private static class Context { + static class Context { private final int rowGroupSize; private final int pageSize; private final int pageRowLimit; @@ -1176,6 +1175,7 @@ public static class ReadBuilder implements InternalData.ReadBuilder { private Expression filter = null; private ReadSupport readSupport = null; private Function> batchedReaderFunc = null; + private BiFunction> batchedReaderFuncWithSchema = null; private ReaderFunction readerFunction = null; private boolean filterRecords = true; private boolean caseSensitive = true; @@ -1298,6 +1298,9 @@ public ReadBuilder createReaderFunc( Preconditions.checkArgument( this.batchedReaderFunc == null, "Cannot set reader function: batched reader function already set"); + Preconditions.checkArgument( + this.batchedReaderFuncWithSchema == null, + "Cannot set reader function: batched reader function with schema already set"); Preconditions.checkArgument( this.readerFunction == null, "Cannot set reader function: reader function already set"); this.readerFunction = new UnaryReaderFunction(newReaderFunction); @@ -1309,6 +1312,9 @@ public ReadBuilder createReaderFunc( Preconditions.checkArgument( this.batchedReaderFunc == null, "Cannot set reader function: batched reader function already set"); + Preconditions.checkArgument( + this.batchedReaderFuncWithSchema == null, + "Cannot set reader function: batched reader function with schema already set"); Preconditions.checkArgument( this.readerFunction == null, "Cannot set reader function: reader function already set"); this.readerFunction = new BinaryReaderFunction(newReaderFunction); @@ -1319,6 +1325,9 @@ public ReadBuilder createBatchedReaderFunc(Function> func) { + Preconditions.checkArgument( + this.batchedReaderFunc == null, + "Cannot set batched reader function: batched reader function already set"); + Preconditions.checkArgument( + this.batchedReaderFuncWithSchema == null, + "Cannot set reader function: batched reader function with schema already set"); + Preconditions.checkArgument( + this.readerFunction == null, + "Cannot set batched reader function: ReaderFunction already set"); + this.batchedReaderFuncWithSchema = func; + return this; + } + public ReadBuilder createReaderFunc(ReaderFunction reader) { Preconditions.checkArgument( this.batchedReaderFunc == null, "Cannot set reader function: batched reader function already set"); + Preconditions.checkArgument( + this.batchedReaderFuncWithSchema == null, + "Cannot set reader function: batched reader function with schema already set"); Preconditions.checkArgument( this.readerFunction == null, "Cannot set reader function: reader function already set"); this.readerFunction = reader; @@ -1389,7 +1416,7 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) { } @Override - @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"}) + @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "checkstyle:MethodLength"}) public CloseableIterable build() { FileDecryptionProperties fileDecryptionProperties = null; if (fileEncryptionKey != null) { @@ -1404,7 +1431,9 @@ public CloseableIterable build() { Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); } - if (batchedReaderFunc != null || readerFunction != null) { + if (batchedReaderFunc != null + || batchedReaderFuncWithSchema != null + || readerFunction != null) { ParquetReadOptions.Builder optionsBuilder; if (file instanceof HadoopInputFile) { // remove read properties already set that may conflict with this read @@ -1441,12 +1470,16 @@ public CloseableIterable build() { mapping = NameMapping.empty(); } - if (batchedReaderFunc != null) { + Function> batchedFunc = + batchedReaderFuncWithSchema != null + ? messageType -> batchedReaderFuncWithSchema.apply(schema, messageType) + : batchedReaderFunc; + if (batchedFunc != null) { return new VectorizedParquetReader<>( file, schema, options, - batchedReaderFunc, + batchedFunc, mapping, filter, reuseContainers, diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java new file mode 100644 index 000000000000..90d6e3ef41ac --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.function.Function; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.formats.BaseFormatModel; +import org.apache.iceberg.formats.ModelWriteBuilder; +import org.apache.iceberg.formats.ReadBuilder; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.schema.MessageType; + +public class ParquetFormatModel + extends BaseFormatModel, R, MessageType> { + public static final String WRITER_VERSION_KEY = "parquet.writer.version"; + private final boolean isBatchReader; + + public static ParquetFormatModel, Void, Object> forPositionDeletes() { + return new ParquetFormatModel<>(PositionDelete.deleteClass(), Void.class, null, null, false); + } + + public static ParquetFormatModel> create( + Class type, + Class schemaType, + WriterFunction, S, MessageType> writerFunction, + ReaderFunction, S, MessageType> readerFunction) { + return new ParquetFormatModel<>(type, schemaType, writerFunction, readerFunction, false); + } + + public static ParquetFormatModel> create( + Class type, + Class schemaType, + ReaderFunction, S, MessageType> batchReaderFunction) { + return new ParquetFormatModel<>(type, schemaType, null, batchReaderFunction, true); + } + + private ParquetFormatModel( + Class type, + Class schemaType, + WriterFunction, S, MessageType> writerFunction, + ReaderFunction readerFunction, + boolean isBatchReader) { + super(type, schemaType, writerFunction, readerFunction); + this.isBatchReader = isBatchReader; + } + + @Override + public FileFormat format() { + return FileFormat.PARQUET; + } + + @Override + public ModelWriteBuilder writeBuilder(EncryptedOutputFile outputFile) { + return new WriteBuilderWrapper<>(outputFile, writerFunction()); + } + + @Override + public ReadBuilder readBuilder(InputFile inputFile) { + return new ReadBuilderWrapper<>(inputFile, readerFunction(), isBatchReader); + } + + private static class WriteBuilderWrapper implements ModelWriteBuilder { + private final Parquet.WriteBuilder internal; + private final WriterFunction, S, MessageType> writerFunction; + private Schema schema; + private S engineSchema; + private FileContent content; + + private WriteBuilderWrapper( + EncryptedOutputFile outputFile, + WriterFunction, S, MessageType> writerFunction) { + this.internal = Parquet.write(outputFile); + this.writerFunction = writerFunction; + } + + @Override + public ModelWriteBuilder schema(Schema newSchema) { + this.schema = newSchema; + internal.schema(newSchema); + return this; + } + + @Override + public ModelWriteBuilder engineSchema(S newSchema) { + this.engineSchema = newSchema; + return this; + } + + @Override + public ModelWriteBuilder set(String property, String value) { + if (WRITER_VERSION_KEY.equals(property)) { + internal.writerVersion(ParquetProperties.WriterVersion.valueOf(value)); + } + + internal.set(property, value); + return this; + } + + @Override + public ModelWriteBuilder setAll(Map properties) { + internal.setAll(properties); + return this; + } + + @Override + public ModelWriteBuilder meta(String property, String value) { + internal.meta(property, value); + return this; + } + + @Override + public ModelWriteBuilder meta(Map properties) { + internal.meta(properties); + return this; + } + + @Override + public ModelWriteBuilder content(FileContent newContent) { + this.content = newContent; + return this; + } + + @Override + public ModelWriteBuilder metricsConfig(MetricsConfig metricsConfig) { + internal.metricsConfig(metricsConfig); + return this; + } + + @Override + public ModelWriteBuilder overwrite() { + internal.overwrite(); + return this; + } + + @Override + public ModelWriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { + internal.withFileEncryptionKey(encryptionKey); + return this; + } + + @Override + public ModelWriteBuilder withAADPrefix(ByteBuffer aadPrefix) { + internal.withAADPrefix(aadPrefix); + return this; + } + + @Override + public FileAppender build() throws IOException { + switch (content) { + case DATA: + internal.createContextFunc(Parquet.WriteBuilder.Context::dataContext); + internal.createWriterFunc( + (icebergSchema, messageType) -> + writerFunction.write(icebergSchema, messageType, engineSchema)); + break; + case EQUALITY_DELETES: + internal.createContextFunc(Parquet.WriteBuilder.Context::deleteContext); + internal.createWriterFunc( + (icebergSchema, messageType) -> + writerFunction.write(icebergSchema, messageType, engineSchema)); + break; + case POSITION_DELETES: + Preconditions.checkState( + schema == null, + "Invalid schema: %s. Position deletes with schema are not supported by the API.", + schema); + Preconditions.checkState( + engineSchema == null, + "Invalid engineSchema: %s. Position deletes with schema are not supported by the API.", + engineSchema); + + internal.createContextFunc(Parquet.WriteBuilder.Context::deleteContext); + internal.createWriterFunc( + (icebergSchema, messageType) -> + new ParquetValueWriters.PositionDeleteStructWriter( + (ParquetValueWriters.StructWriter) + GenericParquetWriter.create(icebergSchema, messageType), + Function.identity())); + internal.schema(DeleteSchemaUtil.pathPosSchema()); + break; + default: + throw new IllegalArgumentException("Unknown file content: " + content); + } + + return internal.build(); + } + } + + private static class ReadBuilderWrapper implements ReadBuilder { + private final Parquet.ReadBuilder internal; + private final ReaderFunction readerFunction; + private final boolean isBatchReader; + private S engineSchema; + private Map idToConstant = ImmutableMap.of(); + + private ReadBuilderWrapper( + InputFile inputFile, + ReaderFunction readerFunction, + boolean isBatchReader) { + this.internal = Parquet.read(inputFile); + this.readerFunction = readerFunction; + this.isBatchReader = isBatchReader; + } + + @Override + public ReadBuilder split(long newStart, long newLength) { + internal.split(newStart, newLength); + return this; + } + + @Override + public ReadBuilder project(Schema schema) { + internal.project(schema); + return this; + } + + @Override + public ReadBuilder engineProjection(S schema) { + this.engineSchema = schema; + return this; + } + + @Override + public ReadBuilder caseSensitive(boolean caseSensitive) { + internal.caseSensitive(caseSensitive); + return this; + } + + @Override + public ReadBuilder filter(Expression filter) { + internal.filter(filter); + return this; + } + + @Override + public ReadBuilder set(String key, String value) { + internal.set(key, value); + return this; + } + + @Override + public ReadBuilder reuseContainers() { + internal.reuseContainers(); + return this; + } + + @Override + public ReadBuilder recordsPerBatch(int numRowsPerBatch) { + internal.recordsPerBatch(numRowsPerBatch); + return this; + } + + @Override + public ReadBuilder idToConstant(Map newIdToConstant) { + this.idToConstant = newIdToConstant; + return this; + } + + @Override + public ReadBuilder withNameMapping(NameMapping nameMapping) { + internal.withNameMapping(nameMapping); + return this; + } + + @Override + public CloseableIterable build() { + if (isBatchReader) { + return internal + .createBatchedReaderFunc( + (icebergSchema, messageType) -> + (VectorizedReader) + readerFunction.read(icebergSchema, messageType, engineSchema, idToConstant)) + .build(); + } else { + return internal + .createReaderFunc( + (icebergSchema, messageType) -> + (ParquetValueReader) + readerFunction.read(icebergSchema, messageType, engineSchema, idToConstant)) + .build(); + } + } + } +}