From aa4a3deb01945fc363d11315818f6adaec96ca5d Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Wed, 18 Feb 2026 16:22:01 +0100
Subject: [PATCH] Spark: Backport moving Spark to use the new FormatModel API

---
 .../SparkParquetReadersFlatDataBenchmark.java |  10 +-
 ...parkParquetReadersNestedDataBenchmark.java |  10 +-
 .../SparkParquetWritersFlatDataBenchmark.java |  20 +-
 ...parkParquetWritersNestedDataBenchmark.java |  20 +-
 .../actions/RewriteTablePathSparkAction.java  |  90 +++----
 .../spark/data/SparkParquetWriters.java       |  23 +-
 .../VectorizedSparkParquetReaders.java        |  11 +-
 .../iceberg/spark/source/BaseBatchReader.java | 102 +++-----
 .../iceberg/spark/source/BaseRowReader.java   |  73 +-----
 .../spark/source/SparkFileWriterFactory.java  | 222 ++++++++++--------
 .../spark/source/SparkFormatModels.java       |  89 +++++++
 .../SparkParquetReadersFlatDataBenchmark.java |  10 +-
 ...parkParquetReadersNestedDataBenchmark.java |  10 +-
 .../SparkParquetWritersFlatDataBenchmark.java |  20 +-
 ...parkParquetWritersNestedDataBenchmark.java |  20 +-
 .../actions/RewriteTablePathSparkAction.java  |  90 +++----
 .../spark/data/SparkParquetWriters.java       |  23 +-
 .../VectorizedSparkParquetReaders.java        |  11 +-
 .../iceberg/spark/source/BaseBatchReader.java | 102 +++-----
 .../iceberg/spark/source/BaseRowReader.java   |  73 +-----
 .../spark/source/SparkFileWriterFactory.java  | 222 ++++++++++--------
 .../spark/source/SparkFormatModels.java       |  89 +++++++
 .../SparkParquetReadersFlatDataBenchmark.java |  10 +-
 ...parkParquetReadersNestedDataBenchmark.java |  10 +-
 .../SparkParquetWritersFlatDataBenchmark.java |  20 +-
 ...parkParquetWritersNestedDataBenchmark.java |  20 +-
 .../actions/RewriteTablePathSparkAction.java  |  90 +++----
 .../spark/data/SparkParquetWriters.java       |  23 +-
 .../VectorizedSparkParquetReaders.java        |  11 +-
 .../iceberg/spark/source/BaseBatchReader.java | 102 +++-----
 .../iceberg/spark/source/BaseRowReader.java   |  73 +-----
 .../spark/source/SparkFileWriterFactory.java  | 222 ++++++++++--------
 .../spark/source/SparkFormatModels.java       |  89 +++++++
 33 files changed, 1053 insertions(+), 957 deletions(-)
 create mode 100644 spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
 create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
 create mode 100644 spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java

diff --git a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
index e65e2fb576b0..da520a84a649 100644
--- a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
+++ b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
@@ -25,9 +25,11 @@
 import java.io.IOException;
 import java.util.List;
 import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
@@ -113,9 +115,9 @@ public void tearDownBenchmark() {
   @Threads(1)
   public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
@@ -171,9 +173,9 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException {
   @Threads(1)
   public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(PROJECTED_SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
diff --git a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
index 3520c9b47489..5c2902b1951b 100644
--- a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
+++ b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
@@ -25,9 +25,11 @@
 import java.io.IOException;
 import java.util.List;
 import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
@@ -111,9 +113,9 @@ public void tearDownBenchmark() {
   @Threads(1)
   public void readUsingIcebergReader(Blackhole blackhole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
@@ -169,9 +171,9 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException {
   @Threads(1)
   public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(PROJECTED_SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
diff --git a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
index f104b8b88b36..25a08e7597a6 100644
--- a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
+++ b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
@@ -23,13 +23,17 @@
 import java.io.File;
 import java.io.IOException;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import
org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.RandomData; -import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; @@ -95,15 +99,16 @@ public void tearDownBenchmark() { @Benchmark @Threads(1) public void writeUsingIcebergWriter() throws IOException { - try (FileAppender writer = - Parquet.write(Files.localOutput(dataFile)) - .createWriterFunc( - msgType -> - SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType)) + try (DataWriter writer = + FormatModelRegistry.dataWriteBuilder( + FileFormat.PARQUET, + InternalRow.class, + EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile))) .schema(SCHEMA) + .spec(PartitionSpec.unpartitioned()) .build()) { - writer.addAll(rows); + writer.write(rows); } } @@ -121,6 +126,7 @@ public void writeUsingSparkWriter() throws IOException { .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") .set("spark.sql.caseSensitive", "false") .set("spark.sql.parquet.fieldId.write.enabled", "false") + .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false") .schema(SCHEMA) .build()) { diff --git a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java index e375d1c56a6f..49645ff31fb4 100644 --- a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java +++ b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java @@ -23,13 +23,17 @@ import java.io.File; import java.io.IOException; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.RandomData; -import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; @@ -95,15 +99,16 @@ public void tearDownBenchmark() { @Benchmark @Threads(1) public void writeUsingIcebergWriter() throws IOException { - try (FileAppender writer = - Parquet.write(Files.localOutput(dataFile)) - .createWriterFunc( - msgType -> - SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType)) + try (DataWriter writer = + FormatModelRegistry.dataWriteBuilder( + FileFormat.PARQUET, + InternalRow.class, + EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile))) .schema(SCHEMA) + .spec(PartitionSpec.unpartitioned()) .build()) { - writer.addAll(rows); + writer.write(rows); } } @@ -121,6 +126,7 @@ public void writeUsingSparkWriter() throws IOException { .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") .set("spark.sql.caseSensitive", 
"false") .set("spark.sql.parquet.fieldId.write.enabled", "false") + .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false") .schema(SCHEMA) .build()) { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index d6a13bcd515d..674d238b3a09 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -52,13 +52,12 @@ import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.data.avro.PlannedDataReader; -import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.orc.GenericOrcWriter; -import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileIO; @@ -719,32 +718,10 @@ private ForeachFunction rewritePositionDelete( private static CloseableIterable positionDeletesReader( InputFile inputFile, FileFormat format, PartitionSpec spec) { - Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema()); - switch (format) { - case AVRO: - return Avro.read(inputFile) - .project(deleteSchema) - .reuseContainers() - .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema)) - .build(); - - case PARQUET: - return Parquet.read(inputFile) - .project(deleteSchema) - .reuseContainers() - .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema)) - .build(); - - case ORC: - return ORC.read(inputFile) - .project(deleteSchema) - .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema)) - .build(); - - default: - throw new UnsupportedOperationException("Unsupported file format: " + format); - } + return FormatModelRegistry.readBuilder(format, Record.class, inputFile) + .project(DeleteSchemaUtil.posDeleteReadSchema(spec.schema())) + .reuseContainers() + .build(); } private static PositionDeleteWriter positionDeletesWriter( @@ -754,30 +731,37 @@ private static PositionDeleteWriter positionDeletesWriter( StructLike partition, Schema rowSchema) throws IOException { - switch (format) { - case AVRO: - return Avro.writeDeletes(outputFile) - .createWriterFunc(DataWriter::create) - .withPartition(partition) - .rowSchema(rowSchema) - .withSpec(spec) - .buildPositionWriter(); - case PARQUET: - return Parquet.writeDeletes(outputFile) - .createWriterFunc(GenericParquetWriter::create) - .withPartition(partition) - .rowSchema(rowSchema) - .withSpec(spec) - .buildPositionWriter(); - case ORC: - return ORC.writeDeletes(outputFile) - .createWriterFunc(GenericOrcWriter::buildWriter) - .withPartition(partition) - .rowSchema(rowSchema) - .withSpec(spec) - .buildPositionWriter(); - default: - throw new UnsupportedOperationException("Unsupported file format: " + format); + if (rowSchema == null) { + return FormatModelRegistry.positionDeleteWriteBuilder( + format, 
EncryptedFiles.plainAsEncryptedOutput(outputFile)) + .partition(partition) + .spec(spec) + .build(); + } else { + return switch (format) { + case AVRO -> + Avro.writeDeletes(outputFile) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case PARQUET -> + Parquet.writeDeletes(outputFile) + .createWriterFunc(GenericParquetWriter::create) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case ORC -> + ORC.writeDeletes(outputFile) + .createWriterFunc(GenericOrcWriter::buildWriter) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + default -> throw new UnsupportedOperationException("Unsupported file format: " + format); + }; } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java index ffda57be2bc3..6a99912e1eb9 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java @@ -26,6 +26,7 @@ import java.util.NoSuchElementException; import java.util.Optional; import java.util.UUID; +import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; @@ -34,6 +35,7 @@ import org.apache.iceberg.parquet.ParquetValueWriters.RepeatedWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DecimalUtil; import org.apache.iceberg.util.UUIDUtil; @@ -61,10 +63,27 @@ public class SparkParquetWriters { private SparkParquetWriters() {} - @SuppressWarnings("unchecked") public static ParquetValueWriter buildWriter(StructType dfSchema, MessageType type) { + return buildWriter(null, type, dfSchema); + } + + @SuppressWarnings("unchecked") + public static ParquetValueWriter buildWriter( + Schema icebergSchema, MessageType type, StructType dfSchema) { + return (ParquetValueWriter) + ParquetWithSparkSchemaVisitor.visit( + dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema), + type, + new WriteBuilder(type)); + } + + public static ParquetValueWriter buildWriter( + StructType dfSchema, MessageType type, Schema icebergSchema) { return (ParquetValueWriter) - ParquetWithSparkSchemaVisitor.visit(dfSchema, type, new WriteBuilder(type)); + ParquetWithSparkSchemaVisitor.visit( + dfSchema != null ? 
dfSchema : SparkSchemaUtil.convert(icebergSchema), + type, + new WriteBuilder(type)); } private static class WriteBuilder extends ParquetWithSparkSchemaVisitor> { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index 8e25e81a05b2..55f9fc1768a3 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -30,6 +30,8 @@ import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.spark.SparkUtil; import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +77,7 @@ public static ColumnarBatchReader buildReader( return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator()); } - public static CometColumnarBatchReader buildCometReader( + public static VectorizedReader buildCometReader( Schema expectedSchema, MessageType fileSchema, Map idToConstant) { return (CometColumnarBatchReader) TypeWithSchemaVisitor.visit( @@ -88,6 +90,13 @@ public static CometColumnarBatchReader buildCometReader( readers -> new CometColumnarBatchReader(readers, expectedSchema))); } + /** A subclass of ColumnarBatch to identify Comet readers. */ + public static class CometColumnarBatch extends ColumnarBatch { + public CometColumnarBatch(ColumnVector[] columns) { + super(columns); + } + } + // enables unsafe memory access to avoid costly checks to see if index is within bounds // as long as it is not configured explicitly (see BoundsChecking in Arrow) private static void enableUnsafeMemoryAccess() { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index ff30f29aeae6..89c03a4c2b72 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -19,7 +19,6 @@ package org.apache.iceberg.spark.source; import java.util.Map; -import java.util.Set; import javax.annotation.Nonnull; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; @@ -29,21 +28,18 @@ import org.apache.iceberg.Table; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.OrcBatchReadConf; import org.apache.iceberg.spark.ParquetBatchReadConf; import org.apache.iceberg.spark.ParquetReaderType; import org.apache.iceberg.spark.data.vectorized.ColumnVectorWithFilter; import org.apache.iceberg.spark.data.vectorized.ColumnarBatchUtil; import org.apache.iceberg.spark.data.vectorized.UpdatableDeletedColumnVector; -import 
org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; -import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.Pair; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; @@ -76,79 +72,37 @@ protected CloseableIterable newBatchIterable( Expression residual, Map idToConstant, @Nonnull SparkDeleteFilter deleteFilter) { - CloseableIterable iterable; - switch (format) { - case PARQUET: - iterable = - newParquetIterable( - inputFile, start, length, residual, idToConstant, deleteFilter.requiredSchema()); - break; - case ORC: - iterable = newOrcIterable(inputFile, start, length, residual, idToConstant); - break; - default: - throw new UnsupportedOperationException( - "Format: " + format + " not supported for batched reads"); + Class readType = + useComet() ? VectorizedSparkParquetReaders.CometColumnarBatch.class : ColumnarBatch.class; + ReadBuilder readBuilder = + FormatModelRegistry.readBuilder(format, readType, inputFile); + + if (parquetConf != null) { + readBuilder = readBuilder.recordsPerBatch(parquetConf.batchSize()); + } else if (orcConf != null) { + readBuilder = readBuilder.recordsPerBatch(orcConf.batchSize()); } - return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch); - } + CloseableIterable iterable = + readBuilder + .project(deleteFilter.requiredSchema()) + .idToConstant(idToConstant) + .split(start, length) + .filter(residual) + .caseSensitive(caseSensitive()) + // Spark eagerly consumes the batches. So the underlying memory allocated could be + // reused without worrying about subsequent reads clobbering over each other. This + // improves read performance as every batch read doesn't have to pay the cost of + // allocating memory. + .reuseContainers() + .withNameMapping(nameMapping()) + .build(); - private CloseableIterable newParquetIterable( - InputFile inputFile, - long start, - long length, - Expression residual, - Map idToConstant, - Schema requiredSchema) { - return Parquet.read(inputFile) - .project(requiredSchema) - .split(start, length) - .createBatchedReaderFunc( - fileSchema -> { - if (parquetConf.readerType() == ParquetReaderType.COMET) { - return VectorizedSparkParquetReaders.buildCometReader( - requiredSchema, fileSchema, idToConstant); - } else { - return VectorizedSparkParquetReaders.buildReader( - requiredSchema, fileSchema, idToConstant); - } - }) - .recordsPerBatch(parquetConf.batchSize()) - .filter(residual) - .caseSensitive(caseSensitive()) - // Spark eagerly consumes the batches. So the underlying memory allocated could be reused - // without worrying about subsequent reads clobbering over each other. This improves - // read performance as every batch read doesn't have to pay the cost of allocating memory. 
- .reuseContainers() - .withNameMapping(nameMapping()) - .build(); + return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch); } - private CloseableIterable newOrcIterable( - InputFile inputFile, - long start, - long length, - Expression residual, - Map idToConstant) { - Set constantFieldIds = idToConstant.keySet(); - Set metadataFieldIds = MetadataColumns.metadataFieldIds(); - Sets.SetView constantAndMetadataFieldIds = - Sets.union(constantFieldIds, metadataFieldIds); - Schema schemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds); - - return ORC.read(inputFile) - .project(schemaWithoutConstantAndMetadataFields) - .split(start, length) - .createBatchedReaderFunc( - fileSchema -> - VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant)) - .recordsPerBatch(orcConf.batchSize()) - .filter(residual) - .caseSensitive(caseSensitive()) - .withNameMapping(nameMapping()) - .build(); + private boolean useComet() { + return parquetConf != null && parquetConf.readerType() == ParquetReaderType.COMET; } @VisibleForTesting diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index c12931e786b1..53d44e760afe 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -20,22 +20,15 @@ import java.util.Map; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.ScanTask; import org.apache.iceberg.ScanTaskGroup; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.spark.data.SparkOrcReader; -import org.apache.iceberg.spark.data.SparkParquetReaders; -import org.apache.iceberg.spark.data.SparkPlannedAvroReader; -import org.apache.iceberg.types.TypeUtil; import org.apache.spark.sql.catalyst.InternalRow; abstract class BaseRowReader extends BaseReader { @@ -58,69 +51,15 @@ protected CloseableIterable newIterable( Expression residual, Schema projection, Map idToConstant) { - switch (format) { - case PARQUET: - return newParquetIterable(file, start, length, residual, projection, idToConstant); - - case AVRO: - return newAvroIterable(file, start, length, projection, idToConstant); - - case ORC: - return newOrcIterable(file, start, length, residual, projection, idToConstant); - - default: - throw new UnsupportedOperationException("Cannot read unknown format: " + format); - } - } - - private CloseableIterable newAvroIterable( - InputFile file, long start, long length, Schema projection, Map idToConstant) { - return Avro.read(file) - .reuseContainers() + ReadBuilder reader = + FormatModelRegistry.readBuilder(format, InternalRow.class, file); + return reader .project(projection) - .split(start, length) - .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant)) - .withNameMapping(nameMapping()) - .build(); - } - - 
private CloseableIterable newParquetIterable( - InputFile file, - long start, - long length, - Expression residual, - Schema readSchema, - Map idToConstant) { - return Parquet.read(file) + .idToConstant(idToConstant) .reuseContainers() .split(start, length) - .project(readSchema) - .createReaderFunc( - fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant)) - .filter(residual) .caseSensitive(caseSensitive()) - .withNameMapping(nameMapping()) - .build(); - } - - private CloseableIterable newOrcIterable( - InputFile file, - long start, - long length, - Expression residual, - Schema readSchema, - Map idToConstant) { - Schema readSchemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot( - readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); - - return ORC.read(file) - .project(readSchemaWithoutConstantAndMetadataFields) - .split(start, length) - .createReaderFunc( - readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant)) .filter(residual) - .caseSensitive(caseSensitive()) .withNameMapping(nameMapping()) .build(); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index a93db17e4a0f..2b3bf73d56b3 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -23,13 +23,20 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.BaseFileWriterFactory; +import org.apache.iceberg.data.RegistryBasedFileWriterFactory; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; @@ -40,14 +47,20 @@ import org.apache.iceberg.spark.data.SparkOrcWriter; import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; - -class SparkFileWriterFactory extends BaseFileWriterFactory { - private StructType dataSparkType; - private StructType equalityDeleteSparkType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SparkFileWriterFactory extends RegistryBasedFileWriterFactory { + private static final Logger LOG = LoggerFactory.getLogger(SparkFileWriterFactory.class); + // We need to use old writers to write position deletes with row data, which is a deprecated + // feature. 
+ private final boolean useDeprecatedPositionDeleteWriter; private StructType positionDeleteSparkType; + private final Schema positionDeleteRowSchema; + private final Table table; + private final FileFormat format; private final Map writeProperties; /** @@ -75,18 +88,26 @@ class SparkFileWriterFactory extends BaseFileWriterFactory { super( table, dataFileFormat, + InternalRow.class, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, - positionDeleteRowSchema); + writeProperties, + useOrConvert(dataSparkType, dataSchema), + useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema)); - this.dataSparkType = dataSparkType; - this.equalityDeleteSparkType = equalityDeleteSparkType; - this.positionDeleteSparkType = positionDeleteSparkType; + this.table = table; + this.format = dataFileFormat; this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of(); + this.positionDeleteRowSchema = positionDeleteRowSchema; + this.positionDeleteSparkType = positionDeleteSparkType; + this.useDeprecatedPositionDeleteWriter = + positionDeleteRowSchema != null + || (positionDeleteSparkType != null + && positionDeleteSparkType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined()); } SparkFileWriterFactory( @@ -105,119 +126,110 @@ class SparkFileWriterFactory extends BaseFileWriterFactory { super( table, dataFileFormat, + InternalRow.class, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, - ImmutableMap.of()); + writeProperties, + useOrConvert(dataSparkType, dataSchema), + useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema)); - this.dataSparkType = dataSparkType; - this.equalityDeleteSparkType = equalityDeleteSparkType; - this.positionDeleteSparkType = null; + this.table = table; + this.format = dataFileFormat; this.writeProperties = writeProperties != null ? 
writeProperties : ImmutableMap.of(); + this.positionDeleteRowSchema = null; + this.useDeprecatedPositionDeleteWriter = false; } static Builder builderFor(Table table) { return new Builder(table); } - @Override - protected void configureDataWrite(Avro.DataWriteBuilder builder) { - builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType())); - builder.setAll(writeProperties); - } - - @Override - protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { - builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType())); - builder.setAll(writeProperties); - } - - @Override - protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { - boolean withRow = - positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined(); - if (withRow) { - // SparkAvroWriter accepts just the Spark type of the row ignoring the path and pos - StructField rowField = positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME); - StructType positionDeleteRowSparkType = (StructType) rowField.dataType(); - builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType)); - } - - builder.setAll(writeProperties); - } - - @Override - protected void configureDataWrite(Parquet.DataWriteBuilder builder) { - builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType)); - builder.setAll(writeProperties); - } - - @Override - protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { - builder.createWriterFunc( - msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType)); - builder.setAll(writeProperties); - } - - @Override - protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { - builder.createWriterFunc( - msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType)); - builder.transformPaths(path -> UTF8String.fromString(path.toString())); - builder.setAll(writeProperties); - } - - @Override - protected void configureDataWrite(ORC.DataWriteBuilder builder) { - builder.createWriterFunc(SparkOrcWriter::new); - builder.setAll(writeProperties); - } - - @Override - protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) { - builder.createWriterFunc(SparkOrcWriter::new); - builder.setAll(writeProperties); - } - - @Override - protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) { - builder.createWriterFunc(SparkOrcWriter::new); - builder.transformPaths(path -> UTF8String.fromString(path.toString())); - builder.setAll(writeProperties); - } - - private StructType dataSparkType() { - if (dataSparkType == null) { - Preconditions.checkNotNull(dataSchema(), "Data schema must not be null"); - this.dataSparkType = SparkSchemaUtil.convert(dataSchema()); - } - - return dataSparkType; - } - - private StructType equalityDeleteSparkType() { - if (equalityDeleteSparkType == null) { - Preconditions.checkNotNull( - equalityDeleteRowSchema(), "Equality delete schema must not be null"); - this.equalityDeleteSparkType = SparkSchemaUtil.convert(equalityDeleteRowSchema()); - } - - return equalityDeleteSparkType; - } - private StructType positionDeleteSparkType() { if (positionDeleteSparkType == null) { // wrap the optional row schema into the position delete schema containing path and position - Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema()); + Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema); this.positionDeleteSparkType = 
SparkSchemaUtil.convert(positionDeleteSchema); } return positionDeleteSparkType; } + @Override + public PositionDeleteWriter newPositionDeleteWriter( + EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + if (!useDeprecatedPositionDeleteWriter) { + return super.newPositionDeleteWriter(file, spec, partition); + } else { + LOG.warn("Position deletes with deleted rows are deprecated and will be removed in 1.12.0."); + Map properties = table == null ? ImmutableMap.of() : table.properties(); + MetricsConfig metricsConfig = + table == null + ? MetricsConfig.forPositionDelete() + : MetricsConfig.forPositionDelete(table); + + try { + return switch (format) { + case AVRO -> + Avro.writeDeletes(file) + .createWriterFunc( + ignored -> + new SparkAvroWriter( + (StructType) + positionDeleteSparkType() + .apply(DELETE_FILE_ROW_FIELD_NAME) + .dataType())) + .setAll(properties) + .setAll(writeProperties) + .metricsConfig(metricsConfig) + .withPartition(partition) + .overwrite() + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); + case ORC -> + ORC.writeDeletes(file) + .createWriterFunc(SparkOrcWriter::new) + .transformPaths(path -> UTF8String.fromString(path.toString())) + .setAll(properties) + .setAll(writeProperties) + .metricsConfig(metricsConfig) + .withPartition(partition) + .overwrite() + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); + case PARQUET -> + Parquet.writeDeletes(file) + .createWriterFunc( + msgType -> + SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType)) + .transformPaths(path -> UTF8String.fromString(path.toString())) + .setAll(properties) + .setAll(writeProperties) + .metricsConfig(metricsConfig) + .withPartition(partition) + .overwrite() + .metricsConfig(metricsConfig) + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); + default -> + throw new UnsupportedOperationException( + "Cannot write pos-deletes for unsupported file format: " + format); + }; + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new position delete writer", e); + } + } + } + static class Builder { private final Table table; private FileFormat dataFileFormat; @@ -340,4 +352,14 @@ SparkFileWriterFactory build() { writeProperties); } } + + private static StructType useOrConvert(StructType sparkType, Schema schema) { + if (sparkType != null) { + return sparkType; + } else if (schema != null) { + return SparkSchemaUtil.convert(schema); + } else { + return null; + } + } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java new file mode 100644 index 000000000000..677f2e950b44 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.orc.ORCFormatModel;
+import org.apache.iceberg.parquet.ParquetFormatModel;
+import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class SparkFormatModels {
+  public static void register() {
+    FormatModelRegistry.register(
+        AvroFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema) -> new SparkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            SparkParquetWriters::buildWriter,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            ColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkParquetReaders.buildReader(
+                    icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            VectorizedSparkParquetReaders.CometColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkParquetReaders.buildCometReader(
+                    icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                new SparkOrcWriter(icebergSchema, fileSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                new SparkOrcReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            ColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkOrcReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+  }
+
+  private SparkFormatModels() {}
+}
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
index f2f9488b6ec2..1c7505eab471 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
+++
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java @@ -25,9 +25,11 @@ import java.io.IOException; import java.util.List; import org.apache.avro.generic.GenericData; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; @@ -113,9 +115,9 @@ public void tearDownBenchmark() { @Threads(1) public void readUsingIcebergReader(Blackhole blackHole) throws IOException { try (CloseableIterable rows = - Parquet.read(Files.localInput(dataFile)) + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) .project(SCHEMA) - .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) .build()) { for (InternalRow row : rows) { @@ -171,9 +173,9 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException { @Threads(1) public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { try (CloseableIterable rows = - Parquet.read(Files.localInput(dataFile)) + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) .project(PROJECTED_SCHEMA) - .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) .build()) { for (InternalRow row : rows) { diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java index b19ab683d634..42454c10ac66 100644 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java @@ -25,9 +25,11 @@ import java.io.IOException; import java.util.List; import org.apache.avro.generic.GenericData; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; @@ -111,9 +113,9 @@ public void tearDownBenchmark() { @Threads(1) public void readUsingIcebergReader(Blackhole blackhole) throws IOException { try (CloseableIterable rows = - Parquet.read(Files.localInput(dataFile)) + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) .project(SCHEMA) - .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) .build()) { for (InternalRow row : rows) { @@ -169,9 +171,9 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException { @Threads(1) public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { try (CloseableIterable rows = - Parquet.read(Files.localInput(dataFile)) + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) .project(PROJECTED_SCHEMA) - .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) .build()) { for (InternalRow row : rows) { diff --git 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java index 87289b3cba31..f77454c4b2a3 100644 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java @@ -23,13 +23,17 @@ import java.io.File; import java.io.IOException; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.RandomData; -import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; @@ -95,15 +99,16 @@ public void tearDownBenchmark() { @Benchmark @Threads(1) public void writeUsingIcebergWriter() throws IOException { - try (FileAppender writer = - Parquet.write(Files.localOutput(dataFile)) - .createWriterFunc( - msgType -> - SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType)) + try (DataWriter writer = + FormatModelRegistry.dataWriteBuilder( + FileFormat.PARQUET, + InternalRow.class, + EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile))) .schema(SCHEMA) + .spec(PartitionSpec.unpartitioned()) .build()) { - writer.addAll(rows); + writer.write(rows); } } @@ -121,6 +126,7 @@ public void writeUsingSparkWriter() throws IOException { .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") .set("spark.sql.caseSensitive", "false") .set("spark.sql.parquet.fieldId.write.enabled", "false") + .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false") .schema(SCHEMA) .build()) { diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java index dd913a27561b..a732d36526c0 100644 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java @@ -23,13 +23,17 @@ import java.io.File; import java.io.IOException; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.RandomData; -import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; @@ -95,15 +99,16 @@ 
public void tearDownBenchmark() { @Benchmark @Threads(1) public void writeUsingIcebergWriter() throws IOException { - try (FileAppender writer = - Parquet.write(Files.localOutput(dataFile)) - .createWriterFunc( - msgType -> - SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType)) + try (DataWriter writer = + FormatModelRegistry.dataWriteBuilder( + FileFormat.PARQUET, + InternalRow.class, + EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile))) .schema(SCHEMA) + .spec(PartitionSpec.unpartitioned()) .build()) { - writer.addAll(rows); + writer.write(rows); } } @@ -121,6 +126,7 @@ public void writeUsingSparkWriter() throws IOException { .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") .set("spark.sql.caseSensitive", "false") .set("spark.sql.parquet.fieldId.write.enabled", "false") + .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false") .schema(SCHEMA) .build()) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index d6a13bcd515d..674d238b3a09 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -52,13 +52,12 @@ import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.data.avro.PlannedDataReader; -import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.orc.GenericOrcWriter; -import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileIO; @@ -719,32 +718,10 @@ private ForeachFunction rewritePositionDelete( private static CloseableIterable positionDeletesReader( InputFile inputFile, FileFormat format, PartitionSpec spec) { - Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema()); - switch (format) { - case AVRO: - return Avro.read(inputFile) - .project(deleteSchema) - .reuseContainers() - .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema)) - .build(); - - case PARQUET: - return Parquet.read(inputFile) - .project(deleteSchema) - .reuseContainers() - .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema)) - .build(); - - case ORC: - return ORC.read(inputFile) - .project(deleteSchema) - .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema)) - .build(); - - default: - throw new UnsupportedOperationException("Unsupported file format: " + format); - } + return FormatModelRegistry.readBuilder(format, Record.class, inputFile) + .project(DeleteSchemaUtil.posDeleteReadSchema(spec.schema())) + .reuseContainers() + .build(); } private static PositionDeleteWriter positionDeletesWriter( @@ -754,30 +731,37 @@ private static PositionDeleteWriter positionDeletesWriter( StructLike partition, Schema rowSchema) throws IOException { - switch (format) { - case AVRO: - return 
Avro.writeDeletes(outputFile) - .createWriterFunc(DataWriter::create) - .withPartition(partition) - .rowSchema(rowSchema) - .withSpec(spec) - .buildPositionWriter(); - case PARQUET: - return Parquet.writeDeletes(outputFile) - .createWriterFunc(GenericParquetWriter::create) - .withPartition(partition) - .rowSchema(rowSchema) - .withSpec(spec) - .buildPositionWriter(); - case ORC: - return ORC.writeDeletes(outputFile) - .createWriterFunc(GenericOrcWriter::buildWriter) - .withPartition(partition) - .rowSchema(rowSchema) - .withSpec(spec) - .buildPositionWriter(); - default: - throw new UnsupportedOperationException("Unsupported file format: " + format); + if (rowSchema == null) { + return FormatModelRegistry.positionDeleteWriteBuilder( + format, EncryptedFiles.plainAsEncryptedOutput(outputFile)) + .partition(partition) + .spec(spec) + .build(); + } else { + return switch (format) { + case AVRO -> + Avro.writeDeletes(outputFile) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case PARQUET -> + Parquet.writeDeletes(outputFile) + .createWriterFunc(GenericParquetWriter::create) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case ORC -> + ORC.writeDeletes(outputFile) + .createWriterFunc(GenericOrcWriter::buildWriter) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + default -> throw new UnsupportedOperationException("Unsupported file format: " + format); + }; } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java index 8ffe26dc33f6..dda634a46fda 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java @@ -26,6 +26,7 @@ import java.util.NoSuchElementException; import java.util.Optional; import java.util.UUID; +import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; @@ -34,6 +35,7 @@ import org.apache.iceberg.parquet.ParquetValueWriters.RepeatedWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DecimalUtil; import org.apache.iceberg.util.UUIDUtil; @@ -61,10 +63,27 @@ public class SparkParquetWriters { private SparkParquetWriters() {} - @SuppressWarnings("unchecked") public static ParquetValueWriter buildWriter(StructType dfSchema, MessageType type) { + return buildWriter(null, type, dfSchema); + } + + @SuppressWarnings("unchecked") + public static ParquetValueWriter buildWriter( + Schema icebergSchema, MessageType type, StructType dfSchema) { + return (ParquetValueWriter) + ParquetWithSparkSchemaVisitor.visit( + dfSchema != null ? 
dfSchema : SparkSchemaUtil.convert(icebergSchema), + type, + new WriteBuilder(type)); + } + + public static ParquetValueWriter buildWriter( + StructType dfSchema, MessageType type, Schema icebergSchema) { return (ParquetValueWriter) - ParquetWithSparkSchemaVisitor.visit(dfSchema, type, new WriteBuilder(type)); + ParquetWithSparkSchemaVisitor.visit( + dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema), + type, + new WriteBuilder(type)); } private static class WriteBuilder extends ParquetWithSparkSchemaVisitor> { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index 8e25e81a05b2..55f9fc1768a3 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -30,6 +30,8 @@ import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.spark.SparkUtil; import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +77,7 @@ public static ColumnarBatchReader buildReader( return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator()); } - public static CometColumnarBatchReader buildCometReader( + public static VectorizedReader buildCometReader( Schema expectedSchema, MessageType fileSchema, Map idToConstant) { return (CometColumnarBatchReader) TypeWithSchemaVisitor.visit( @@ -88,6 +90,13 @@ public static CometColumnarBatchReader buildCometReader( readers -> new CometColumnarBatchReader(readers, expectedSchema))); } + /** A subclass of ColumnarBatch to identify Comet readers. 
*/ + public static class CometColumnarBatch extends ColumnarBatch { + public CometColumnarBatch(ColumnVector[] columns) { + super(columns); + } + } + // enables unsafe memory access to avoid costly checks to see if index is within bounds // as long as it is not configured explicitly (see BoundsChecking in Arrow) private static void enableUnsafeMemoryAccess() { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index ff30f29aeae6..89c03a4c2b72 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -19,7 +19,6 @@ package org.apache.iceberg.spark.source; import java.util.Map; -import java.util.Set; import javax.annotation.Nonnull; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; @@ -29,21 +28,18 @@ import org.apache.iceberg.Table; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.OrcBatchReadConf; import org.apache.iceberg.spark.ParquetBatchReadConf; import org.apache.iceberg.spark.ParquetReaderType; import org.apache.iceberg.spark.data.vectorized.ColumnVectorWithFilter; import org.apache.iceberg.spark.data.vectorized.ColumnarBatchUtil; import org.apache.iceberg.spark.data.vectorized.UpdatableDeletedColumnVector; -import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; -import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.Pair; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; @@ -76,79 +72,37 @@ protected CloseableIterable newBatchIterable( Expression residual, Map idToConstant, @Nonnull SparkDeleteFilter deleteFilter) { - CloseableIterable iterable; - switch (format) { - case PARQUET: - iterable = - newParquetIterable( - inputFile, start, length, residual, idToConstant, deleteFilter.requiredSchema()); - break; - case ORC: - iterable = newOrcIterable(inputFile, start, length, residual, idToConstant); - break; - default: - throw new UnsupportedOperationException( - "Format: " + format + " not supported for batched reads"); + Class readType = + useComet() ? 
VectorizedSparkParquetReaders.CometColumnarBatch.class : ColumnarBatch.class; + ReadBuilder readBuilder = + FormatModelRegistry.readBuilder(format, readType, inputFile); + + if (parquetConf != null) { + readBuilder = readBuilder.recordsPerBatch(parquetConf.batchSize()); + } else if (orcConf != null) { + readBuilder = readBuilder.recordsPerBatch(orcConf.batchSize()); } - return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch); - } + CloseableIterable iterable = + readBuilder + .project(deleteFilter.requiredSchema()) + .idToConstant(idToConstant) + .split(start, length) + .filter(residual) + .caseSensitive(caseSensitive()) + // Spark eagerly consumes the batches. So the underlying memory allocated could be + // reused without worrying about subsequent reads clobbering over each other. This + // improves read performance as every batch read doesn't have to pay the cost of + // allocating memory. + .reuseContainers() + .withNameMapping(nameMapping()) + .build(); - private CloseableIterable newParquetIterable( - InputFile inputFile, - long start, - long length, - Expression residual, - Map idToConstant, - Schema requiredSchema) { - return Parquet.read(inputFile) - .project(requiredSchema) - .split(start, length) - .createBatchedReaderFunc( - fileSchema -> { - if (parquetConf.readerType() == ParquetReaderType.COMET) { - return VectorizedSparkParquetReaders.buildCometReader( - requiredSchema, fileSchema, idToConstant); - } else { - return VectorizedSparkParquetReaders.buildReader( - requiredSchema, fileSchema, idToConstant); - } - }) - .recordsPerBatch(parquetConf.batchSize()) - .filter(residual) - .caseSensitive(caseSensitive()) - // Spark eagerly consumes the batches. So the underlying memory allocated could be reused - // without worrying about subsequent reads clobbering over each other. This improves - // read performance as every batch read doesn't have to pay the cost of allocating memory. 
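// A caller-side sketch of the registry-based read path above; a minimal example, assuming
// SparkFormatModels.register() has already installed the Spark format models. The method
// name, input file, projection, and batch size here are hypothetical placeholders, not
// values taken from this change.
private static CloseableIterable<ColumnarBatch> readBatchesExample(
    InputFile inputFile, Schema projection) {
  return FormatModelRegistry.readBuilder(FileFormat.PARQUET, ColumnarBatch.class, inputFile)
      .project(projection)
      .recordsPerBatch(4096) // stands in for parquetConf.batchSize()
      .reuseContainers()
      .build();
}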
- .reuseContainers() - .withNameMapping(nameMapping()) - .build(); + return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch); } - private CloseableIterable newOrcIterable( - InputFile inputFile, - long start, - long length, - Expression residual, - Map idToConstant) { - Set constantFieldIds = idToConstant.keySet(); - Set metadataFieldIds = MetadataColumns.metadataFieldIds(); - Sets.SetView constantAndMetadataFieldIds = - Sets.union(constantFieldIds, metadataFieldIds); - Schema schemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds); - - return ORC.read(inputFile) - .project(schemaWithoutConstantAndMetadataFields) - .split(start, length) - .createBatchedReaderFunc( - fileSchema -> - VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant)) - .recordsPerBatch(orcConf.batchSize()) - .filter(residual) - .caseSensitive(caseSensitive()) - .withNameMapping(nameMapping()) - .build(); + private boolean useComet() { + return parquetConf != null && parquetConf.readerType() == ParquetReaderType.COMET; } @VisibleForTesting diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index c12931e786b1..53d44e760afe 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -20,22 +20,15 @@ import java.util.Map; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.ScanTask; import org.apache.iceberg.ScanTaskGroup; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.spark.data.SparkOrcReader; -import org.apache.iceberg.spark.data.SparkParquetReaders; -import org.apache.iceberg.spark.data.SparkPlannedAvroReader; -import org.apache.iceberg.types.TypeUtil; import org.apache.spark.sql.catalyst.InternalRow; abstract class BaseRowReader extends BaseReader { @@ -58,69 +51,15 @@ protected CloseableIterable newIterable( Expression residual, Schema projection, Map idToConstant) { - switch (format) { - case PARQUET: - return newParquetIterable(file, start, length, residual, projection, idToConstant); - - case AVRO: - return newAvroIterable(file, start, length, projection, idToConstant); - - case ORC: - return newOrcIterable(file, start, length, residual, projection, idToConstant); - - default: - throw new UnsupportedOperationException("Cannot read unknown format: " + format); - } - } - - private CloseableIterable newAvroIterable( - InputFile file, long start, long length, Schema projection, Map idToConstant) { - return Avro.read(file) - .reuseContainers() + ReadBuilder reader = + FormatModelRegistry.readBuilder(format, InternalRow.class, file); + return reader .project(projection) - .split(start, length) - .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant)) - .withNameMapping(nameMapping()) - .build(); - } - - 
private CloseableIterable newParquetIterable( - InputFile file, - long start, - long length, - Expression residual, - Schema readSchema, - Map idToConstant) { - return Parquet.read(file) + .idToConstant(idToConstant) .reuseContainers() .split(start, length) - .project(readSchema) - .createReaderFunc( - fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant)) - .filter(residual) .caseSensitive(caseSensitive()) - .withNameMapping(nameMapping()) - .build(); - } - - private CloseableIterable newOrcIterable( - InputFile file, - long start, - long length, - Expression residual, - Schema readSchema, - Map idToConstant) { - Schema readSchemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot( - readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); - - return ORC.read(file) - .project(readSchemaWithoutConstantAndMetadataFields) - .split(start, length) - .createReaderFunc( - readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant)) .filter(residual) - .caseSensitive(caseSensitive()) .withNameMapping(nameMapping()) .build(); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index a93db17e4a0f..2b3bf73d56b3 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -23,13 +23,20 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.BaseFileWriterFactory; +import org.apache.iceberg.data.RegistryBasedFileWriterFactory; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; @@ -40,14 +47,20 @@ import org.apache.iceberg.spark.data.SparkOrcWriter; import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; - -class SparkFileWriterFactory extends BaseFileWriterFactory { - private StructType dataSparkType; - private StructType equalityDeleteSparkType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SparkFileWriterFactory extends RegistryBasedFileWriterFactory { + private static final Logger LOG = LoggerFactory.getLogger(SparkFileWriterFactory.class); + // We need to use old writers to write position deletes with row data, which is a deprecated + // feature. 
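// Sketch of the split this class implements for position deletes, mirroring
// positionDeletesWriter in RewriteTablePathSparkAction earlier in this patch: plain
// position deletes go through the FormatModel registry, while deletes that still carry
// row data fall back to the legacy per-format builders. The helper below is a condensed,
// hypothetical illustration (only the Parquet fallback is shown; Avro and ORC follow the
// same shape).
private static PositionDeleteWriter<Record> examplePosDeleteWriter(
    FileFormat format,
    OutputFile outputFile,
    PartitionSpec spec,
    StructLike partition,
    Schema rowSchema)
    throws IOException {
  if (rowSchema == null) {
    // common case: resolve the writer through the registry
    return FormatModelRegistry.positionDeleteWriteBuilder(
            format, EncryptedFiles.plainAsEncryptedOutput(outputFile))
        .partition(partition)
        .spec(spec)
        .build();
  }

  // deprecated case: deletes with row data still use the old format-specific builders
  return Parquet.writeDeletes(outputFile)
      .createWriterFunc(GenericParquetWriter::create)
      .rowSchema(rowSchema)
      .withPartition(partition)
      .withSpec(spec)
      .buildPositionWriter();
}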
+ private final boolean useDeprecatedPositionDeleteWriter; private StructType positionDeleteSparkType; + private final Schema positionDeleteRowSchema; + private final Table table; + private final FileFormat format; private final Map writeProperties; /** @@ -75,18 +88,26 @@ class SparkFileWriterFactory extends BaseFileWriterFactory { super( table, dataFileFormat, + InternalRow.class, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, - positionDeleteRowSchema); + writeProperties, + useOrConvert(dataSparkType, dataSchema), + useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema)); - this.dataSparkType = dataSparkType; - this.equalityDeleteSparkType = equalityDeleteSparkType; - this.positionDeleteSparkType = positionDeleteSparkType; + this.table = table; + this.format = dataFileFormat; this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of(); + this.positionDeleteRowSchema = positionDeleteRowSchema; + this.positionDeleteSparkType = positionDeleteSparkType; + this.useDeprecatedPositionDeleteWriter = + positionDeleteRowSchema != null + || (positionDeleteSparkType != null + && positionDeleteSparkType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined()); } SparkFileWriterFactory( @@ -105,119 +126,110 @@ class SparkFileWriterFactory extends BaseFileWriterFactory { super( table, dataFileFormat, + InternalRow.class, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, - ImmutableMap.of()); + writeProperties, + useOrConvert(dataSparkType, dataSchema), + useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema)); - this.dataSparkType = dataSparkType; - this.equalityDeleteSparkType = equalityDeleteSparkType; - this.positionDeleteSparkType = null; + this.table = table; + this.format = dataFileFormat; this.writeProperties = writeProperties != null ? 
writeProperties : ImmutableMap.of(); + this.positionDeleteRowSchema = null; + this.useDeprecatedPositionDeleteWriter = false; } static Builder builderFor(Table table) { return new Builder(table); } - @Override - protected void configureDataWrite(Avro.DataWriteBuilder builder) { - builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType())); - builder.setAll(writeProperties); - } - - @Override - protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { - builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType())); - builder.setAll(writeProperties); - } - - @Override - protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { - boolean withRow = - positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined(); - if (withRow) { - // SparkAvroWriter accepts just the Spark type of the row ignoring the path and pos - StructField rowField = positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME); - StructType positionDeleteRowSparkType = (StructType) rowField.dataType(); - builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType)); - } - - builder.setAll(writeProperties); - } - - @Override - protected void configureDataWrite(Parquet.DataWriteBuilder builder) { - builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType)); - builder.setAll(writeProperties); - } - - @Override - protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { - builder.createWriterFunc( - msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType)); - builder.setAll(writeProperties); - } - - @Override - protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { - builder.createWriterFunc( - msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType)); - builder.transformPaths(path -> UTF8String.fromString(path.toString())); - builder.setAll(writeProperties); - } - - @Override - protected void configureDataWrite(ORC.DataWriteBuilder builder) { - builder.createWriterFunc(SparkOrcWriter::new); - builder.setAll(writeProperties); - } - - @Override - protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) { - builder.createWriterFunc(SparkOrcWriter::new); - builder.setAll(writeProperties); - } - - @Override - protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) { - builder.createWriterFunc(SparkOrcWriter::new); - builder.transformPaths(path -> UTF8String.fromString(path.toString())); - builder.setAll(writeProperties); - } - - private StructType dataSparkType() { - if (dataSparkType == null) { - Preconditions.checkNotNull(dataSchema(), "Data schema must not be null"); - this.dataSparkType = SparkSchemaUtil.convert(dataSchema()); - } - - return dataSparkType; - } - - private StructType equalityDeleteSparkType() { - if (equalityDeleteSparkType == null) { - Preconditions.checkNotNull( - equalityDeleteRowSchema(), "Equality delete schema must not be null"); - this.equalityDeleteSparkType = SparkSchemaUtil.convert(equalityDeleteRowSchema()); - } - - return equalityDeleteSparkType; - } - private StructType positionDeleteSparkType() { if (positionDeleteSparkType == null) { // wrap the optional row schema into the position delete schema containing path and position - Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema()); + Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema); this.positionDeleteSparkType = 
SparkSchemaUtil.convert(positionDeleteSchema);
     }
 
     return positionDeleteSparkType;
   }
 
+  @Override
+  public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
+      EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+    if (!useDeprecatedPositionDeleteWriter) {
+      return super.newPositionDeleteWriter(file, spec, partition);
+    } else {
+      LOG.warn("Position deletes with deleted rows are deprecated and will be removed in 1.12.0.");
+      Map<String, String> properties = table == null ? ImmutableMap.of() : table.properties();
+      MetricsConfig metricsConfig =
+          table == null
+              ? MetricsConfig.forPositionDelete()
+              : MetricsConfig.forPositionDelete(table);
+
+      try {
+        return switch (format) {
+          case AVRO ->
+              Avro.writeDeletes(file)
+                  .createWriterFunc(
+                      ignored ->
+                          new SparkAvroWriter(
+                              (StructType)
+                                  positionDeleteSparkType()
+                                      .apply(DELETE_FILE_ROW_FIELD_NAME)
+                                      .dataType()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          case ORC ->
+              ORC.writeDeletes(file)
+                  .createWriterFunc(SparkOrcWriter::new)
+                  .transformPaths(path -> UTF8String.fromString(path.toString()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          case PARQUET ->
+              Parquet.writeDeletes(file)
+                  .createWriterFunc(
+                      msgType ->
+                          SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType))
+                  .transformPaths(path -> UTF8String.fromString(path.toString()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          default ->
+              throw new UnsupportedOperationException(
+                  "Cannot write pos-deletes for unsupported file format: " + format);
+        };
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to create new position delete writer", e);
+      }
+    }
+  }
+
   static class Builder {
     private final Table table;
     private FileFormat dataFileFormat;
@@ -340,4 +352,14 @@ SparkFileWriterFactory build() {
           writeProperties);
     }
   }
+
+  private static StructType useOrConvert(StructType sparkType, Schema schema) {
+    if (sparkType != null) {
+      return sparkType;
+    } else if (schema != null) {
+      return SparkSchemaUtil.convert(schema);
+    } else {
+      return null;
+    }
+  }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
new file mode 100644
index 000000000000..677f2e950b44
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import org.apache.iceberg.avro.AvroFormatModel; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.orc.ORCFormatModel; +import org.apache.iceberg.parquet.ParquetFormatModel; +import org.apache.iceberg.spark.data.SparkAvroWriter; +import org.apache.iceberg.spark.data.SparkOrcReader; +import org.apache.iceberg.spark.data.SparkOrcWriter; +import org.apache.iceberg.spark.data.SparkParquetReaders; +import org.apache.iceberg.spark.data.SparkParquetWriters; +import org.apache.iceberg.spark.data.SparkPlannedAvroReader; +import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; +import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +public class SparkFormatModels { + public static void register() { + FormatModelRegistry.register( + AvroFormatModel.create( + InternalRow.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema) -> new SparkAvroWriter(engineSchema), + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + SparkPlannedAvroReader.create(icebergSchema, idToConstant))); + + FormatModelRegistry.register( + ParquetFormatModel.create( + InternalRow.class, + StructType.class, + SparkParquetWriters::buildWriter, + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant))); + + FormatModelRegistry.register( + ParquetFormatModel.create( + ColumnarBatch.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + VectorizedSparkParquetReaders.buildReader( + icebergSchema, fileSchema, idToConstant))); + + FormatModelRegistry.register( + ParquetFormatModel.create( + VectorizedSparkParquetReaders.CometColumnarBatch.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + VectorizedSparkParquetReaders.buildCometReader( + icebergSchema, fileSchema, idToConstant))); + + FormatModelRegistry.register( + ORCFormatModel.create( + InternalRow.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema) -> + new SparkOrcWriter(icebergSchema, fileSchema), + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + new SparkOrcReader(icebergSchema, fileSchema, idToConstant))); + + FormatModelRegistry.register( + ORCFormatModel.create( + ColumnarBatch.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + VectorizedSparkOrcReaders.buildReader(icebergSchema, fileSchema, idToConstant))); + } + + private SparkFormatModels() {} +} diff --git a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java index 3dbee5dfd0f5..9b7bbe0eb465 100644 --- a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java +++ 
b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java @@ -25,9 +25,11 @@ import java.io.IOException; import java.util.List; import org.apache.avro.generic.GenericData; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; @@ -113,9 +115,9 @@ public void tearDownBenchmark() { @Threads(1) public void readUsingIcebergReader(Blackhole blackHole) throws IOException { try (CloseableIterable rows = - Parquet.read(Files.localInput(dataFile)) + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) .project(SCHEMA) - .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) .build()) { for (InternalRow row : rows) { @@ -171,9 +173,9 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException { @Threads(1) public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { try (CloseableIterable rows = - Parquet.read(Files.localInput(dataFile)) + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) .project(PROJECTED_SCHEMA) - .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) .build()) { for (InternalRow row : rows) { diff --git a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java index 8487988d9e5b..e77191d5e5cf 100644 --- a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java +++ b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java @@ -25,9 +25,11 @@ import java.io.IOException; import java.util.List; import org.apache.avro.generic.GenericData; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; @@ -111,9 +113,9 @@ public void tearDownBenchmark() { @Threads(1) public void readUsingIcebergReader(Blackhole blackhole) throws IOException { try (CloseableIterable rows = - Parquet.read(Files.localInput(dataFile)) + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) .project(SCHEMA) - .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) .build()) { for (InternalRow row : rows) { @@ -169,9 +171,9 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException { @Threads(1) public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { try (CloseableIterable rows = - Parquet.read(Files.localInput(dataFile)) + FormatModelRegistry.readBuilder( + FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile)) .project(PROJECTED_SCHEMA) - .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) .build()) { for (InternalRow row : rows) { diff --git 
a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java index 47f0b72088f5..46fb96b8a305 100644 --- a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java +++ b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java @@ -23,13 +23,17 @@ import java.io.File; import java.io.IOException; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.RandomData; -import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; @@ -95,15 +99,16 @@ public void tearDownBenchmark() { @Benchmark @Threads(1) public void writeUsingIcebergWriter() throws IOException { - try (FileAppender writer = - Parquet.write(Files.localOutput(dataFile)) - .createWriterFunc( - msgType -> - SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType)) + try (DataWriter writer = + FormatModelRegistry.dataWriteBuilder( + FileFormat.PARQUET, + InternalRow.class, + EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile))) .schema(SCHEMA) + .spec(PartitionSpec.unpartitioned()) .build()) { - writer.addAll(rows); + writer.write(rows); } } @@ -121,6 +126,7 @@ public void writeUsingSparkWriter() throws IOException { .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") .set("spark.sql.caseSensitive", "false") .set("spark.sql.parquet.fieldId.write.enabled", "false") + .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false") .schema(SCHEMA) .build()) { diff --git a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java index 4df890d86164..6cb5e23d0bb3 100644 --- a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java +++ b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java @@ -23,13 +23,17 @@ import java.io.File; import java.io.IOException; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.RandomData; -import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; @@ -95,15 +99,16 @@ 
public void tearDownBenchmark() { @Benchmark @Threads(1) public void writeUsingIcebergWriter() throws IOException { - try (FileAppender writer = - Parquet.write(Files.localOutput(dataFile)) - .createWriterFunc( - msgType -> - SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType)) + try (DataWriter writer = + FormatModelRegistry.dataWriteBuilder( + FileFormat.PARQUET, + InternalRow.class, + EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile))) .schema(SCHEMA) + .spec(PartitionSpec.unpartitioned()) .build()) { - writer.addAll(rows); + writer.write(rows); } } @@ -121,6 +126,7 @@ public void writeUsingSparkWriter() throws IOException { .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") .set("spark.sql.caseSensitive", "false") .set("spark.sql.parquet.fieldId.write.enabled", "false") + .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false") .schema(SCHEMA) .build()) { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index d6a13bcd515d..674d238b3a09 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -52,13 +52,12 @@ import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.data.avro.PlannedDataReader; -import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.orc.GenericOrcWriter; -import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileIO; @@ -719,32 +718,10 @@ private ForeachFunction rewritePositionDelete( private static CloseableIterable positionDeletesReader( InputFile inputFile, FileFormat format, PartitionSpec spec) { - Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema()); - switch (format) { - case AVRO: - return Avro.read(inputFile) - .project(deleteSchema) - .reuseContainers() - .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema)) - .build(); - - case PARQUET: - return Parquet.read(inputFile) - .project(deleteSchema) - .reuseContainers() - .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema)) - .build(); - - case ORC: - return ORC.read(inputFile) - .project(deleteSchema) - .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema)) - .build(); - - default: - throw new UnsupportedOperationException("Unsupported file format: " + format); - } + return FormatModelRegistry.readBuilder(format, Record.class, inputFile) + .project(DeleteSchemaUtil.posDeleteReadSchema(spec.schema())) + .reuseContainers() + .build(); } private static PositionDeleteWriter positionDeletesWriter( @@ -754,30 +731,37 @@ private static PositionDeleteWriter positionDeletesWriter( StructLike partition, Schema rowSchema) throws IOException { - switch (format) { - case AVRO: - return 
Avro.writeDeletes(outputFile) - .createWriterFunc(DataWriter::create) - .withPartition(partition) - .rowSchema(rowSchema) - .withSpec(spec) - .buildPositionWriter(); - case PARQUET: - return Parquet.writeDeletes(outputFile) - .createWriterFunc(GenericParquetWriter::create) - .withPartition(partition) - .rowSchema(rowSchema) - .withSpec(spec) - .buildPositionWriter(); - case ORC: - return ORC.writeDeletes(outputFile) - .createWriterFunc(GenericOrcWriter::buildWriter) - .withPartition(partition) - .rowSchema(rowSchema) - .withSpec(spec) - .buildPositionWriter(); - default: - throw new UnsupportedOperationException("Unsupported file format: " + format); + if (rowSchema == null) { + return FormatModelRegistry.positionDeleteWriteBuilder( + format, EncryptedFiles.plainAsEncryptedOutput(outputFile)) + .partition(partition) + .spec(spec) + .build(); + } else { + return switch (format) { + case AVRO -> + Avro.writeDeletes(outputFile) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case PARQUET -> + Parquet.writeDeletes(outputFile) + .createWriterFunc(GenericParquetWriter::create) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case ORC -> + ORC.writeDeletes(outputFile) + .createWriterFunc(GenericOrcWriter::buildWriter) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + default -> throw new UnsupportedOperationException("Unsupported file format: " + format); + }; } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java index 8bdfe7c3a810..3ff5ef9c577d 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java @@ -30,6 +30,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; @@ -41,6 +42,7 @@ import org.apache.iceberg.parquet.VariantWriterBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DecimalUtil; import org.apache.iceberg.util.UUIDUtil; @@ -75,10 +77,27 @@ public class SparkParquetWriters { private SparkParquetWriters() {} - @SuppressWarnings("unchecked") public static ParquetValueWriter buildWriter(StructType dfSchema, MessageType type) { + return buildWriter(null, type, dfSchema); + } + + @SuppressWarnings("unchecked") + public static ParquetValueWriter buildWriter( + Schema icebergSchema, MessageType type, StructType dfSchema) { + return (ParquetValueWriter) + ParquetWithSparkSchemaVisitor.visit( + dfSchema != null ? 
dfSchema : SparkSchemaUtil.convert(icebergSchema), + type, + new WriteBuilder(type)); + } + + public static ParquetValueWriter buildWriter( + StructType dfSchema, MessageType type, Schema icebergSchema) { return (ParquetValueWriter) - ParquetWithSparkSchemaVisitor.visit(dfSchema, type, new WriteBuilder(type)); + ParquetWithSparkSchemaVisitor.visit( + dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema), + type, + new WriteBuilder(type)); } private static class WriteBuilder extends ParquetWithSparkSchemaVisitor> { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index 8e25e81a05b2..55f9fc1768a3 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -30,6 +30,8 @@ import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.spark.SparkUtil; import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +77,7 @@ public static ColumnarBatchReader buildReader( return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator()); } - public static CometColumnarBatchReader buildCometReader( + public static VectorizedReader buildCometReader( Schema expectedSchema, MessageType fileSchema, Map idToConstant) { return (CometColumnarBatchReader) TypeWithSchemaVisitor.visit( @@ -88,6 +90,13 @@ public static CometColumnarBatchReader buildCometReader( readers -> new CometColumnarBatchReader(readers, expectedSchema))); } + /** A subclass of ColumnarBatch to identify Comet readers. 
*/ + public static class CometColumnarBatch extends ColumnarBatch { + public CometColumnarBatch(ColumnVector[] columns) { + super(columns); + } + } + // enables unsafe memory access to avoid costly checks to see if index is within bounds // as long as it is not configured explicitly (see BoundsChecking in Arrow) private static void enableUnsafeMemoryAccess() { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index ff30f29aeae6..89c03a4c2b72 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -19,7 +19,6 @@ package org.apache.iceberg.spark.source; import java.util.Map; -import java.util.Set; import javax.annotation.Nonnull; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; @@ -29,21 +28,18 @@ import org.apache.iceberg.Table; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.OrcBatchReadConf; import org.apache.iceberg.spark.ParquetBatchReadConf; import org.apache.iceberg.spark.ParquetReaderType; import org.apache.iceberg.spark.data.vectorized.ColumnVectorWithFilter; import org.apache.iceberg.spark.data.vectorized.ColumnarBatchUtil; import org.apache.iceberg.spark.data.vectorized.UpdatableDeletedColumnVector; -import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; -import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.Pair; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; @@ -76,79 +72,37 @@ protected CloseableIterable newBatchIterable( Expression residual, Map idToConstant, @Nonnull SparkDeleteFilter deleteFilter) { - CloseableIterable iterable; - switch (format) { - case PARQUET: - iterable = - newParquetIterable( - inputFile, start, length, residual, idToConstant, deleteFilter.requiredSchema()); - break; - case ORC: - iterable = newOrcIterable(inputFile, start, length, residual, idToConstant); - break; - default: - throw new UnsupportedOperationException( - "Format: " + format + " not supported for batched reads"); + Class readType = + useComet() ? 
VectorizedSparkParquetReaders.CometColumnarBatch.class : ColumnarBatch.class; + ReadBuilder readBuilder = + FormatModelRegistry.readBuilder(format, readType, inputFile); + + if (parquetConf != null) { + readBuilder = readBuilder.recordsPerBatch(parquetConf.batchSize()); + } else if (orcConf != null) { + readBuilder = readBuilder.recordsPerBatch(orcConf.batchSize()); } - return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch); - } + CloseableIterable iterable = + readBuilder + .project(deleteFilter.requiredSchema()) + .idToConstant(idToConstant) + .split(start, length) + .filter(residual) + .caseSensitive(caseSensitive()) + // Spark eagerly consumes the batches. So the underlying memory allocated could be + // reused without worrying about subsequent reads clobbering over each other. This + // improves read performance as every batch read doesn't have to pay the cost of + // allocating memory. + .reuseContainers() + .withNameMapping(nameMapping()) + .build(); - private CloseableIterable newParquetIterable( - InputFile inputFile, - long start, - long length, - Expression residual, - Map idToConstant, - Schema requiredSchema) { - return Parquet.read(inputFile) - .project(requiredSchema) - .split(start, length) - .createBatchedReaderFunc( - fileSchema -> { - if (parquetConf.readerType() == ParquetReaderType.COMET) { - return VectorizedSparkParquetReaders.buildCometReader( - requiredSchema, fileSchema, idToConstant); - } else { - return VectorizedSparkParquetReaders.buildReader( - requiredSchema, fileSchema, idToConstant); - } - }) - .recordsPerBatch(parquetConf.batchSize()) - .filter(residual) - .caseSensitive(caseSensitive()) - // Spark eagerly consumes the batches. So the underlying memory allocated could be reused - // without worrying about subsequent reads clobbering over each other. This improves - // read performance as every batch read doesn't have to pay the cost of allocating memory. 
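// Why the CometColumnarBatch marker exists: SparkFormatModels registers two Parquet models
// for batches, keyed by the requested class, so the Class token alone selects between the
// Comet reader and the default vectorized reader. A condensed sketch of that dispatch (the
// method name is hypothetical; the calls mirror the code above):
private ReadBuilder cometAwareReadBuilder(InputFile inputFile) {
  Class<? extends ColumnarBatch> readType =
      useComet()
          ? VectorizedSparkParquetReaders.CometColumnarBatch.class
          : ColumnarBatch.class;
  return FormatModelRegistry.readBuilder(FileFormat.PARQUET, readType, inputFile);
}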
- .reuseContainers() - .withNameMapping(nameMapping()) - .build(); + return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch); } - private CloseableIterable newOrcIterable( - InputFile inputFile, - long start, - long length, - Expression residual, - Map idToConstant) { - Set constantFieldIds = idToConstant.keySet(); - Set metadataFieldIds = MetadataColumns.metadataFieldIds(); - Sets.SetView constantAndMetadataFieldIds = - Sets.union(constantFieldIds, metadataFieldIds); - Schema schemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds); - - return ORC.read(inputFile) - .project(schemaWithoutConstantAndMetadataFields) - .split(start, length) - .createBatchedReaderFunc( - fileSchema -> - VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant)) - .recordsPerBatch(orcConf.batchSize()) - .filter(residual) - .caseSensitive(caseSensitive()) - .withNameMapping(nameMapping()) - .build(); + private boolean useComet() { + return parquetConf != null && parquetConf.readerType() == ParquetReaderType.COMET; } @VisibleForTesting diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index c12931e786b1..53d44e760afe 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -20,22 +20,15 @@ import java.util.Map; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.ScanTask; import org.apache.iceberg.ScanTaskGroup; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.spark.data.SparkOrcReader; -import org.apache.iceberg.spark.data.SparkParquetReaders; -import org.apache.iceberg.spark.data.SparkPlannedAvroReader; -import org.apache.iceberg.types.TypeUtil; import org.apache.spark.sql.catalyst.InternalRow; abstract class BaseRowReader extends BaseReader { @@ -58,69 +51,15 @@ protected CloseableIterable newIterable( Expression residual, Schema projection, Map idToConstant) { - switch (format) { - case PARQUET: - return newParquetIterable(file, start, length, residual, projection, idToConstant); - - case AVRO: - return newAvroIterable(file, start, length, projection, idToConstant); - - case ORC: - return newOrcIterable(file, start, length, residual, projection, idToConstant); - - default: - throw new UnsupportedOperationException("Cannot read unknown format: " + format); - } - } - - private CloseableIterable newAvroIterable( - InputFile file, long start, long length, Schema projection, Map idToConstant) { - return Avro.read(file) - .reuseContainers() + ReadBuilder reader = + FormatModelRegistry.readBuilder(format, InternalRow.class, file); + return reader .project(projection) - .split(start, length) - .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant)) - .withNameMapping(nameMapping()) - .build(); - } - - 
private CloseableIterable newParquetIterable( - InputFile file, - long start, - long length, - Expression residual, - Schema readSchema, - Map idToConstant) { - return Parquet.read(file) + .idToConstant(idToConstant) .reuseContainers() .split(start, length) - .project(readSchema) - .createReaderFunc( - fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant)) - .filter(residual) .caseSensitive(caseSensitive()) - .withNameMapping(nameMapping()) - .build(); - } - - private CloseableIterable newOrcIterable( - InputFile file, - long start, - long length, - Expression residual, - Schema readSchema, - Map idToConstant) { - Schema readSchemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot( - readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); - - return ORC.read(file) - .project(readSchemaWithoutConstantAndMetadataFields) - .split(start, length) - .createReaderFunc( - readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant)) .filter(residual) - .caseSensitive(caseSensitive()) .withNameMapping(nameMapping()) .build(); } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index a93db17e4a0f..2b3bf73d56b3 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -23,13 +23,20 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.BaseFileWriterFactory; +import org.apache.iceberg.data.RegistryBasedFileWriterFactory; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; @@ -40,14 +47,20 @@ import org.apache.iceberg.spark.data.SparkOrcWriter; import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; - -class SparkFileWriterFactory extends BaseFileWriterFactory { - private StructType dataSparkType; - private StructType equalityDeleteSparkType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SparkFileWriterFactory extends RegistryBasedFileWriterFactory { + private static final Logger LOG = LoggerFactory.getLogger(SparkFileWriterFactory.class); + // We need to use old writers to write position deletes with row data, which is a deprecated + // feature. 
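// The legacy fallback is triggered in two ways, as computed in the constructor below: an
// explicit position delete row schema, or a supplied Spark type that already contains the
// DELETE_FILE_ROW_FIELD_NAME ("row") column. A condensed restatement (the helper name is
// hypothetical; the expression matches the constructor):
private static boolean needsLegacyPosDeleteWriter(
    Schema positionDeleteRowSchema, StructType positionDeleteSparkType) {
  return positionDeleteRowSchema != null
      || (positionDeleteSparkType != null
          && positionDeleteSparkType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined());
}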
+  private final boolean useDeprecatedPositionDeleteWriter;
   private StructType positionDeleteSparkType;
+  private final Schema positionDeleteRowSchema;
+  private final Table table;
+  private final FileFormat format;
   private final Map<String, String> writeProperties;
 
   /**
@@ -75,18 +88,26 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
     super(
         table,
         dataFileFormat,
+        InternalRow.class,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        positionDeleteRowSchema);
+        writeProperties,
+        useOrConvert(dataSparkType, dataSchema),
+        useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
 
-    this.dataSparkType = dataSparkType;
-    this.equalityDeleteSparkType = equalityDeleteSparkType;
-    this.positionDeleteSparkType = positionDeleteSparkType;
+    this.table = table;
+    this.format = dataFileFormat;
     this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
+    this.positionDeleteRowSchema = positionDeleteRowSchema;
+    this.positionDeleteSparkType = positionDeleteSparkType;
+    this.useDeprecatedPositionDeleteWriter =
+        positionDeleteRowSchema != null
+            || (positionDeleteSparkType != null
+                && positionDeleteSparkType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined());
   }
 
   SparkFileWriterFactory(
@@ -105,119 +126,109 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
     super(
         table,
         dataFileFormat,
+        InternalRow.class,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        ImmutableMap.of());
+        writeProperties,
+        useOrConvert(dataSparkType, dataSchema),
+        useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
 
-    this.dataSparkType = dataSparkType;
-    this.equalityDeleteSparkType = equalityDeleteSparkType;
-    this.positionDeleteSparkType = null;
+    this.table = table;
+    this.format = dataFileFormat;
     this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
+    this.positionDeleteRowSchema = null;
+    this.useDeprecatedPositionDeleteWriter = false;
   }
 
   static Builder builderFor(Table table) {
     return new Builder(table);
   }
 
-  @Override
-  protected void configureDataWrite(Avro.DataWriteBuilder builder) {
-    builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType()));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
-    boolean withRow =
-        positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined();
-    if (withRow) {
-      // SparkAvroWriter accepts just the Spark type of the row ignoring the path and pos
-      StructField rowField = positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME);
-      StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
-      builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType));
-    }
-
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
-    builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType));
-    builder.transformPaths(path -> UTF8String.fromString(path.toString()));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureDataWrite(ORC.DataWriteBuilder builder) {
-    builder.createWriterFunc(SparkOrcWriter::new);
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(SparkOrcWriter::new);
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(SparkOrcWriter::new);
-    builder.transformPaths(path -> UTF8String.fromString(path.toString()));
-    builder.setAll(writeProperties);
-  }
-
-  private StructType dataSparkType() {
-    if (dataSparkType == null) {
-      Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
-      this.dataSparkType = SparkSchemaUtil.convert(dataSchema());
-    }
-
-    return dataSparkType;
-  }
-
-  private StructType equalityDeleteSparkType() {
-    if (equalityDeleteSparkType == null) {
-      Preconditions.checkNotNull(
-          equalityDeleteRowSchema(), "Equality delete schema must not be null");
-      this.equalityDeleteSparkType = SparkSchemaUtil.convert(equalityDeleteRowSchema());
-    }
-
-    return equalityDeleteSparkType;
-  }
-
   private StructType positionDeleteSparkType() {
     if (positionDeleteSparkType == null) {
       // wrap the optional row schema into the position delete schema containing path and position
-      Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
+      Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema);
       this.positionDeleteSparkType = SparkSchemaUtil.convert(positionDeleteSchema);
     }
 
     return positionDeleteSparkType;
   }
 
+  @Override
+  public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
+      EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+    if (!useDeprecatedPositionDeleteWriter) {
+      return super.newPositionDeleteWriter(file, spec, partition);
+    } else {
+      LOG.warn("Position deletes with deleted rows are deprecated and will be removed in 1.12.0.");
+      Map<String, String> properties = table == null ? ImmutableMap.of() : table.properties();
+      MetricsConfig metricsConfig =
+          table == null
+              ? MetricsConfig.forPositionDelete()
+              : MetricsConfig.forPositionDelete(table);
+
+      try {
+        return switch (format) {
+          case AVRO ->
+              Avro.writeDeletes(file)
+                  .createWriterFunc(
+                      ignored ->
+                          new SparkAvroWriter(
+                              (StructType)
+                                  positionDeleteSparkType()
+                                      .apply(DELETE_FILE_ROW_FIELD_NAME)
+                                      .dataType()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          case ORC ->
+              ORC.writeDeletes(file)
+                  .createWriterFunc(SparkOrcWriter::new)
+                  .transformPaths(path -> UTF8String.fromString(path.toString()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          case PARQUET ->
+              Parquet.writeDeletes(file)
+                  .createWriterFunc(
+                      msgType ->
+                          SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType))
+                  .transformPaths(path -> UTF8String.fromString(path.toString()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          default ->
+              throw new UnsupportedOperationException(
+                  "Cannot write pos-deletes for unsupported file format: " + format);
+        };
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to create new position delete writer", e);
+      }
+    }
+  }
+
   static class Builder {
     private final Table table;
     private FileFormat dataFileFormat;
@@ -340,4 +351,14 @@ SparkFileWriterFactory build() {
           writeProperties);
     }
   }
+
+  private static StructType useOrConvert(StructType sparkType, Schema schema) {
+    if (sparkType != null) {
+      return sparkType;
+    } else if (schema != null) {
+      return SparkSchemaUtil.convert(schema);
+    } else {
+      return null;
+    }
+  }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
new file mode 100644
index 000000000000..677f2e950b44
--- /dev/null
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.orc.ORCFormatModel;
+import org.apache.iceberg.parquet.ParquetFormatModel;
+import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class SparkFormatModels {
+  public static void register() {
+    FormatModelRegistry.register(
+        AvroFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema) -> new SparkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            SparkParquetWriters::buildWriter,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            ColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkParquetReaders.buildReader(
+                    icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            VectorizedSparkParquetReaders.CometColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkParquetReaders.buildCometReader(
+                    icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                new SparkOrcWriter(icebergSchema, fileSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                new SparkOrcReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            ColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkOrcReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+  }
+
+  private SparkFormatModels() {}
+}
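
Usage sketch (illustrative, not part of the diff): once SparkFormatModels.register()
has been called, Spark obtains readers and writers through FormatModelRegistry rather
than the per-format builder classes, which is what allows the configure*() overrides
above to be deleted. The sketch below assumes the registry API introduced by this
change; the projection schema, input file, and row handling are hypothetical
placeholders.

    import java.io.File;
    import java.io.IOException;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.formats.FormatModelRegistry;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.spark.source.SparkFormatModels;
    import org.apache.spark.sql.catalyst.InternalRow;

    static void readSample(Schema projection, File dataFile) throws IOException {
      // Make the Spark object models (InternalRow, ColumnarBatch) known to the registry.
      SparkFormatModels.register();

      // The registry dispatches to the ParquetFormatModel registered above and
      // returns the file's rows as Spark InternalRow.
      try (CloseableIterable<InternalRow> rows =
          FormatModelRegistry.readBuilder(
                  FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
              .project(projection)
              .build()) {
        for (InternalRow row : rows) {
          // consume each row; a real caller would hand these to Spark's execution layer
        }
      }
    }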