diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 6c7edc25b691..4a5136f58e71 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -182,8 +182,7 @@ public WriteBuilder overwrite(boolean enabled) { } // supposed to always be a private method used strictly by data and delete write builders - private WriteBuilder createContextFunc( - Function, Context> newCreateContextFunc) { + WriteBuilder createContextFunc(Function, Context> newCreateContextFunc) { this.createContextFunc = newCreateContextFunc; return this; } @@ -217,7 +216,7 @@ public FileAppender build() throws IOException { overwrite); } - private static class Context { + static class Context { private final CodecFactory codec; private Context(CodecFactory codec) { @@ -568,7 +567,7 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { } /** A {@link DatumWriter} implementation that wraps another to produce position deletes. */ - private static class PositionDatumWriter implements MetricsAwareDatumWriter> { + static class PositionDatumWriter implements MetricsAwareDatumWriter> { private static final ValueWriter PATH_WRITER = ValueWriters.strings(); private static final ValueWriter POS_WRITER = ValueWriters.longs(); diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java b/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java new file mode 100644 index 000000000000..e0fcf8952604 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.formats.BaseFormatModel; +import org.apache.iceberg.formats.ModelWriteBuilder; +import org.apache.iceberg.formats.ReadBuilder; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +public class AvroFormatModel + extends BaseFormatModel, DatumReader, Schema> { + + public static AvroFormatModel, Void> forPositionDeletes() { + return new AvroFormatModel<>(PositionDelete.deleteClass(), Void.class, null, null); + } + + public static AvroFormatModel create( + Class type, + Class schemaType, + WriterFunction, S, Schema> writerFunction, + ReaderFunction, S, Schema> readerFunction) { + return new AvroFormatModel<>(type, schemaType, writerFunction, readerFunction); + } + + private AvroFormatModel( + Class type, + Class schemaType, + WriterFunction, S, Schema> writerFunction, + ReaderFunction, S, Schema> readerFunction) { + super(type, schemaType, writerFunction, readerFunction); + } + + @Override + public FileFormat format() { + return FileFormat.AVRO; + } + + @Override + public ModelWriteBuilder writeBuilder(EncryptedOutputFile outputFile) { + return new WriteBuilderWrapper<>(outputFile, writerFunction()); + } + + @Override + public ReadBuilder readBuilder(InputFile inputFile) { + return new ReadBuilderWrapper<>(inputFile, readerFunction()); + } + + private static class WriteBuilderWrapper implements ModelWriteBuilder { + private final Avro.WriteBuilder internal; + private final WriterFunction, S, Schema> writerFunction; + private org.apache.iceberg.Schema schema; + private S engineSchema; + private FileContent content; + + private WriteBuilderWrapper( + EncryptedOutputFile outputFile, WriterFunction, S, Schema> writerFunction) { + this.internal = Avro.write(outputFile.encryptingOutputFile()); + this.writerFunction = writerFunction; + } + + @Override + public ModelWriteBuilder schema(org.apache.iceberg.Schema newSchema) { + this.schema = newSchema; + internal.schema(newSchema); + return this; + } + + @Override + public ModelWriteBuilder engineSchema(S newSchema) { + this.engineSchema = newSchema; + return this; + } + + @Override + public ModelWriteBuilder set(String property, String value) { + internal.set(property, value); + return this; + } + + @Override + public ModelWriteBuilder setAll(Map properties) { + internal.setAll(properties); + return this; + } + + @Override + public ModelWriteBuilder meta(String property, String value) { + internal.meta(property, value); + return this; + } + + @Override + public ModelWriteBuilder meta(Map properties) { + internal.meta(properties); + return this; + } + + @Override + public ModelWriteBuilder content(FileContent newContent) { + this.content = newContent; + return this; + } + + @Override + public ModelWriteBuilder metricsConfig(MetricsConfig metricsConfig) { + internal.metricsConfig(metricsConfig); + return this; + } + + @Override + public ModelWriteBuilder overwrite() { + internal.overwrite(); + return this; + } + + @Override + public ModelWriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { + throw new UnsupportedOperationException("Avro does not support file encryption keys"); + } + + @Override + public ModelWriteBuilder withAADPrefix(ByteBuffer aadPrefix) { + throw new UnsupportedOperationException("Avro does not support AAD prefix"); + } + + @Override + public FileAppender build() throws IOException { + switch (content) { + case DATA: + internal.createContextFunc(Avro.WriteBuilder.Context::dataContext); + internal.createWriterFunc( + avroSchema -> writerFunction.write(schema, avroSchema, engineSchema)); + break; + case EQUALITY_DELETES: + internal.createContextFunc(Avro.WriteBuilder.Context::deleteContext); + internal.createWriterFunc( + avroSchema -> writerFunction.write(schema, avroSchema, engineSchema)); + break; + case POSITION_DELETES: + Preconditions.checkState( + schema == null, + "Invalid schema: %s. Position deletes with schema are not supported by the API.", + schema); + Preconditions.checkState( + engineSchema == null, + "Invalid engineSchema: %s. Position deletes with schema are not supported by the API.", + engineSchema); + + internal.createContextFunc(Avro.WriteBuilder.Context::deleteContext); + internal.createWriterFunc(unused -> new Avro.PositionDatumWriter()); + internal.schema(DeleteSchemaUtil.pathPosSchema()); + break; + default: + throw new IllegalArgumentException("Unknown file content: " + content); + } + + return internal.build(); + } + } + + private static class ReadBuilderWrapper implements ReadBuilder { + private final Avro.ReadBuilder internal; + private final ReaderFunction, S, Schema> readerFunction; + private S engineSchema; + private Map idToConstant = ImmutableMap.of(); + + private ReadBuilderWrapper( + InputFile inputFile, ReaderFunction, S, Schema> readerFunction) { + this.internal = Avro.read(inputFile); + this.readerFunction = readerFunction; + } + + @Override + public ReadBuilder split(long newStart, long newLength) { + internal.split(newStart, newLength); + return this; + } + + @Override + public ReadBuilder project(org.apache.iceberg.Schema schema) { + internal.project(schema); + return this; + } + + @Override + public ReadBuilder engineProjection(S schema) { + this.engineSchema = schema; + return this; + } + + @Override + public ReadBuilder caseSensitive(boolean caseSensitive) { + // Filtering is not supported in Avro reader, so case sensitivity does not matter + // This is not an error since filtering is best-effort. + return this; + } + + @Override + public ReadBuilder filter(Expression filter) { + // Filtering is not supported in Avro reader + // This is not an error since filtering is best-effort. + return this; + } + + @Override + public ReadBuilder set(String key, String value) { + // Configuration is not used for Avro reader creation + return this; + } + + @Override + public ReadBuilder reuseContainers() { + internal.reuseContainers(); + return this; + } + + @Override + public ReadBuilder recordsPerBatch(int numRowsPerBatch) { + throw new UnsupportedOperationException("Batch reading is not supported in Avro reader"); + } + + @Override + public ReadBuilder idToConstant(Map newIdToConstant) { + this.idToConstant = newIdToConstant; + return this; + } + + @Override + public ReadBuilder withNameMapping(org.apache.iceberg.mapping.NameMapping nameMapping) { + internal.withNameMapping(nameMapping); + return this; + } + + @Override + public CloseableIterable build() { + // The file schema is passed directly to the DatumReader by the Avro read path, so null is + // passed here + return internal + .createResolvingReader( + icebergSchema -> readerFunction.read(icebergSchema, null, engineSchema, idToConstant)) + .build(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java index b202e7fcf34e..c3b6cbaa9bff 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java @@ -31,6 +31,11 @@ public static PositionDelete create() { private PositionDelete() {} + @SuppressWarnings("unchecked") + public static Class> deleteClass() { + return (Class>) (Class) PositionDelete.class; + } + public PositionDelete set(CharSequence newPath, long newPos) { this.path = newPath; this.pos = newPos; diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java index 91301b95dc1a..b9adafdbc2c9 100644 --- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java +++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java @@ -54,7 +54,8 @@ private FormatModelRegistry() {} private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class); // The list of classes which are used for registering the reader and writer builders - private static final List CLASSES_TO_REGISTER = ImmutableList.of(); + private static final List CLASSES_TO_REGISTER = + ImmutableList.of("org.apache.iceberg.data.GenericFormatModels"); // Format models indexed by file format and object model class private static final Map>, FormatModel> MODELS = @@ -169,12 +170,10 @@ public static FileWriterBuilder, S> equalityDelet * @param outputFile destination for the written data * @return a configured delete write builder for creating a {@link PositionDeleteWriter} */ - @SuppressWarnings("unchecked") public static FileWriterBuilder, ?> positionDeleteWriteBuilder( FileFormat format, EncryptedOutputFile outputFile) { - Class> deleteClass = - (Class>) (Class) PositionDelete.class; - FormatModel, ?> model = FormatModelRegistry.modelFor(format, deleteClass); + FormatModel, ?> model = + FormatModelRegistry.modelFor(format, PositionDelete.deleteClass()); return FileWriterBuilderImpl.forPositionDelete(model, outputFile); } diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java new file mode 100644 index 000000000000..e2e13861ca2d --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import org.apache.iceberg.avro.AvroFormatModel; +import org.apache.iceberg.data.avro.DataWriter; +import org.apache.iceberg.data.avro.PlannedDataReader; +import org.apache.iceberg.formats.FormatModelRegistry; + +public class GenericFormatModels { + public static void register() { + FormatModelRegistry.register( + AvroFormatModel.create( + Record.class, + Void.class, + (icebergSchema, fileSchema, engineSchema) -> DataWriter.create(fileSchema), + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + PlannedDataReader.create(icebergSchema, idToConstant))); + + FormatModelRegistry.register(AvroFormatModel.forPositionDeletes()); + } + + private GenericFormatModels() {} +} diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java index fc8d47680b0f..f2e2b4e7fa34 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java @@ -31,6 +31,14 @@ public class DataTestHelpers { private DataTestHelpers() {} + public static void assertEquals( + Types.StructType struct, List expected, List actual) { + assertThat(actual).hasSize(expected.size()); + for (int i = 0; i < expected.size(); i += 1) { + assertEquals(struct, expected.get(i), actual.get(i)); + } + } + public static void assertEquals(Types.StructType struct, Record expected, Record actual) { assertEquals(struct, expected, actual, null, -1); } diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java new file mode 100644 index 000000000000..ca3dda30ab5f --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH; +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TestBase; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.formats.FileWriterBuilder; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.inmemory.InMemoryFileIO; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.FieldSource; + +public class TestGenericFormatModels { + private static final List TEST_RECORDS = + RandomGenericData.generate(TestBase.SCHEMA, 10, 1L); + + private static final FileFormat[] FILE_FORMATS = new FileFormat[] {FileFormat.AVRO}; + + @TempDir protected Path temp; + + private InMemoryFileIO fileIO; + private EncryptedOutputFile encryptedFile; + + @BeforeEach + public void before() { + this.fileIO = new InMemoryFileIO(); + this.encryptedFile = + EncryptedFiles.encryptedOutput( + fileIO.newOutputFile("test-file"), EncryptionKeyMetadata.EMPTY); + } + + @AfterEach + public void after() throws IOException { + fileIO.deleteFile(encryptedFile.encryptingOutputFile()); + this.encryptedFile = null; + if (fileIO != null) { + fileIO.close(); + } + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + public void testDataWriterRoundTrip(FileFormat fileFormat) throws IOException { + FileWriterBuilder, Schema> writerBuilder = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile); + + DataFile dataFile; + DataWriter writer = + writerBuilder.schema(TestBase.SCHEMA).spec(PartitionSpec.unpartitioned()).build(); + try (writer) { + for (Record record : TEST_RECORDS) { + writer.write(record); + } + } + + dataFile = writer.toDataFile(); + + assertThat(dataFile).isNotNull(); + assertThat(dataFile.recordCount()).isEqualTo(TEST_RECORDS.size()); + assertThat(dataFile.format()).isEqualTo(fileFormat); + + // Verify the file content by reading it back + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile) + .project(TestBase.SCHEMA) + .reuseContainers() + .build()) { + readRecords = ImmutableList.copyOf(CloseableIterable.transform(reader, Record::copy)); + } + + DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + public void testEqualityDeleteWriterRoundTrip(FileFormat fileFormat) throws IOException { + FileWriterBuilder, Schema> writerBuilder = + FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, Record.class, encryptedFile); + + DeleteFile deleteFile; + EqualityDeleteWriter writer = + writerBuilder + .schema(TestBase.SCHEMA) + .spec(PartitionSpec.unpartitioned()) + .equalityFieldIds(3) + .build(); + try (writer) { + for (Record record : TEST_RECORDS) { + writer.write(record); + } + } + + deleteFile = writer.toDeleteFile(); + + assertThat(deleteFile).isNotNull(); + assertThat(deleteFile.recordCount()).isEqualTo(TEST_RECORDS.size()); + assertThat(deleteFile.format()).isEqualTo(fileFormat); + assertThat(deleteFile.equalityFieldIds()).containsExactly(3); + + // Verify the file content by reading it back + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile) + .project(TestBase.SCHEMA) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + public void testPositionDeleteWriterRoundTrip(FileFormat fileFormat) throws IOException { + Schema positionDeleteSchema = new Schema(DELETE_FILE_PATH, DELETE_FILE_POS); + + FileWriterBuilder, ?> writerBuilder = + FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, encryptedFile); + + PositionDelete delete1 = PositionDelete.create(); + delete1.set("data-file-1.parquet", 0L); + + PositionDelete delete2 = PositionDelete.create(); + delete2.set("data-file-1.parquet", 1L); + + List> positionDeletes = ImmutableList.of(delete1, delete2); + + DeleteFile deleteFile; + PositionDeleteWriter writer = writerBuilder.spec(PartitionSpec.unpartitioned()).build(); + try (writer) { + for (PositionDelete delete : positionDeletes) { + writer.write(delete); + } + } + + deleteFile = writer.toDeleteFile(); + + assertThat(deleteFile).isNotNull(); + assertThat(deleteFile.recordCount()).isEqualTo(2); + assertThat(deleteFile.format()).isEqualTo(fileFormat); + + // Verify the file content by reading it back + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile) + .project(positionDeleteSchema) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + ImmutableList.of( + GenericRecord.create(positionDeleteSchema) + .copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", DELETE_FILE_POS.name(), 0L), + GenericRecord.create(positionDeleteSchema) + .copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", DELETE_FILE_POS.name(), 1L)); + + DataTestHelpers.assertEquals(positionDeleteSchema.asStruct(), expected, readRecords); + } +}