From 320ac203d46d7b5c008dda3252f70959211b73c0 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Sun, 15 Feb 2026 16:41:54 +0100 Subject: [PATCH 1/7] Core, Data, Spark: Moving Spark to use the new FormatModel API --- .../iceberg/formats/FormatModelRegistry.java | 3 +- .../actions/RewriteTablePathSparkAction.java | 96 ++++---- .../VectorizedSparkParquetReaders.java | 11 +- .../iceberg/spark/source/BaseBatchReader.java | 102 +++----- .../iceberg/spark/source/BaseRowReader.java | 73 +----- .../spark/source/SparkFileWriterFactory.java | 217 ++++++++++-------- .../spark/source/SparkFormatModels.java | 90 ++++++++ 7 files changed, 297 insertions(+), 295 deletions(-) create mode 100644 spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java index 7e944510a85e..4a6b5a6cf40f 100644 --- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java +++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java @@ -58,7 +58,8 @@ private FormatModelRegistry() {} ImmutableList.of( "org.apache.iceberg.data.GenericFormatModels", "org.apache.iceberg.arrow.vectorized.ArrowFormatModels", - "org.apache.iceberg.flink.data.FlinkFormatModels"); + "org.apache.iceberg.flink.data.FlinkFormatModels", + "org.apache.iceberg.spark.source.SparkFormatModels"); // Format models indexed by file format and object model class private static final Map>, FormatModel> MODELS = diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index d6a13bcd515d..3c1be6c0e972 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -52,13 +52,14 @@ import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.data.avro.PlannedDataReader; -import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.orc.GenericOrcWriter; -import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.formats.FileWriterBuilder; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileIO; @@ -681,6 +682,13 @@ public CloseableIterable reader( return positionDeletesReader(inputFile, format, spec); } + @Override + public PositionDeleteWriter writer( + OutputFile outputFile, FileFormat format, PartitionSpec spec, StructLike partition) + throws IOException { + return positionDeletesWriter(outputFile, format, spec, partition, null); + } + @Override public PositionDeleteWriter writer( OutputFile outputFile, @@ -720,31 +728,9 @@ private ForeachFunction rewritePositionDelete( private static CloseableIterable positionDeletesReader( InputFile inputFile, FileFormat format, PartitionSpec spec) { Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema()); - switch (format) { - case AVRO: - return Avro.read(inputFile) - .project(deleteSchema) - .reuseContainers() - .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema)) - .build(); - - case PARQUET: - return Parquet.read(inputFile) - .project(deleteSchema) - .reuseContainers() - .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema)) - .build(); - - case ORC: - return ORC.read(inputFile) - .project(deleteSchema) - .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema)) - .build(); - - default: - throw new UnsupportedOperationException("Unsupported file format: " + format); - } + ReadBuilder builder = + FormatModelRegistry.readBuilder(format, Record.class, inputFile); + return builder.project(deleteSchema).reuseContainers().build(); } private static PositionDeleteWriter positionDeletesWriter( @@ -754,30 +740,36 @@ private static PositionDeleteWriter positionDeletesWriter( StructLike partition, Schema rowSchema) throws IOException { - switch (format) { - case AVRO: - return Avro.writeDeletes(outputFile) - .createWriterFunc(DataWriter::create) - .withPartition(partition) - .rowSchema(rowSchema) - .withSpec(spec) - .buildPositionWriter(); - case PARQUET: - return Parquet.writeDeletes(outputFile) - .createWriterFunc(GenericParquetWriter::create) - .withPartition(partition) - .rowSchema(rowSchema) - .withSpec(spec) - .buildPositionWriter(); - case ORC: - return ORC.writeDeletes(outputFile) - .createWriterFunc(GenericOrcWriter::buildWriter) - .withPartition(partition) - .rowSchema(rowSchema) - .withSpec(spec) - .buildPositionWriter(); - default: - throw new UnsupportedOperationException("Unsupported file format: " + format); + if (rowSchema == null) { + FileWriterBuilder, ?> builder = + FormatModelRegistry.positionDeleteWriteBuilder( + format, EncryptedFiles.plainAsEncryptedOutput(outputFile)); + return builder.partition(partition).spec(spec).build(); + } else { + return switch (format) { + case AVRO -> + Avro.writeDeletes(outputFile) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case PARQUET -> + Parquet.writeDeletes(outputFile) + .createWriterFunc(GenericParquetWriter::create) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case ORC -> + ORC.writeDeletes(outputFile) + .createWriterFunc(GenericOrcWriter::buildWriter) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + default -> throw new UnsupportedOperationException("Unsupported file format: " + format); + }; } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index 8e25e81a05b2..55f9fc1768a3 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -30,6 +30,8 @@ import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.spark.SparkUtil; import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +77,7 @@ public static ColumnarBatchReader buildReader( return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator()); } - public static CometColumnarBatchReader buildCometReader( + public static VectorizedReader buildCometReader( Schema expectedSchema, MessageType fileSchema, Map idToConstant) { return (CometColumnarBatchReader) TypeWithSchemaVisitor.visit( @@ -88,6 +90,13 @@ public static CometColumnarBatchReader buildCometReader( readers -> new CometColumnarBatchReader(readers, expectedSchema))); } + /** A subclass of ColumnarBatch to identify Comet readers. */ + public static class CometColumnarBatch extends ColumnarBatch { + public CometColumnarBatch(ColumnVector[] columns) { + super(columns); + } + } + // enables unsafe memory access to avoid costly checks to see if index is within bounds // as long as it is not configured explicitly (see BoundsChecking in Arrow) private static void enableUnsafeMemoryAccess() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index a67496cd7617..0acd8bc24476 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -19,7 +19,6 @@ package org.apache.iceberg.spark.source; import java.util.Map; -import java.util.Set; import javax.annotation.Nonnull; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; @@ -29,21 +28,18 @@ import org.apache.iceberg.Table; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.OrcBatchReadConf; import org.apache.iceberg.spark.ParquetBatchReadConf; import org.apache.iceberg.spark.ParquetReaderType; import org.apache.iceberg.spark.data.vectorized.ColumnVectorWithFilter; import org.apache.iceberg.spark.data.vectorized.ColumnarBatchUtil; import org.apache.iceberg.spark.data.vectorized.UpdatableDeletedColumnVector; -import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; -import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.Pair; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; @@ -74,79 +70,37 @@ protected CloseableIterable newBatchIterable( Expression residual, Map idToConstant, @Nonnull SparkDeleteFilter deleteFilter) { - CloseableIterable iterable; - switch (format) { - case PARQUET: - iterable = - newParquetIterable( - inputFile, start, length, residual, idToConstant, deleteFilter.requiredSchema()); - break; - case ORC: - iterable = newOrcIterable(inputFile, start, length, residual, idToConstant); - break; - default: - throw new UnsupportedOperationException( - "Format: " + format + " not supported for batched reads"); + Class readType = + useComet() ? VectorizedSparkParquetReaders.CometColumnarBatch.class : ColumnarBatch.class; + ReadBuilder readBuilder = + FormatModelRegistry.readBuilder(format, readType, inputFile); + + if (parquetConf != null) { + readBuilder = readBuilder.recordsPerBatch(parquetConf.batchSize()); + } else if (orcConf != null) { + readBuilder = readBuilder.recordsPerBatch(orcConf.batchSize()); } - return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch); - } + CloseableIterable iterable = + readBuilder + .project(deleteFilter.requiredSchema()) + .idToConstant(idToConstant) + .split(start, length) + .filter(residual) + .caseSensitive(caseSensitive()) + // Spark eagerly consumes the batches. So the underlying memory allocated could be + // reused without worrying about subsequent reads clobbering over each other. This + // improves read performance as every batch read doesn't have to pay the cost of + // allocating memory. + .reuseContainers() + .withNameMapping(nameMapping()) + .build(); - private CloseableIterable newParquetIterable( - InputFile inputFile, - long start, - long length, - Expression residual, - Map idToConstant, - Schema requiredSchema) { - return Parquet.read(inputFile) - .project(requiredSchema) - .split(start, length) - .createBatchedReaderFunc( - fileSchema -> { - if (parquetConf.readerType() == ParquetReaderType.COMET) { - return VectorizedSparkParquetReaders.buildCometReader( - requiredSchema, fileSchema, idToConstant); - } else { - return VectorizedSparkParquetReaders.buildReader( - requiredSchema, fileSchema, idToConstant); - } - }) - .recordsPerBatch(parquetConf.batchSize()) - .filter(residual) - .caseSensitive(caseSensitive()) - // Spark eagerly consumes the batches. So the underlying memory allocated could be reused - // without worrying about subsequent reads clobbering over each other. This improves - // read performance as every batch read doesn't have to pay the cost of allocating memory. - .reuseContainers() - .withNameMapping(nameMapping()) - .build(); + return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch); } - private CloseableIterable newOrcIterable( - InputFile inputFile, - long start, - long length, - Expression residual, - Map idToConstant) { - Set constantFieldIds = idToConstant.keySet(); - Set metadataFieldIds = MetadataColumns.metadataFieldIds(); - Sets.SetView constantAndMetadataFieldIds = - Sets.union(constantFieldIds, metadataFieldIds); - Schema schemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds); - - return ORC.read(inputFile) - .project(schemaWithoutConstantAndMetadataFields) - .split(start, length) - .createBatchedReaderFunc( - fileSchema -> - VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant)) - .recordsPerBatch(orcConf.batchSize()) - .filter(residual) - .caseSensitive(caseSensitive()) - .withNameMapping(nameMapping()) - .build(); + private boolean useComet() { + return parquetConf != null && parquetConf.readerType() == ParquetReaderType.COMET; } @VisibleForTesting diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index 52c32eff0fdb..14febb212aaf 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -20,22 +20,15 @@ import java.util.Map; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.ScanTask; import org.apache.iceberg.ScanTaskGroup; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.spark.data.SparkOrcReader; -import org.apache.iceberg.spark.data.SparkParquetReaders; -import org.apache.iceberg.spark.data.SparkPlannedAvroReader; -import org.apache.iceberg.types.TypeUtil; import org.apache.spark.sql.catalyst.InternalRow; abstract class BaseRowReader extends BaseReader { @@ -56,69 +49,15 @@ protected CloseableIterable newIterable( Expression residual, Schema projection, Map idToConstant) { - switch (format) { - case PARQUET: - return newParquetIterable(file, start, length, residual, projection, idToConstant); - - case AVRO: - return newAvroIterable(file, start, length, projection, idToConstant); - - case ORC: - return newOrcIterable(file, start, length, residual, projection, idToConstant); - - default: - throw new UnsupportedOperationException("Cannot read unknown format: " + format); - } - } - - private CloseableIterable newAvroIterable( - InputFile file, long start, long length, Schema projection, Map idToConstant) { - return Avro.read(file) - .reuseContainers() + ReadBuilder reader = + FormatModelRegistry.readBuilder(format, InternalRow.class, file); + return reader .project(projection) - .split(start, length) - .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant)) - .withNameMapping(nameMapping()) - .build(); - } - - private CloseableIterable newParquetIterable( - InputFile file, - long start, - long length, - Expression residual, - Schema readSchema, - Map idToConstant) { - return Parquet.read(file) + .idToConstant(idToConstant) .reuseContainers() .split(start, length) - .project(readSchema) - .createReaderFunc( - fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant)) - .filter(residual) .caseSensitive(caseSensitive()) - .withNameMapping(nameMapping()) - .build(); - } - - private CloseableIterable newOrcIterable( - InputFile file, - long start, - long length, - Expression residual, - Schema readSchema, - Map idToConstant) { - Schema readSchemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot( - readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); - - return ORC.read(file) - .project(readSchemaWithoutConstantAndMetadataFields) - .split(start, length) - .createReaderFunc( - readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant)) .filter(residual) - .caseSensitive(caseSensitive()) .withNameMapping(nameMapping()) .build(); } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index a93db17e4a0f..9b5491b85934 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -23,13 +23,20 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.BaseFileWriterFactory; +import org.apache.iceberg.data.RegistryBasedFileWriterFactory; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; @@ -40,14 +47,20 @@ import org.apache.iceberg.spark.data.SparkOrcWriter; import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; - -class SparkFileWriterFactory extends BaseFileWriterFactory { - private StructType dataSparkType; - private StructType equalityDeleteSparkType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SparkFileWriterFactory extends RegistryBasedFileWriterFactory { + private static final Logger LOG = LoggerFactory.getLogger(SparkFileWriterFactory.class); + // We need to use old writers to write position deletes with row data, which is a deprecated + // feature. + private final boolean useDeprecatedPositionDeleteWriter; private StructType positionDeleteSparkType; + private final Schema positionDeleteRowSchema; + private final Table table; + private final FileFormat format; private final Map writeProperties; /** @@ -75,18 +88,26 @@ class SparkFileWriterFactory extends BaseFileWriterFactory { super( table, dataFileFormat, + InternalRow.class, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, - positionDeleteRowSchema); + writeProperties, + useOrConvert(dataSparkType, dataSchema), + useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema)); - this.dataSparkType = dataSparkType; - this.equalityDeleteSparkType = equalityDeleteSparkType; - this.positionDeleteSparkType = positionDeleteSparkType; + this.table = table; + this.format = dataFileFormat; this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of(); + this.positionDeleteRowSchema = positionDeleteRowSchema; + this.positionDeleteSparkType = positionDeleteSparkType; + this.useDeprecatedPositionDeleteWriter = + positionDeleteRowSchema != null + || (positionDeleteSparkType != null + && positionDeleteSparkType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined()); } SparkFileWriterFactory( @@ -105,119 +126,105 @@ class SparkFileWriterFactory extends BaseFileWriterFactory { super( table, dataFileFormat, + InternalRow.class, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, - ImmutableMap.of()); + writeProperties, + useOrConvert(dataSparkType, dataSchema), + useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema)); - this.dataSparkType = dataSparkType; - this.equalityDeleteSparkType = equalityDeleteSparkType; - this.positionDeleteSparkType = null; + this.table = table; + this.format = dataFileFormat; this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of(); + this.positionDeleteRowSchema = null; + this.useDeprecatedPositionDeleteWriter = false; } static Builder builderFor(Table table) { return new Builder(table); } - @Override - protected void configureDataWrite(Avro.DataWriteBuilder builder) { - builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType())); - builder.setAll(writeProperties); - } - - @Override - protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { - builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType())); - builder.setAll(writeProperties); - } - - @Override - protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { - boolean withRow = - positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined(); - if (withRow) { - // SparkAvroWriter accepts just the Spark type of the row ignoring the path and pos - StructField rowField = positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME); - StructType positionDeleteRowSparkType = (StructType) rowField.dataType(); - builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType)); - } - - builder.setAll(writeProperties); - } - - @Override - protected void configureDataWrite(Parquet.DataWriteBuilder builder) { - builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType)); - builder.setAll(writeProperties); - } - - @Override - protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { - builder.createWriterFunc( - msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType)); - builder.setAll(writeProperties); - } - - @Override - protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { - builder.createWriterFunc( - msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType)); - builder.transformPaths(path -> UTF8String.fromString(path.toString())); - builder.setAll(writeProperties); - } - - @Override - protected void configureDataWrite(ORC.DataWriteBuilder builder) { - builder.createWriterFunc(SparkOrcWriter::new); - builder.setAll(writeProperties); - } - - @Override - protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) { - builder.createWriterFunc(SparkOrcWriter::new); - builder.setAll(writeProperties); - } - - @Override - protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) { - builder.createWriterFunc(SparkOrcWriter::new); - builder.transformPaths(path -> UTF8String.fromString(path.toString())); - builder.setAll(writeProperties); - } - - private StructType dataSparkType() { - if (dataSparkType == null) { - Preconditions.checkNotNull(dataSchema(), "Data schema must not be null"); - this.dataSparkType = SparkSchemaUtil.convert(dataSchema()); - } - - return dataSparkType; - } - - private StructType equalityDeleteSparkType() { - if (equalityDeleteSparkType == null) { - Preconditions.checkNotNull( - equalityDeleteRowSchema(), "Equality delete schema must not be null"); - this.equalityDeleteSparkType = SparkSchemaUtil.convert(equalityDeleteRowSchema()); - } - - return equalityDeleteSparkType; - } - private StructType positionDeleteSparkType() { if (positionDeleteSparkType == null) { // wrap the optional row schema into the position delete schema containing path and position - Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema()); + Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema); this.positionDeleteSparkType = SparkSchemaUtil.convert(positionDeleteSchema); } return positionDeleteSparkType; } + @Override + public PositionDeleteWriter newPositionDeleteWriter( + EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + if (!useDeprecatedPositionDeleteWriter) { + return super.newPositionDeleteWriter(file, spec, partition); + } else { + LOG.info( + "Deprecated feature used. Position delete row schema is used to create the position delete writer."); + MetricsConfig metricsConfig = + table != null + ? MetricsConfig.forPositionDelete(table) + : MetricsConfig.fromProperties(ImmutableMap.of()); + + try { + return switch (format) { + case AVRO -> { + StructType positionDeleteRowSparkType = + (StructType) positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME).dataType(); + + yield Avro.writeDeletes(file) + .createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType)) + .withPartition(partition) + .overwrite() + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .setAll(writeProperties) + .metricsConfig(metricsConfig) + .buildPositionWriter(); + } + case ORC -> + ORC.writeDeletes(file) + .createWriterFunc(SparkOrcWriter::new) + .transformPaths(path -> UTF8String.fromString(path.toString())) + .withPartition(partition) + .overwrite() + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .setAll(writeProperties) + .metricsConfig(metricsConfig) + .buildPositionWriter(); + case PARQUET -> + Parquet.writeDeletes(file) + .createWriterFunc( + msgType -> + SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType)) + .transformPaths(path -> UTF8String.fromString(path.toString())) + .withPartition(partition) + .overwrite() + .metricsConfig(metricsConfig) + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .setAll(writeProperties) + .metricsConfig(metricsConfig) + .buildPositionWriter(); + default -> + throw new UnsupportedOperationException( + "Cannot write pos-deletes for unsupported file format: " + format); + }; + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new position delete writer", e); + } + } + } + static class Builder { private final Table table; private FileFormat dataFileFormat; @@ -340,4 +347,14 @@ SparkFileWriterFactory build() { writeProperties); } } + + private static StructType useOrConvert(StructType sparkType, Schema schema) { + if (sparkType != null) { + return sparkType; + } else if (schema != null) { + return SparkSchemaUtil.convert(schema); + } else { + return null; + } + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java new file mode 100644 index 000000000000..18390971e4d3 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import org.apache.iceberg.avro.AvroFormatModel; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.orc.ORCFormatModel; +import org.apache.iceberg.parquet.ParquetFormatModel; +import org.apache.iceberg.spark.data.SparkAvroWriter; +import org.apache.iceberg.spark.data.SparkOrcReader; +import org.apache.iceberg.spark.data.SparkOrcWriter; +import org.apache.iceberg.spark.data.SparkParquetReaders; +import org.apache.iceberg.spark.data.SparkParquetWriters; +import org.apache.iceberg.spark.data.SparkPlannedAvroReader; +import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; +import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +public class SparkFormatModels { + public static void register() { + FormatModelRegistry.register( + AvroFormatModel.create( + InternalRow.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema) -> new SparkAvroWriter(engineSchema), + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + SparkPlannedAvroReader.create(icebergSchema, idToConstant))); + + FormatModelRegistry.register( + ParquetFormatModel.create( + InternalRow.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema) -> + SparkParquetWriters.buildWriter(engineSchema, fileSchema), + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant))); + + FormatModelRegistry.register( + ParquetFormatModel.create( + ColumnarBatch.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + VectorizedSparkParquetReaders.buildReader( + icebergSchema, fileSchema, idToConstant))); + + FormatModelRegistry.register( + ParquetFormatModel.create( + VectorizedSparkParquetReaders.CometColumnarBatch.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + VectorizedSparkParquetReaders.buildCometReader( + icebergSchema, fileSchema, idToConstant))); + + FormatModelRegistry.register( + ORCFormatModel.create( + InternalRow.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema) -> + new SparkOrcWriter(icebergSchema, fileSchema), + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + new SparkOrcReader(icebergSchema, fileSchema, idToConstant))); + + FormatModelRegistry.register( + ORCFormatModel.create( + ColumnarBatch.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + VectorizedSparkOrcReaders.buildReader(icebergSchema, fileSchema, idToConstant))); + } + + private SparkFormatModels() {} +} From 4633b6fce57217570dc58f7e378aa061edde343e Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Mon, 16 Feb 2026 21:17:05 +0100 Subject: [PATCH 2/7] Log message to WARN level --- .../org/apache/iceberg/spark/source/SparkFileWriterFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index 9b5491b85934..35b87631d4cc 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -164,7 +164,7 @@ public PositionDeleteWriter newPositionDeleteWriter( if (!useDeprecatedPositionDeleteWriter) { return super.newPositionDeleteWriter(file, spec, partition); } else { - LOG.info( + LOG.warn( "Deprecated feature used. Position delete row schema is used to create the position delete writer."); MetricsConfig metricsConfig = table != null From 6ae7eaa6063c64ce927d1287fdb2b6d8a0c468c8 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Tue, 17 Feb 2026 00:03:30 +0100 Subject: [PATCH 3/7] JHM tests --- .../SparkParquetReadersFlatDataBenchmark.java | 32 +++++++++++++++++++ ...parkParquetReadersNestedDataBenchmark.java | 32 +++++++++++++++++++ .../SparkParquetWritersFlatDataBenchmark.java | 23 +++++++++++++ ...parkParquetWritersNestedDataBenchmark.java | 23 +++++++++++++ 4 files changed, 110 insertions(+) diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java index 7f5d70171564..3c7e1e7065f9 100644 --- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java +++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java @@ -25,9 +25,11 @@ import java.io.IOException; import java.util.List; import org.apache.avro.generic.GenericData; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; @@ -167,6 +169,21 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException { } } + @Benchmark + @Threads(1) + public void readUsingRegistryReader(Blackhole blackHole) throws IOException { + try (CloseableIterable rows = + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) + .project(SCHEMA) + .build()) { + + for (InternalRow row : rows) { + blackHole.consume(row); + } + } + } + @Benchmark @Threads(1) public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { @@ -226,4 +243,19 @@ public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOExc } } } + + @Benchmark + @Threads(1) + public void readWithProjectionUsingRegistryReader(Blackhole blackhole) throws IOException { + try (CloseableIterable rows = + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) + .project(PROJECTED_SCHEMA) + .build()) { + + for (InternalRow row : rows) { + blackhole.consume(row); + } + } + } } diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java index e16f18b281d2..9e3a45a32b4e 100644 --- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java +++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java @@ -25,9 +25,11 @@ import java.io.IOException; import java.util.List; import org.apache.avro.generic.GenericData; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; @@ -165,6 +167,21 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException { } } + @Benchmark + @Threads(1) + public void readUsingRegistryReader(Blackhole blackhole) throws IOException { + try (CloseableIterable rows = + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) + .project(SCHEMA) + .build()) { + + for (InternalRow row : rows) { + blackhole.consume(row); + } + } + } + @Benchmark @Threads(1) public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { @@ -224,4 +241,19 @@ public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOExc } } } + + @Benchmark + @Threads(1) + public void readWithProjectionUsingRegistryReader(Blackhole blackhole) throws IOException { + try (CloseableIterable rows = + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) + .project(PROJECTED_SCHEMA) + .build()) { + + for (InternalRow row : rows) { + blackhole.consume(row); + } + } + } } diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java index 00c361449a0f..733aa8be49be 100644 --- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java +++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java @@ -23,8 +23,13 @@ import java.io.File; import java.io.IOException; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.spark.SparkSchemaUtil; @@ -121,10 +126,28 @@ public void writeUsingSparkWriter() throws IOException { .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") .set("spark.sql.caseSensitive", "false") .set("spark.sql.parquet.fieldId.write.enabled", "false") + .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false") .schema(SCHEMA) .build()) { writer.addAll(rows); } } + + @Benchmark + @Threads(1) + public void writeUsingRegistryWriter() throws IOException { + try (DataWriter writer = + FormatModelRegistry.dataWriteBuilder( + FileFormat.PARQUET, + InternalRow.class, + EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile))) + .schema(SCHEMA) + .engineSchema(SparkSchemaUtil.convert(SCHEMA)) + .spec(PartitionSpec.unpartitioned()) + .build()) { + + writer.write(rows); + } + } } diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java index 24d7fa405148..2d1a3eeeecdc 100644 --- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java +++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java @@ -23,8 +23,13 @@ import java.io.File; import java.io.IOException; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.spark.SparkSchemaUtil; @@ -121,10 +126,28 @@ public void writeUsingSparkWriter() throws IOException { .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") .set("spark.sql.caseSensitive", "false") .set("spark.sql.parquet.fieldId.write.enabled", "false") + .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false") .schema(SCHEMA) .build()) { writer.addAll(rows); } } + + @Benchmark + @Threads(1) + public void writeUsingRegistryWriter() throws IOException { + try (DataWriter writer = + FormatModelRegistry.dataWriteBuilder( + FileFormat.PARQUET, + InternalRow.class, + EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile))) + .schema(SCHEMA) + .engineSchema(SparkSchemaUtil.convert(SCHEMA)) + .spec(PartitionSpec.unpartitioned()) + .build()) { + + writer.write(rows); + } + } } From 576a7c9f35f7ec1258eae01d6c5816b49b73538f Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Tue, 17 Feb 2026 10:37:44 +0100 Subject: [PATCH 4/7] Set metrics config and writer configurations in the fallback path --- .../spark/source/SparkFileWriterFactory.java | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index 35b87631d4cc..6ca91f1722b6 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -166,39 +166,44 @@ public PositionDeleteWriter newPositionDeleteWriter( } else { LOG.warn( "Deprecated feature used. Position delete row schema is used to create the position delete writer."); + Map properties = table == null ? ImmutableMap.of() : table.properties(); MetricsConfig metricsConfig = - table != null - ? MetricsConfig.forPositionDelete(table) - : MetricsConfig.fromProperties(ImmutableMap.of()); + table == null + ? MetricsConfig.forPositionDelete() + : MetricsConfig.forPositionDelete(table); try { return switch (format) { - case AVRO -> { - StructType positionDeleteRowSparkType = - (StructType) positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME).dataType(); - - yield Avro.writeDeletes(file) - .createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType)) - .withPartition(partition) - .overwrite() - .rowSchema(positionDeleteRowSchema) - .withSpec(spec) - .withKeyMetadata(file.keyMetadata()) - .setAll(writeProperties) - .metricsConfig(metricsConfig) - .buildPositionWriter(); - } + case AVRO -> + Avro.writeDeletes(file) + .createWriterFunc( + ignored -> + new SparkAvroWriter( + (StructType) + positionDeleteSparkType() + .apply(DELETE_FILE_ROW_FIELD_NAME) + .dataType())) + .setAll(properties) + .setAll(writeProperties) + .metricsConfig(metricsConfig) + .withPartition(partition) + .overwrite() + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); case ORC -> ORC.writeDeletes(file) .createWriterFunc(SparkOrcWriter::new) .transformPaths(path -> UTF8String.fromString(path.toString())) + .setAll(properties) + .setAll(writeProperties) + .metricsConfig(metricsConfig) .withPartition(partition) .overwrite() .rowSchema(positionDeleteRowSchema) .withSpec(spec) .withKeyMetadata(file.keyMetadata()) - .setAll(writeProperties) - .metricsConfig(metricsConfig) .buildPositionWriter(); case PARQUET -> Parquet.writeDeletes(file) @@ -206,14 +211,15 @@ public PositionDeleteWriter newPositionDeleteWriter( msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType)) .transformPaths(path -> UTF8String.fromString(path.toString())) + .setAll(properties) + .setAll(writeProperties) + .metricsConfig(metricsConfig) .withPartition(partition) .overwrite() .metricsConfig(metricsConfig) .rowSchema(positionDeleteRowSchema) .withSpec(spec) .withKeyMetadata(file.keyMetadata()) - .setAll(writeProperties) - .metricsConfig(metricsConfig) .buildPositionWriter(); default -> throw new UnsupportedOperationException( From 4419dc30983c0fc269b2216a2a81c33dfaaa2e32 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Tue, 17 Feb 2026 10:45:18 +0100 Subject: [PATCH 5/7] Some formatting --- .../actions/RewriteTablePathSparkAction.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index 3c1be6c0e972..702dcc4407a8 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -57,9 +57,7 @@ import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.exceptions.RuntimeIOException; -import org.apache.iceberg.formats.FileWriterBuilder; import org.apache.iceberg.formats.FormatModelRegistry; -import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileIO; @@ -727,10 +725,10 @@ private ForeachFunction rewritePositionDelete( private static CloseableIterable positionDeletesReader( InputFile inputFile, FileFormat format, PartitionSpec spec) { - Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema()); - ReadBuilder builder = - FormatModelRegistry.readBuilder(format, Record.class, inputFile); - return builder.project(deleteSchema).reuseContainers().build(); + return FormatModelRegistry.readBuilder(format, Record.class, inputFile) + .project(DeleteSchemaUtil.posDeleteReadSchema(spec.schema())) + .reuseContainers() + .build(); } private static PositionDeleteWriter positionDeletesWriter( @@ -741,10 +739,11 @@ private static PositionDeleteWriter positionDeletesWriter( Schema rowSchema) throws IOException { if (rowSchema == null) { - FileWriterBuilder, ?> builder = - FormatModelRegistry.positionDeleteWriteBuilder( - format, EncryptedFiles.plainAsEncryptedOutput(outputFile)); - return builder.partition(partition).spec(spec).build(); + return FormatModelRegistry.positionDeleteWriteBuilder( + format, EncryptedFiles.plainAsEncryptedOutput(outputFile)) + .partition(partition) + .spec(spec) + .build(); } else { return switch (format) { case AVRO -> From b1c225b91c174a2220bf04ea8a16396aa73e52e6 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Wed, 18 Feb 2026 10:01:49 +0100 Subject: [PATCH 6/7] Ryan's comments --- .../SparkParquetReadersFlatDataBenchmark.java | 36 ++----------------- ...parkParquetReadersNestedDataBenchmark.java | 36 ++----------------- .../SparkParquetWritersFlatDataBenchmark.java | 31 ++++------------ ...parkParquetWritersNestedDataBenchmark.java | 31 ++++------------ .../actions/RewriteTablePathSparkAction.java | 7 ---- .../spark/data/SparkParquetWriters.java | 14 ++++++-- .../spark/source/SparkFileWriterFactory.java | 3 +- .../spark/source/SparkFormatModels.java | 2 +- 8 files changed, 34 insertions(+), 126 deletions(-) diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java index 3c7e1e7065f9..c900a3d53c78 100644 --- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java +++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java @@ -115,9 +115,9 @@ public void tearDownBenchmark() { @Threads(1) public void readUsingIcebergReader(Blackhole blackHole) throws IOException { try (CloseableIterable rows = - Parquet.read(Files.localInput(dataFile)) + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) .project(SCHEMA) - .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) .build()) { for (InternalRow row : rows) { @@ -171,26 +171,11 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException { @Benchmark @Threads(1) - public void readUsingRegistryReader(Blackhole blackHole) throws IOException { + public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { try (CloseableIterable rows = FormatModelRegistry.readBuilder( FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) - .project(SCHEMA) - .build()) { - - for (InternalRow row : rows) { - blackHole.consume(row); - } - } - } - - @Benchmark - @Threads(1) - public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { - try (CloseableIterable rows = - Parquet.read(Files.localInput(dataFile)) .project(PROJECTED_SCHEMA) - .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) .build()) { for (InternalRow row : rows) { @@ -243,19 +228,4 @@ public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOExc } } } - - @Benchmark - @Threads(1) - public void readWithProjectionUsingRegistryReader(Blackhole blackhole) throws IOException { - try (CloseableIterable rows = - FormatModelRegistry.readBuilder( - FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) - .project(PROJECTED_SCHEMA) - .build()) { - - for (InternalRow row : rows) { - blackhole.consume(row); - } - } - } } diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java index 9e3a45a32b4e..32c45b1496ee 100644 --- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java +++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java @@ -113,9 +113,9 @@ public void tearDownBenchmark() { @Threads(1) public void readUsingIcebergReader(Blackhole blackhole) throws IOException { try (CloseableIterable rows = - Parquet.read(Files.localInput(dataFile)) + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) .project(SCHEMA) - .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) .build()) { for (InternalRow row : rows) { @@ -169,26 +169,11 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException { @Benchmark @Threads(1) - public void readUsingRegistryReader(Blackhole blackhole) throws IOException { + public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { try (CloseableIterable rows = FormatModelRegistry.readBuilder( FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) - .project(SCHEMA) - .build()) { - - for (InternalRow row : rows) { - blackhole.consume(row); - } - } - } - - @Benchmark - @Threads(1) - public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { - try (CloseableIterable rows = - Parquet.read(Files.localInput(dataFile)) .project(PROJECTED_SCHEMA) - .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) .build()) { for (InternalRow row : rows) { @@ -241,19 +226,4 @@ public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOExc } } } - - @Benchmark - @Threads(1) - public void readWithProjectionUsingRegistryReader(Blackhole blackhole) throws IOException { - try (CloseableIterable rows = - FormatModelRegistry.readBuilder( - FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) - .project(PROJECTED_SCHEMA) - .build()) { - - for (InternalRow row : rows) { - blackhole.consume(row); - } - } - } } diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java index 733aa8be49be..21d5ec147789 100644 --- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java +++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java @@ -34,7 +34,6 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.RandomData; -import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; @@ -100,15 +99,16 @@ public void tearDownBenchmark() { @Benchmark @Threads(1) public void writeUsingIcebergWriter() throws IOException { - try (FileAppender writer = - Parquet.write(Files.localOutput(dataFile)) - .createWriterFunc( - msgType -> - SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType)) + try (DataWriter writer = + FormatModelRegistry.dataWriteBuilder( + FileFormat.PARQUET, + InternalRow.class, + EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile))) .schema(SCHEMA) + .spec(PartitionSpec.unpartitioned()) .build()) { - writer.addAll(rows); + writer.write(rows); } } @@ -133,21 +133,4 @@ public void writeUsingSparkWriter() throws IOException { writer.addAll(rows); } } - - @Benchmark - @Threads(1) - public void writeUsingRegistryWriter() throws IOException { - try (DataWriter writer = - FormatModelRegistry.dataWriteBuilder( - FileFormat.PARQUET, - InternalRow.class, - EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile))) - .schema(SCHEMA) - .engineSchema(SparkSchemaUtil.convert(SCHEMA)) - .spec(PartitionSpec.unpartitioned()) - .build()) { - - writer.write(rows); - } - } } diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java index 2d1a3eeeecdc..7e9e1ab08551 100644 --- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java +++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java @@ -34,7 +34,6 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.RandomData; -import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; @@ -100,15 +99,16 @@ public void tearDownBenchmark() { @Benchmark @Threads(1) public void writeUsingIcebergWriter() throws IOException { - try (FileAppender writer = - Parquet.write(Files.localOutput(dataFile)) - .createWriterFunc( - msgType -> - SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType)) + try (DataWriter writer = + FormatModelRegistry.dataWriteBuilder( + FileFormat.PARQUET, + InternalRow.class, + EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile))) .schema(SCHEMA) + .spec(PartitionSpec.unpartitioned()) .build()) { - writer.addAll(rows); + writer.write(rows); } } @@ -133,21 +133,4 @@ public void writeUsingSparkWriter() throws IOException { writer.addAll(rows); } } - - @Benchmark - @Threads(1) - public void writeUsingRegistryWriter() throws IOException { - try (DataWriter writer = - FormatModelRegistry.dataWriteBuilder( - FileFormat.PARQUET, - InternalRow.class, - EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile))) - .schema(SCHEMA) - .engineSchema(SparkSchemaUtil.convert(SCHEMA)) - .spec(PartitionSpec.unpartitioned()) - .build()) { - - writer.write(rows); - } - } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index 702dcc4407a8..674d238b3a09 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -680,13 +680,6 @@ public CloseableIterable reader( return positionDeletesReader(inputFile, format, spec); } - @Override - public PositionDeleteWriter writer( - OutputFile outputFile, FileFormat format, PartitionSpec spec, StructLike partition) - throws IOException { - return positionDeletesWriter(outputFile, format, spec, partition, null); - } - @Override public PositionDeleteWriter writer( OutputFile outputFile, diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java index 8bdfe7c3a810..9714ddeaf0a9 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java @@ -30,6 +30,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; @@ -41,6 +42,7 @@ import org.apache.iceberg.parquet.VariantWriterBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DecimalUtil; import org.apache.iceberg.util.UUIDUtil; @@ -75,10 +77,18 @@ public class SparkParquetWriters { private SparkParquetWriters() {} - @SuppressWarnings("unchecked") public static ParquetValueWriter buildWriter(StructType dfSchema, MessageType type) { + return buildWriter(null, dfSchema, type); + } + + @SuppressWarnings("unchecked") + public static ParquetValueWriter buildWriter( + Schema icebergSchema, StructType dfSchema, MessageType type) { return (ParquetValueWriter) - ParquetWithSparkSchemaVisitor.visit(dfSchema, type, new WriteBuilder(type)); + ParquetWithSparkSchemaVisitor.visit( + dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema), + type, + new WriteBuilder(type)); } private static class WriteBuilder extends ParquetWithSparkSchemaVisitor> { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index 6ca91f1722b6..2b3bf73d56b3 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -164,8 +164,7 @@ public PositionDeleteWriter newPositionDeleteWriter( if (!useDeprecatedPositionDeleteWriter) { return super.newPositionDeleteWriter(file, spec, partition); } else { - LOG.warn( - "Deprecated feature used. Position delete row schema is used to create the position delete writer."); + LOG.warn("Position deletes with deleted rows are deprecated and will be removed in 1.12.0."); Map properties = table == null ? ImmutableMap.of() : table.properties(); MetricsConfig metricsConfig = table == null diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java index 18390971e4d3..fa572c99cb00 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java @@ -49,7 +49,7 @@ public static void register() { InternalRow.class, StructType.class, (icebergSchema, fileSchema, engineSchema) -> - SparkParquetWriters.buildWriter(engineSchema, fileSchema), + SparkParquetWriters.buildWriter(icebergSchema, engineSchema, fileSchema), (icebergSchema, fileSchema, engineSchema, idToConstant) -> SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant))); From 025210fff3fb8fa1211f457a9b288141f33d1938 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Wed, 18 Feb 2026 14:04:39 +0100 Subject: [PATCH 7/7] Parameter order change --- .../iceberg/spark/data/SparkParquetWriters.java | 13 +++++++++++-- .../iceberg/spark/source/SparkFormatModels.java | 3 +-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java index 9714ddeaf0a9..3ff5ef9c577d 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java @@ -78,12 +78,21 @@ public class SparkParquetWriters { private SparkParquetWriters() {} public static ParquetValueWriter buildWriter(StructType dfSchema, MessageType type) { - return buildWriter(null, dfSchema, type); + return buildWriter(null, type, dfSchema); } @SuppressWarnings("unchecked") public static ParquetValueWriter buildWriter( - Schema icebergSchema, StructType dfSchema, MessageType type) { + Schema icebergSchema, MessageType type, StructType dfSchema) { + return (ParquetValueWriter) + ParquetWithSparkSchemaVisitor.visit( + dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema), + type, + new WriteBuilder(type)); + } + + public static ParquetValueWriter buildWriter( + StructType dfSchema, MessageType type, Schema icebergSchema) { return (ParquetValueWriter) ParquetWithSparkSchemaVisitor.visit( dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema), diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java index fa572c99cb00..677f2e950b44 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java @@ -48,8 +48,7 @@ public static void register() { ParquetFormatModel.create( InternalRow.class, StructType.class, - (icebergSchema, fileSchema, engineSchema) -> - SparkParquetWriters.buildWriter(icebergSchema, engineSchema, fileSchema), + SparkParquetWriters::buildWriter, (icebergSchema, fileSchema, engineSchema, idToConstant) -> SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));