Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ private FormatModelRegistry() {}
ImmutableList.of(
"org.apache.iceberg.data.GenericFormatModels",
"org.apache.iceberg.arrow.vectorized.ArrowFormatModels",
"org.apache.iceberg.flink.data.FlinkFormatModels");
"org.apache.iceberg.flink.data.FlinkFormatModels",
"org.apache.iceberg.spark.source.SparkFormatModels");

// Format models indexed by file format and object model class
private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
Expand Down Expand Up @@ -113,9 +115,9 @@ public void tearDownBenchmark() {
@Threads(1)
public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
try (CloseableIterable<InternalRow> rows =
Parquet.read(Files.localInput(dataFile))
FormatModelRegistry.readBuilder(
FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
.project(SCHEMA)
.createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
.build()) {

for (InternalRow row : rows) {
Expand Down Expand Up @@ -171,9 +173,9 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException {
@Threads(1)
public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
try (CloseableIterable<InternalRow> rows =
Parquet.read(Files.localInput(dataFile))
FormatModelRegistry.readBuilder(
FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
.project(PROJECTED_SCHEMA)
.createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
.build()) {

for (InternalRow row : rows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
Expand Down Expand Up @@ -111,9 +113,9 @@ public void tearDownBenchmark() {
@Threads(1)
public void readUsingIcebergReader(Blackhole blackhole) throws IOException {
try (CloseableIterable<InternalRow> rows =
Parquet.read(Files.localInput(dataFile))
FormatModelRegistry.readBuilder(
FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
.project(SCHEMA)
.createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
.build()) {

for (InternalRow row : rows) {
Expand Down Expand Up @@ -169,9 +171,9 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException {
@Threads(1)
public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
try (CloseableIterable<InternalRow> rows =
Parquet.read(Files.localInput(dataFile))
FormatModelRegistry.readBuilder(
FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
.project(PROJECTED_SCHEMA)
.createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
.build()) {

for (InternalRow row : rows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
Expand Down Expand Up @@ -95,15 +99,16 @@ public void tearDownBenchmark() {
@Benchmark
@Threads(1)
public void writeUsingIcebergWriter() throws IOException {
try (FileAppender<InternalRow> writer =
Parquet.write(Files.localOutput(dataFile))
.createWriterFunc(
msgType ->
SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
try (DataWriter<InternalRow> writer =
FormatModelRegistry.dataWriteBuilder(
FileFormat.PARQUET,
InternalRow.class,
EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
.schema(SCHEMA)
.spec(PartitionSpec.unpartitioned())
.build()) {

writer.addAll(rows);
writer.write(rows);
}
}

Expand All @@ -121,6 +126,7 @@ public void writeUsingSparkWriter() throws IOException {
.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
.set("spark.sql.caseSensitive", "false")
.set("spark.sql.parquet.fieldId.write.enabled", "false")
.set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests were failing with Spark 4.1, but it's probably not worth creating a new PR for this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm okay with this since it isn't production code. It's unlikely that this is going to cause problems cherry-picking.

.schema(SCHEMA)
.build()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
Expand Down Expand Up @@ -95,15 +99,16 @@ public void tearDownBenchmark() {
@Benchmark
@Threads(1)
public void writeUsingIcebergWriter() throws IOException {
try (FileAppender<InternalRow> writer =
Parquet.write(Files.localOutput(dataFile))
.createWriterFunc(
msgType ->
SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
try (DataWriter<InternalRow> writer =
FormatModelRegistry.dataWriteBuilder(
FileFormat.PARQUET,
InternalRow.class,
EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
.schema(SCHEMA)
.spec(PartitionSpec.unpartitioned())
.build()) {

writer.addAll(rows);
writer.write(rows);
}
}

Expand All @@ -121,6 +126,7 @@ public void writeUsingSparkWriter() throws IOException {
.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
.set("spark.sql.caseSensitive", "false")
.set("spark.sql.parquet.fieldId.write.enabled", "false")
.set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests were failing with Spark 4.1, but it's probably not worth creating a new PR for this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we file an issue to track the underlying Spark 4.1 test failure, so we can fix the root cause later?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both "true" and "false" are ok as well. The issue was that the config was not set.

.schema(SCHEMA)
.build()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,12 @@
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.FileIO;
Expand Down Expand Up @@ -719,32 +718,10 @@ private ForeachFunction<DeleteFile> rewritePositionDelete(

private static CloseableIterable<Record> positionDeletesReader(
InputFile inputFile, FileFormat format, PartitionSpec spec) {
Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema());
switch (format) {
case AVRO:
return Avro.read(inputFile)
.project(deleteSchema)
.reuseContainers()
.createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
.build();

case PARQUET:
return Parquet.read(inputFile)
.project(deleteSchema)
.reuseContainers()
.createReaderFunc(
fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
.build();

case ORC:
return ORC.read(inputFile)
.project(deleteSchema)
.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
.build();

default:
throw new UnsupportedOperationException("Unsupported file format: " + format);
}
return FormatModelRegistry.readBuilder(format, Record.class, inputFile)
.project(DeleteSchemaUtil.posDeleteReadSchema(spec.schema()))
.reuseContainers()
.build();
}

private static PositionDeleteWriter<Record> positionDeletesWriter(
Expand All @@ -754,30 +731,37 @@ private static PositionDeleteWriter<Record> positionDeletesWriter(
StructLike partition,
Schema rowSchema)
throws IOException {
switch (format) {
case AVRO:
return Avro.writeDeletes(outputFile)
.createWriterFunc(DataWriter::create)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
case PARQUET:
return Parquet.writeDeletes(outputFile)
.createWriterFunc(GenericParquetWriter::create)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
case ORC:
return ORC.writeDeletes(outputFile)
.createWriterFunc(GenericOrcWriter::buildWriter)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
default:
throw new UnsupportedOperationException("Unsupported file format: " + format);
if (rowSchema == null) {
return FormatModelRegistry.<Record>positionDeleteWriteBuilder(
format, EncryptedFiles.plainAsEncryptedOutput(outputFile))
.partition(partition)
.spec(spec)
.build();
} else {
return switch (format) {
case AVRO ->
Avro.writeDeletes(outputFile)
.createWriterFunc(DataWriter::create)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
case PARQUET ->
Parquet.writeDeletes(outputFile)
.createWriterFunc(GenericParquetWriter::create)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
case ORC ->
ORC.writeDeletes(outputFile)
.createWriterFunc(GenericOrcWriter::buildWriter)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
default -> throw new UnsupportedOperationException("Unsupported file format: " + format);
};
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
Expand All @@ -41,6 +42,7 @@
import org.apache.iceberg.parquet.VariantWriterBuilder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.DecimalUtil;
import org.apache.iceberg.util.UUIDUtil;
Expand Down Expand Up @@ -75,10 +77,27 @@
public class SparkParquetWriters {
private SparkParquetWriters() {}

@SuppressWarnings("unchecked")
public static <T> ParquetValueWriter<T> buildWriter(StructType dfSchema, MessageType type) {
return buildWriter(null, type, dfSchema);
}

@SuppressWarnings("unchecked")
public static <T> ParquetValueWriter<T> buildWriter(
Schema icebergSchema, MessageType type, StructType dfSchema) {
return (ParquetValueWriter<T>)
ParquetWithSparkSchemaVisitor.visit(
dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
type,
new WriteBuilder(type));
}

public static <T> ParquetValueWriter<T> buildWriter(
StructType dfSchema, MessageType type, Schema icebergSchema) {
return (ParquetValueWriter<T>)
ParquetWithSparkSchemaVisitor.visit(dfSchema, type, new WriteBuilder(type));
ParquetWithSparkSchemaVisitor.visit(
dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
type,
new WriteBuilder(type));
}
Comment on lines +84 to 101
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intentional here to have 2 functions with different signature ordering? It might be confusing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ve given this quite a bit of thought. On the caller side we use the order icebergSchema, fileSchema, engineSchema, and I believe this is the most logical ordering. If anyone feels strongly otherwise, I’m happy to adjust it.


private static class WriteBuilder extends ParquetWithSparkSchemaVisitor<ParquetValueWriter<?>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,7 +77,7 @@ public static ColumnarBatchReader buildReader(
return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator());
}

public static CometColumnarBatchReader buildCometReader(
public static VectorizedReader<ColumnarBatch> buildCometReader(
Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
return (CometColumnarBatchReader)
TypeWithSchemaVisitor.visit(
Expand All @@ -88,6 +90,13 @@ public static CometColumnarBatchReader buildCometReader(
readers -> new CometColumnarBatchReader(readers, expectedSchema)));
}

/** A subclass of ColumnarBatch to identify Comet readers. */
public static class CometColumnarBatch extends ColumnarBatch {
public CometColumnarBatch(ColumnVector[] columns) {
super(columns);
}
}

// enables unsafe memory access to avoid costly checks to see if index is within bounds
// as long as it is not configured explicitly (see BoundsChecking in Arrow)
private static void enableUnsafeMemoryAccess() {
Expand Down
Loading