From 973cd8b17e017c4cb7aeb324519800708b7dfe3e Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Sat, 7 Feb 2026 12:14:58 +0100 Subject: [PATCH 1/3] Core, Data: Implementation of AvroFormatModel --- .../java/org/apache/iceberg/avro/Avro.java | 7 +- .../apache/iceberg/avro/AvroFormatModel.java | 267 ++++++++++++++++++ .../iceberg/formats/FormatModelRegistry.java | 3 +- .../iceberg/data/GenericFormatModels.java | 41 +++ .../iceberg/data/TestGenericFormatModels.java | 217 ++++++++++++++ 5 files changed, 530 insertions(+), 5 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java create mode 100644 data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java create mode 100644 data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 6c7edc25b691..4a5136f58e71 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -182,8 +182,7 @@ public WriteBuilder overwrite(boolean enabled) { } // supposed to always be a private method used strictly by data and delete write builders - private WriteBuilder createContextFunc( - Function, Context> newCreateContextFunc) { + WriteBuilder createContextFunc(Function, Context> newCreateContextFunc) { this.createContextFunc = newCreateContextFunc; return this; } @@ -217,7 +216,7 @@ public FileAppender build() throws IOException { overwrite); } - private static class Context { + static class Context { private final CodecFactory codec; private Context(CodecFactory codec) { @@ -568,7 +567,7 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { } /** A {@link DatumWriter} implementation that wraps another to produce position deletes. */ - private static class PositionDatumWriter implements MetricsAwareDatumWriter> { + static class PositionDatumWriter implements MetricsAwareDatumWriter> { private static final ValueWriter PATH_WRITER = ValueWriters.strings(); private static final ValueWriter POS_WRITER = ValueWriters.longs(); diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java b/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java new file mode 100644 index 000000000000..062149d7174c --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.formats.BaseFormatModel; +import org.apache.iceberg.formats.ModelWriteBuilder; +import org.apache.iceberg.formats.ReadBuilder; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +public class AvroFormatModel + extends BaseFormatModel, DatumReader, Schema> { + + @SuppressWarnings("rawtypes") + public static AvroFormatModel forDelete() { + return new AvroFormatModel<>(PositionDelete.class, null, null, null); + } + + public static AvroFormatModel create( + Class type, + Class schemaType, + WriterFunction, S, Schema> writerFunction, + ReaderFunction, S, Schema> readerFunction) { + return new AvroFormatModel<>(type, schemaType, writerFunction, readerFunction); + } + + private AvroFormatModel( + Class type, + Class schemaType, + WriterFunction, S, Schema> writerFunction, + ReaderFunction, S, Schema> readerFunction) { + super(type, schemaType, writerFunction, readerFunction); + } + + @Override + public FileFormat format() { + return FileFormat.AVRO; + } + + @Override + public ModelWriteBuilder writeBuilder(EncryptedOutputFile outputFile) { + return new WriteBuilderWrapper<>(outputFile, writerFunction()); + } + + @Override + public ReadBuilder readBuilder(InputFile inputFile) { + return new ReadBuilderWrapper<>(inputFile, readerFunction()); + } + + private static class WriteBuilderWrapper implements ModelWriteBuilder { + private final Avro.WriteBuilder internal; + private final WriterFunction, S, Schema> writerFunction; + private org.apache.iceberg.Schema schema; + private S engineSchema; + private FileContent content; + + private WriteBuilderWrapper( + EncryptedOutputFile outputFile, WriterFunction, S, Schema> writerFunction) { + this.internal = Avro.write(outputFile.encryptingOutputFile()); + this.writerFunction = writerFunction; + } + + @Override + public ModelWriteBuilder schema(org.apache.iceberg.Schema newSchema) { + this.schema = newSchema; + internal.schema(newSchema); + return this; + } + + @Override + public ModelWriteBuilder engineSchema(S newSchema) { + this.engineSchema = newSchema; + return this; + } + + @Override + public ModelWriteBuilder set(String property, String value) { + internal.set(property, value); + return this; + } + + @Override + public ModelWriteBuilder setAll(Map properties) { + internal.setAll(properties); + return this; + } + + @Override + public ModelWriteBuilder meta(String property, String value) { + internal.meta(property, value); + return this; + } + + @Override + public ModelWriteBuilder meta(Map properties) { + internal.meta(properties); + return this; + } + + @Override + public ModelWriteBuilder content(FileContent newContent) { + this.content = newContent; + return this; + } + + @Override + public ModelWriteBuilder metricsConfig(MetricsConfig metricsConfig) { + internal.metricsConfig(metricsConfig); + return this; + } + + @Override + public ModelWriteBuilder overwrite() { + internal.overwrite(); + return this; + } + + @Override + public ModelWriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { + throw new UnsupportedOperationException("Avro does not support file encryption keys"); + } + + @Override + public ModelWriteBuilder withAADPrefix(ByteBuffer aadPrefix) { + throw new UnsupportedOperationException("Avro does not support AAD prefix"); + } + + @Override + public org.apache.iceberg.io.FileAppender build() throws IOException { + switch (content) { + case DATA: + internal.createContextFunc(Avro.WriteBuilder.Context::dataContext); + internal.createWriterFunc( + avroSchema -> writerFunction.write(schema, avroSchema, engineSchema)); + break; + case EQUALITY_DELETES: + internal.createContextFunc(Avro.WriteBuilder.Context::deleteContext); + internal.createWriterFunc( + avroSchema -> writerFunction.write(schema, avroSchema, engineSchema)); + break; + case POSITION_DELETES: + internal.createContextFunc(Avro.WriteBuilder.Context::deleteContext); + internal.createWriterFunc(unused -> new Avro.PositionDatumWriter()); + internal.schema(DeleteSchemaUtil.pathPosSchema()); + break; + default: + throw new IllegalArgumentException("Unknown file content: " + content); + } + + return internal.build(); + } + } + + private static class ReadBuilderWrapper implements ReadBuilder { + private final Avro.ReadBuilder internal; + private final ReaderFunction, S, Schema> readerFunction; + private S engineSchema; + private Map idToConstant = ImmutableMap.of(); + + private ReadBuilderWrapper( + InputFile inputFile, ReaderFunction, S, Schema> readerFunction) { + this.internal = Avro.read(inputFile); + this.readerFunction = readerFunction; + } + + @Override + public ReadBuilder split(long newStart, long newLength) { + internal.split(newStart, newLength); + return this; + } + + @Override + public ReadBuilder project(org.apache.iceberg.Schema schema) { + internal.project(schema); + return this; + } + + @Override + public ReadBuilder engineProjection(S schema) { + this.engineSchema = schema; + return this; + } + + @Override + public ReadBuilder caseSensitive(boolean caseSensitive) { + // Filtering is not supported in Avro reader, so case sensitivity does not matter + // This is not an error since filtering is best-effort. + return this; + } + + @Override + public ReadBuilder filter(Expression filter) { + // Filtering is not supported in Avro reader + // This is not an error since filtering is best-effort. + return this; + } + + @Override + public ReadBuilder set(String key, String value) { + // Configuration is not used for Avro reader creation + return this; + } + + @Override + public ReadBuilder reuseContainers() { + internal.reuseContainers(); + return this; + } + + @Override + public ReadBuilder recordsPerBatch(int numRowsPerBatch) { + throw new UnsupportedOperationException("Batch reading is not supported in Avro reader"); + } + + @Override + public ReadBuilder idToConstant(Map newIdToConstant) { + this.idToConstant = newIdToConstant; + return this; + } + + @Override + public ReadBuilder withNameMapping(org.apache.iceberg.mapping.NameMapping nameMapping) { + internal.withNameMapping(nameMapping); + return this; + } + + @Override + public CloseableIterable build() { + // The file schema is passed directly to the DatumReader by the Avro read path, so null is + // passed here + return internal + .createResolvingReader( + icebergSchema -> readerFunction.read(icebergSchema, null, engineSchema, idToConstant)) + .build(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java index 91301b95dc1a..cdad6362e8c9 100644 --- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java +++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java @@ -54,7 +54,8 @@ private FormatModelRegistry() {} private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class); // The list of classes which are used for registering the reader and writer builders - private static final List CLASSES_TO_REGISTER = ImmutableList.of(); + private static final List CLASSES_TO_REGISTER = + ImmutableList.of("org.apache.iceberg.data.GenericFormatModels"); // Format models indexed by file format and object model class private static final Map>, FormatModel> MODELS = diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java new file mode 100644 index 000000000000..5cadc865302a --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.AvroFormatModel; +import org.apache.iceberg.data.avro.DataWriter; +import org.apache.iceberg.data.avro.PlannedDataReader; +import org.apache.iceberg.formats.FormatModelRegistry; + +public class GenericFormatModels { + public static void register() { + FormatModelRegistry.register( + AvroFormatModel.create( + Record.class, + Schema.class, + (icebergSchema, fileSchema, engineSchema) -> DataWriter.create(fileSchema), + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + PlannedDataReader.create(icebergSchema, idToConstant))); + + FormatModelRegistry.register(AvroFormatModel.forDelete()); + } + + private GenericFormatModels() {} +} diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java new file mode 100644 index 000000000000..c198680cb85f --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH; +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TestBase; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.formats.FileWriterBuilder; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.inmemory.InMemoryFileIO; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestGenericFormatModels { + @Parameters(name = "fileFormat = {0}") + protected static List parameters() { + return List.of(FileFormat.AVRO); + } + + private static final List TEST_RECORDS = + ImmutableList.of( + GenericRecord.create(TestBase.SCHEMA).copy("id", 1, "data", "hello"), + GenericRecord.create(TestBase.SCHEMA).copy("id", 2, "data", "world")); + + @Parameter(index = 0) + private FileFormat fileFormat; + + @TempDir protected Path temp; + + private InMemoryFileIO fileIO; + private EncryptedOutputFile encryptedFile; + + @BeforeEach + public void before() { + this.fileIO = new InMemoryFileIO(); + OutputFile outputFile = fileIO.newOutputFile("test-file." + fileFormat.name().toLowerCase()); + this.encryptedFile = EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY); + } + + @AfterEach + public void after() throws IOException { + fileIO.deleteFile(encryptedFile.encryptingOutputFile()); + this.encryptedFile = null; + if (fileIO != null) { + fileIO.close(); + } + } + + @TestTemplate + public void testDataWriter() throws IOException { + FileWriterBuilder, Schema> writerBuilder = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile); + + DataFile dataFile; + DataWriter writer = + writerBuilder.schema(TestBase.SCHEMA).spec(PartitionSpec.unpartitioned()).build(); + try (writer) { + for (Record record : TEST_RECORDS) { + writer.write(record); + } + } + + dataFile = writer.toDataFile(); + + assertThat(dataFile).isNotNull(); + assertThat(dataFile.recordCount()).isEqualTo(2); + assertThat(dataFile.format()).isEqualTo(fileFormat); + + // Verify the file content by reading it back + InputFile inputFile = fileIO.newInputFile(encryptedFile.encryptingOutputFile().location()); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile) + .project(TestBase.SCHEMA) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(2); + DataTestHelpers.assertEquals( + TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(0), readRecords.get(0)); + DataTestHelpers.assertEquals( + TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(1), readRecords.get(1)); + } + + @TestTemplate + public void testEqualityDeleteWriter() throws IOException { + FileWriterBuilder, Schema> writerBuilder = + FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, Record.class, encryptedFile); + + DeleteFile deleteFile; + EqualityDeleteWriter writer = + writerBuilder + .schema(TestBase.SCHEMA) + .spec(PartitionSpec.unpartitioned()) + .equalityFieldIds(3) + .build(); + try (writer) { + for (Record record : TEST_RECORDS) { + writer.write(record); + } + } + + deleteFile = writer.toDeleteFile(); + + assertThat(deleteFile).isNotNull(); + assertThat(deleteFile.recordCount()).isEqualTo(2); + assertThat(deleteFile.format()).isEqualTo(fileFormat); + assertThat(deleteFile.equalityFieldIds()).containsExactly(3); + + // Verify the file content by reading it back + InputFile inputFile = fileIO.newInputFile(encryptedFile.encryptingOutputFile().location()); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile) + .project(TestBase.SCHEMA) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(2); + DataTestHelpers.assertEquals( + TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(0), readRecords.get(0)); + DataTestHelpers.assertEquals( + TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(1), readRecords.get(1)); + } + + @TestTemplate + public void testPositionDeleteWriter() throws IOException { + Schema positionDeleteSchema = new Schema(DELETE_FILE_PATH, DELETE_FILE_POS); + + FileWriterBuilder, ?> writerBuilder = + FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, encryptedFile); + + PositionDelete delete1 = PositionDelete.create(); + delete1.set("data-file-1.parquet", 0L, null); + + PositionDelete delete2 = PositionDelete.create(); + delete2.set("data-file-1.parquet", 1L, null); + + List> positionDeletes = ImmutableList.of(delete1, delete2); + + DeleteFile deleteFile; + PositionDeleteWriter writer = writerBuilder.spec(PartitionSpec.unpartitioned()).build(); + try (writer) { + for (PositionDelete delete : positionDeletes) { + writer.write(delete); + } + } + + deleteFile = writer.toDeleteFile(); + + assertThat(deleteFile).isNotNull(); + assertThat(deleteFile.recordCount()).isEqualTo(2); + assertThat(deleteFile.format()).isEqualTo(fileFormat); + + // Verify the file content by reading it back + InputFile inputFile = fileIO.newInputFile(encryptedFile.encryptingOutputFile().location()); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile) + .project(positionDeleteSchema) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(2); + assertThat(readRecords.get(0).getField("file_path")).isEqualTo("data-file-1.parquet"); + assertThat(readRecords.get(0).getField("pos")).isEqualTo(0L); + assertThat(readRecords.get(1).getField("file_path")).isEqualTo("data-file-1.parquet"); + assertThat(readRecords.get(1).getField("pos")).isEqualTo(1L); + } +} From b986069725b7eb7e6ad4955b3b7c9cc025c8e9e7 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Tue, 10 Feb 2026 20:43:35 +0100 Subject: [PATCH 2/3] Ryan's comments --- .../apache/iceberg/avro/AvroFormatModel.java | 18 ++++- .../iceberg/deletes/PositionDelete.java | 5 ++ .../iceberg/formats/FormatModelRegistry.java | 6 +- .../iceberg/data/GenericFormatModels.java | 5 +- .../apache/iceberg/data/DataTestHelpers.java | 8 ++ .../iceberg/data/TestGenericFormatModels.java | 80 ++++++++----------- 6 files changed, 64 insertions(+), 58 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java b/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java index 062149d7174c..e0358c369c6b 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java @@ -35,15 +35,16 @@ import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; public class AvroFormatModel extends BaseFormatModel, DatumReader, Schema> { - @SuppressWarnings("rawtypes") - public static AvroFormatModel forDelete() { - return new AvroFormatModel<>(PositionDelete.class, null, null, null); + public static AvroFormatModel, Object> forPositionDeletes() { + return new AvroFormatModel<>(PositionDelete.deleteClass(), null, null, null); } public static AvroFormatModel create( @@ -156,7 +157,7 @@ public ModelWriteBuilder withAADPrefix(ByteBuffer aadPrefix) { } @Override - public org.apache.iceberg.io.FileAppender build() throws IOException { + public FileAppender build() throws IOException { switch (content) { case DATA: internal.createContextFunc(Avro.WriteBuilder.Context::dataContext); @@ -169,6 +170,15 @@ public org.apache.iceberg.io.FileAppender build() throws IOException { avroSchema -> writerFunction.write(schema, avroSchema, engineSchema)); break; case POSITION_DELETES: + Preconditions.checkState( + schema == null, + "Invalid schema: %s. Position deletes with schema are not supported by the API.", + schema); + Preconditions.checkState( + engineSchema == null, + "Invalid engineSchema: %s. Position deletes with schema are not supported by the API.", + engineSchema); + internal.createContextFunc(Avro.WriteBuilder.Context::deleteContext); internal.createWriterFunc(unused -> new Avro.PositionDatumWriter()); internal.schema(DeleteSchemaUtil.pathPosSchema()); diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java index b202e7fcf34e..c3b6cbaa9bff 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java @@ -31,6 +31,11 @@ public static PositionDelete create() { private PositionDelete() {} + @SuppressWarnings("unchecked") + public static Class> deleteClass() { + return (Class>) (Class) PositionDelete.class; + } + public PositionDelete set(CharSequence newPath, long newPos) { this.path = newPath; this.pos = newPos; diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java index cdad6362e8c9..b9adafdbc2c9 100644 --- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java +++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java @@ -170,12 +170,10 @@ public static FileWriterBuilder, S> equalityDelet * @param outputFile destination for the written data * @return a configured delete write builder for creating a {@link PositionDeleteWriter} */ - @SuppressWarnings("unchecked") public static FileWriterBuilder, ?> positionDeleteWriteBuilder( FileFormat format, EncryptedOutputFile outputFile) { - Class> deleteClass = - (Class>) (Class) PositionDelete.class; - FormatModel, ?> model = FormatModelRegistry.modelFor(format, deleteClass); + FormatModel, ?> model = + FormatModelRegistry.modelFor(format, PositionDelete.deleteClass()); return FileWriterBuilderImpl.forPositionDelete(model, outputFile); } diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java index 5cadc865302a..e2e13861ca2d 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.data; -import org.apache.iceberg.Schema; import org.apache.iceberg.avro.AvroFormatModel; import org.apache.iceberg.data.avro.DataWriter; import org.apache.iceberg.data.avro.PlannedDataReader; @@ -29,12 +28,12 @@ public static void register() { FormatModelRegistry.register( AvroFormatModel.create( Record.class, - Schema.class, + Void.class, (icebergSchema, fileSchema, engineSchema) -> DataWriter.create(fileSchema), (icebergSchema, fileSchema, engineSchema, idToConstant) -> PlannedDataReader.create(icebergSchema, idToConstant))); - FormatModelRegistry.register(AvroFormatModel.forDelete()); + FormatModelRegistry.register(AvroFormatModel.forPositionDeletes()); } private GenericFormatModels() {} diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java index fc8d47680b0f..62fb1ab15f4c 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java @@ -31,6 +31,14 @@ public class DataTestHelpers { private DataTestHelpers() {} + public static void assertEquals( + Types.StructType struct, List expected, List actual) { + assertThat(actual).hasSize(expected.size()); + for (int i = 0; i < expected.size(); i += 1) { + assertEquals(struct, expected.get(i), actual.get(i), null, -1); + } + } + public static void assertEquals(Types.StructType struct, Record expected, Record actual) { assertEquals(struct, expected, actual, null, -1); } diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java index c198680cb85f..197605f5a043 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java +++ b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java @@ -28,9 +28,6 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Parameter; -import org.apache.iceberg.ParameterizedTestExtension; -import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.TestBase; @@ -46,28 +43,18 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.FieldSource; -@ExtendWith(ParameterizedTestExtension.class) public class TestGenericFormatModels { - @Parameters(name = "fileFormat = {0}") - protected static List parameters() { - return List.of(FileFormat.AVRO); - } - private static final List TEST_RECORDS = - ImmutableList.of( - GenericRecord.create(TestBase.SCHEMA).copy("id", 1, "data", "hello"), - GenericRecord.create(TestBase.SCHEMA).copy("id", 2, "data", "world")); + RandomGenericData.generate(TestBase.SCHEMA, 10, 1L); - @Parameter(index = 0) - private FileFormat fileFormat; + private static final FileFormat[] FILE_FORMATS = new FileFormat[] {FileFormat.AVRO}; @TempDir protected Path temp; @@ -77,8 +64,9 @@ protected static List parameters() { @BeforeEach public void before() { this.fileIO = new InMemoryFileIO(); - OutputFile outputFile = fileIO.newOutputFile("test-file." + fileFormat.name().toLowerCase()); - this.encryptedFile = EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY); + this.encryptedFile = + EncryptedFiles.encryptedOutput( + fileIO.newOutputFile("test-file"), EncryptionKeyMetadata.EMPTY); } @AfterEach @@ -90,8 +78,9 @@ public void after() throws IOException { } } - @TestTemplate - public void testDataWriter() throws IOException { + @ParameterizedTest + @FieldSource("FILE_FORMATS") + public void testDataWriter(FileFormat fileFormat) throws IOException { FileWriterBuilder, Schema> writerBuilder = FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile); @@ -107,11 +96,11 @@ public void testDataWriter() throws IOException { dataFile = writer.toDataFile(); assertThat(dataFile).isNotNull(); - assertThat(dataFile.recordCount()).isEqualTo(2); + assertThat(dataFile.recordCount()).isEqualTo(TEST_RECORDS.size()); assertThat(dataFile.format()).isEqualTo(fileFormat); // Verify the file content by reading it back - InputFile inputFile = fileIO.newInputFile(encryptedFile.encryptingOutputFile().location()); + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); List readRecords; try (CloseableIterable reader = FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile) @@ -120,15 +109,12 @@ public void testDataWriter() throws IOException { readRecords = ImmutableList.copyOf(reader); } - assertThat(readRecords).hasSize(2); - DataTestHelpers.assertEquals( - TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(0), readRecords.get(0)); - DataTestHelpers.assertEquals( - TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(1), readRecords.get(1)); + DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords); } - @TestTemplate - public void testEqualityDeleteWriter() throws IOException { + @ParameterizedTest + @FieldSource("FILE_FORMATS") + public void testEqualityDeleteWriter(FileFormat fileFormat) throws IOException { FileWriterBuilder, Schema> writerBuilder = FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, Record.class, encryptedFile); @@ -148,12 +134,12 @@ public void testEqualityDeleteWriter() throws IOException { deleteFile = writer.toDeleteFile(); assertThat(deleteFile).isNotNull(); - assertThat(deleteFile.recordCount()).isEqualTo(2); + assertThat(deleteFile.recordCount()).isEqualTo(TEST_RECORDS.size()); assertThat(deleteFile.format()).isEqualTo(fileFormat); assertThat(deleteFile.equalityFieldIds()).containsExactly(3); // Verify the file content by reading it back - InputFile inputFile = fileIO.newInputFile(encryptedFile.encryptingOutputFile().location()); + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); List readRecords; try (CloseableIterable reader = FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile) @@ -162,25 +148,22 @@ public void testEqualityDeleteWriter() throws IOException { readRecords = ImmutableList.copyOf(reader); } - assertThat(readRecords).hasSize(2); - DataTestHelpers.assertEquals( - TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(0), readRecords.get(0)); - DataTestHelpers.assertEquals( - TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(1), readRecords.get(1)); + DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords); } - @TestTemplate - public void testPositionDeleteWriter() throws IOException { + @ParameterizedTest + @FieldSource("FILE_FORMATS") + public void testPositionDeleteWriter(FileFormat fileFormat) throws IOException { Schema positionDeleteSchema = new Schema(DELETE_FILE_PATH, DELETE_FILE_POS); FileWriterBuilder, ?> writerBuilder = FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, encryptedFile); PositionDelete delete1 = PositionDelete.create(); - delete1.set("data-file-1.parquet", 0L, null); + delete1.set("data-file-1.parquet", 0L); PositionDelete delete2 = PositionDelete.create(); - delete2.set("data-file-1.parquet", 1L, null); + delete2.set("data-file-1.parquet", 1L); List> positionDeletes = ImmutableList.of(delete1, delete2); @@ -199,7 +182,7 @@ public void testPositionDeleteWriter() throws IOException { assertThat(deleteFile.format()).isEqualTo(fileFormat); // Verify the file content by reading it back - InputFile inputFile = fileIO.newInputFile(encryptedFile.encryptingOutputFile().location()); + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); List readRecords; try (CloseableIterable reader = FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile) @@ -208,10 +191,13 @@ public void testPositionDeleteWriter() throws IOException { readRecords = ImmutableList.copyOf(reader); } - assertThat(readRecords).hasSize(2); - assertThat(readRecords.get(0).getField("file_path")).isEqualTo("data-file-1.parquet"); - assertThat(readRecords.get(0).getField("pos")).isEqualTo(0L); - assertThat(readRecords.get(1).getField("file_path")).isEqualTo("data-file-1.parquet"); - assertThat(readRecords.get(1).getField("pos")).isEqualTo(1L); + List expected = + ImmutableList.of( + GenericRecord.create(positionDeleteSchema) + .copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", DELETE_FILE_POS.name(), 0L), + GenericRecord.create(positionDeleteSchema) + .copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", DELETE_FILE_POS.name(), 1L)); + + DataTestHelpers.assertEquals(positionDeleteSchema.asStruct(), expected, readRecords); } } From eb772263cce751884a6927c772383051460cfbb9 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Fri, 13 Feb 2026 08:38:58 +0100 Subject: [PATCH 3/3] Ryan's final comments --- .../java/org/apache/iceberg/avro/AvroFormatModel.java | 4 ++-- .../java/org/apache/iceberg/data/DataTestHelpers.java | 2 +- .../org/apache/iceberg/data/TestGenericFormatModels.java | 9 +++++---- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java b/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java index e0358c369c6b..e0fcf8952604 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java @@ -43,8 +43,8 @@ public class AvroFormatModel extends BaseFormatModel, DatumReader, Schema> { - public static AvroFormatModel, Object> forPositionDeletes() { - return new AvroFormatModel<>(PositionDelete.deleteClass(), null, null, null); + public static AvroFormatModel, Void> forPositionDeletes() { + return new AvroFormatModel<>(PositionDelete.deleteClass(), Void.class, null, null); } public static AvroFormatModel create( diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java index 62fb1ab15f4c..f2e2b4e7fa34 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java @@ -35,7 +35,7 @@ public static void assertEquals( Types.StructType struct, List expected, List actual) { assertThat(actual).hasSize(expected.size()); for (int i = 0; i < expected.size(); i += 1) { - assertEquals(struct, expected.get(i), actual.get(i), null, -1); + assertEquals(struct, expected.get(i), actual.get(i)); } } diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java index 197605f5a043..ca3dda30ab5f 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java +++ b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java @@ -80,7 +80,7 @@ public void after() throws IOException { @ParameterizedTest @FieldSource("FILE_FORMATS") - public void testDataWriter(FileFormat fileFormat) throws IOException { + public void testDataWriterRoundTrip(FileFormat fileFormat) throws IOException { FileWriterBuilder, Schema> writerBuilder = FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile); @@ -105,8 +105,9 @@ public void testDataWriter(FileFormat fileFormat) throws IOException { try (CloseableIterable reader = FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile) .project(TestBase.SCHEMA) + .reuseContainers() .build()) { - readRecords = ImmutableList.copyOf(reader); + readRecords = ImmutableList.copyOf(CloseableIterable.transform(reader, Record::copy)); } DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords); @@ -114,7 +115,7 @@ public void testDataWriter(FileFormat fileFormat) throws IOException { @ParameterizedTest @FieldSource("FILE_FORMATS") - public void testEqualityDeleteWriter(FileFormat fileFormat) throws IOException { + public void testEqualityDeleteWriterRoundTrip(FileFormat fileFormat) throws IOException { FileWriterBuilder, Schema> writerBuilder = FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, Record.class, encryptedFile); @@ -153,7 +154,7 @@ public void testEqualityDeleteWriter(FileFormat fileFormat) throws IOException { @ParameterizedTest @FieldSource("FILE_FORMATS") - public void testPositionDeleteWriter(FileFormat fileFormat) throws IOException { + public void testPositionDeleteWriterRoundTrip(FileFormat fileFormat) throws IOException { Schema positionDeleteSchema = new Schema(DELETE_FILE_PATH, DELETE_FILE_POS); FileWriterBuilder, ?> writerBuilder =