Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
Expand Down Expand Up @@ -113,9 +115,9 @@ public void tearDownBenchmark() {
@Threads(1)
public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
try (CloseableIterable<InternalRow> rows =
Parquet.read(Files.localInput(dataFile))
FormatModelRegistry.readBuilder(
FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
.project(SCHEMA)
.createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
.build()) {

for (InternalRow row : rows) {
Expand Down Expand Up @@ -171,9 +173,9 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException {
@Threads(1)
public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
try (CloseableIterable<InternalRow> rows =
Parquet.read(Files.localInput(dataFile))
FormatModelRegistry.readBuilder(
FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
.project(PROJECTED_SCHEMA)
.createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
.build()) {

for (InternalRow row : rows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
Expand Down Expand Up @@ -111,9 +113,9 @@ public void tearDownBenchmark() {
@Threads(1)
public void readUsingIcebergReader(Blackhole blackhole) throws IOException {
try (CloseableIterable<InternalRow> rows =
Parquet.read(Files.localInput(dataFile))
FormatModelRegistry.readBuilder(
FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
.project(SCHEMA)
.createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
.build()) {

for (InternalRow row : rows) {
Expand Down Expand Up @@ -169,9 +171,9 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException {
@Threads(1)
public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
try (CloseableIterable<InternalRow> rows =
Parquet.read(Files.localInput(dataFile))
FormatModelRegistry.readBuilder(
FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
.project(PROJECTED_SCHEMA)
.createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
.build()) {

for (InternalRow row : rows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
Expand Down Expand Up @@ -95,15 +99,16 @@ public void tearDownBenchmark() {
/**
 * Benchmark that writes {@code rows} to {@code dataFile} as Parquet through an Iceberg writer
 * opened in a try-with-resources statement.
 *
 * <p>NOTE(review): two resource declarations appear back to back below (a {@code FileAppender}
 * built from {@code Parquet.write(...)} and a {@code DataWriter} built from {@code
 * FormatModelRegistry.dataWriteBuilder(...)}), as do two write calls ({@code addAll} and {@code
 * write}). This looks like the before/after lines of a diff rendered together - confirm which
 * form the file actually contains.
 *
 * @throws IOException declared by the method; presumably raised by the writer - verify
 */
@Benchmark
@Threads(1)
public void writeUsingIcebergWriter() throws IOException {
// Variant 1: appender created directly from the Parquet builder with a Spark writer function.
try (FileAppender<InternalRow> writer =
Parquet.write(Files.localOutput(dataFile))
.createWriterFunc(
msgType ->
SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
// Variant 2: data writer resolved through the registry for PARQUET + InternalRow, wrapping
// the local output file as a plain (unencrypted) encrypted-output and using an unpartitioned
// spec.
try (DataWriter<InternalRow> writer =
FormatModelRegistry.dataWriteBuilder(
FileFormat.PARQUET,
InternalRow.class,
EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
.schema(SCHEMA)
.spec(PartitionSpec.unpartitioned())
.build()) {

writer.addAll(rows);
writer.write(rows);
}
}

Expand All @@ -121,6 +126,7 @@ public void writeUsingSparkWriter() throws IOException {
.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
.set("spark.sql.caseSensitive", "false")
.set("spark.sql.parquet.fieldId.write.enabled", "false")
.set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
.schema(SCHEMA)
.build()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
Expand Down Expand Up @@ -95,15 +99,16 @@ public void tearDownBenchmark() {
/**
 * Benchmark that writes {@code rows} to {@code dataFile} as Parquet through an Iceberg writer
 * opened in a try-with-resources statement.
 *
 * <p>NOTE(review): two resource declarations appear back to back below (a {@code FileAppender}
 * built from {@code Parquet.write(...)} and a {@code DataWriter} built from {@code
 * FormatModelRegistry.dataWriteBuilder(...)}), as do two write calls ({@code addAll} and {@code
 * write}). This looks like the before/after lines of a diff rendered together - confirm which
 * form the file actually contains.
 *
 * @throws IOException declared by the method; presumably raised by the writer - verify
 */
@Benchmark
@Threads(1)
public void writeUsingIcebergWriter() throws IOException {
// Variant 1: appender created directly from the Parquet builder with a Spark writer function.
try (FileAppender<InternalRow> writer =
Parquet.write(Files.localOutput(dataFile))
.createWriterFunc(
msgType ->
SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
// Variant 2: data writer resolved through the registry for PARQUET + InternalRow, wrapping
// the local output file as a plain (unencrypted) encrypted-output and using an unpartitioned
// spec.
try (DataWriter<InternalRow> writer =
FormatModelRegistry.dataWriteBuilder(
FileFormat.PARQUET,
InternalRow.class,
EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
.schema(SCHEMA)
.spec(PartitionSpec.unpartitioned())
.build()) {

writer.addAll(rows);
writer.write(rows);
}
}

Expand All @@ -121,6 +126,7 @@ public void writeUsingSparkWriter() throws IOException {
.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
.set("spark.sql.caseSensitive", "false")
.set("spark.sql.parquet.fieldId.write.enabled", "false")
.set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
.schema(SCHEMA)
.build()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,12 @@
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.FileIO;
Expand Down Expand Up @@ -719,32 +718,10 @@ private ForeachFunction<DeleteFile> rewritePositionDelete(

/**
 * Opens a reader over the position delete records stored in {@code inputFile}.
 *
 * <p>The projected schema is the one returned by {@code
 * DeleteSchemaUtil.posDeleteReadSchema(spec.schema())}.
 *
 * <p>NOTE(review): this span contains both a per-format {@code switch} (AVRO / PARQUET / ORC,
 * throwing {@link UnsupportedOperationException} for anything else) and a single {@code
 * FormatModelRegistry.readBuilder(...)} chain. They look like the before/after lines of a diff
 * rendered together - confirm which form the file actually contains.
 *
 * @param inputFile file to read position deletes from
 * @param format file format of {@code inputFile}
 * @param spec partition spec whose schema is used to derive the delete read schema
 * @return a closeable iterable over the delete {@code Record}s
 */
private static CloseableIterable<Record> positionDeletesReader(
InputFile inputFile, FileFormat format, PartitionSpec spec) {
Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema());
switch (format) {
case AVRO:
return Avro.read(inputFile)
.project(deleteSchema)
.reuseContainers()
.createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
.build();

case PARQUET:
return Parquet.read(inputFile)
.project(deleteSchema)
.reuseContainers()
.createReaderFunc(
fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
.build();

case ORC:
// Unlike the AVRO and PARQUET branches, this branch does not call reuseContainers().
return ORC.read(inputFile)
.project(deleteSchema)
.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
.build();

default:
throw new UnsupportedOperationException("Unsupported file format: " + format);
}
// Registry-based form: one builder for every format, always with reuseContainers().
return FormatModelRegistry.readBuilder(format, Record.class, inputFile)
.project(DeleteSchemaUtil.posDeleteReadSchema(spec.schema()))
.reuseContainers()
.build();
}

private static PositionDeleteWriter<Record> positionDeletesWriter(
Expand All @@ -754,30 +731,37 @@ private static PositionDeleteWriter<Record> positionDeletesWriter(
StructLike partition,
Schema rowSchema)
throws IOException {
switch (format) {
case AVRO:
return Avro.writeDeletes(outputFile)
.createWriterFunc(DataWriter::create)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
case PARQUET:
return Parquet.writeDeletes(outputFile)
.createWriterFunc(GenericParquetWriter::create)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
case ORC:
return ORC.writeDeletes(outputFile)
.createWriterFunc(GenericOrcWriter::buildWriter)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
default:
throw new UnsupportedOperationException("Unsupported file format: " + format);
if (rowSchema == null) {
return FormatModelRegistry.<Record>positionDeleteWriteBuilder(
format, EncryptedFiles.plainAsEncryptedOutput(outputFile))
.partition(partition)
.spec(spec)
.build();
} else {
return switch (format) {
case AVRO ->
Avro.writeDeletes(outputFile)
.createWriterFunc(DataWriter::create)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
case PARQUET ->
Parquet.writeDeletes(outputFile)
.createWriterFunc(GenericParquetWriter::create)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
case ORC ->
ORC.writeDeletes(outputFile)
.createWriterFunc(GenericOrcWriter::buildWriter)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
default -> throw new UnsupportedOperationException("Unsupported file format: " + format);
};
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
Expand All @@ -34,6 +35,7 @@
import org.apache.iceberg.parquet.ParquetValueWriters.RepeatedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.DecimalUtil;
import org.apache.iceberg.util.UUIDUtil;
Expand Down Expand Up @@ -61,10 +63,27 @@
public class SparkParquetWriters {
private SparkParquetWriters() {}

/**
 * Builds a Parquet value writer from a Spark schema and a Parquet message type.
 *
 * <p>Delegates to {@code buildWriter(Schema, MessageType, StructType)} with a {@code null}
 * Iceberg schema, so the supplied Spark schema is the one that gets visited. No {@code
 * SuppressWarnings} is needed here because this method performs no cast itself; the unchecked
 * cast lives in the delegate.
 *
 * @param dfSchema Spark schema of the rows to write
 * @param type Parquet message type of the file being written
 * @param <T> row type accepted by the returned writer
 * @return the writer produced by the delegate overload
 */
public static <T> ParquetValueWriter<T> buildWriter(StructType dfSchema, MessageType type) {
  return buildWriter(null, type, dfSchema);
}

/**
 * Builds a Parquet value writer by visiting a Spark schema together with the Parquet message
 * type.
 *
 * <p>When {@code dfSchema} is {@code null}, the Spark schema is obtained by converting {@code
 * icebergSchema} with {@code SparkSchemaUtil.convert}; otherwise {@code dfSchema} is used as is
 * and {@code icebergSchema} is not consulted.
 *
 * @param icebergSchema Iceberg schema, used only as a fallback when {@code dfSchema} is null
 * @param type Parquet message type of the file being written
 * @param dfSchema Spark schema of the rows to write, or {@code null} to derive it
 * @param <T> row type accepted by the returned writer
 * @return the writer produced by the schema visitor, cast to the requested row type
 */
@SuppressWarnings("unchecked")
public static <T> ParquetValueWriter<T> buildWriter(
    Schema icebergSchema, MessageType type, StructType dfSchema) {
  StructType sparkSchema = dfSchema;
  if (sparkSchema == null) {
    sparkSchema = SparkSchemaUtil.convert(icebergSchema);
  }

  WriteBuilder visitor = new WriteBuilder(type);
  return (ParquetValueWriter<T>) ParquetWithSparkSchemaVisitor.visit(sparkSchema, type, visitor);
}

/**
 * Builds a Parquet value writer by visiting a Spark schema together with the Parquet message
 * type, using a fresh {@code WriteBuilder} for {@code type}.
 *
 * <p>NOTE(review): the body below shows a {@code return} cast followed by two {@code
 * ParquetWithSparkSchemaVisitor.visit(...)} statements - one that visits {@code dfSchema}
 * directly and one that falls back to {@code SparkSchemaUtil.convert(icebergSchema)} when {@code
 * dfSchema} is null. This looks like the before/after lines of a diff rendered together; confirm
 * which form (and which parameter order) the file actually contains.
 *
 * @param dfSchema Spark schema of the rows to write
 * @param type Parquet message type of the file being written
 * @param icebergSchema Iceberg schema referenced by the null-fallback variant
 * @param <T> row type accepted by the returned writer
 * @return the writer produced by the schema visitor, cast to the requested row type
 */
public static <T> ParquetValueWriter<T> buildWriter(
StructType dfSchema, MessageType type, Schema icebergSchema) {
return (ParquetValueWriter<T>)
ParquetWithSparkSchemaVisitor.visit(dfSchema, type, new WriteBuilder(type));
ParquetWithSparkSchemaVisitor.visit(
dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
type,
new WriteBuilder(type));
}

private static class WriteBuilder extends ParquetWithSparkSchemaVisitor<ParquetValueWriter<?>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,7 +77,7 @@ public static ColumnarBatchReader buildReader(
return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator());
}

public static CometColumnarBatchReader buildCometReader(
public static VectorizedReader<ColumnarBatch> buildCometReader(
Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
return (CometColumnarBatchReader)
TypeWithSchemaVisitor.visit(
Expand All @@ -88,6 +90,13 @@ public static CometColumnarBatchReader buildCometReader(
readers -> new CometColumnarBatchReader(readers, expectedSchema)));
}

/**
 * A subclass of {@link ColumnarBatch} to identify Comet readers.
 *
 * <p>It declares no fields or methods of its own; the only member is a constructor that forwards
 * to the superclass.
 */
public static class CometColumnarBatch extends ColumnarBatch {
/**
 * Creates a batch over the given column vectors.
 *
 * @param columns column vectors passed through unchanged to the {@code ColumnarBatch}
 *     constructor
 */
public CometColumnarBatch(ColumnVector[] columns) {
super(columns);
}
}

// enables unsafe memory access to avoid costly checks to see if index is within bounds
// as long as it is not configured explicitly (see BoundsChecking in Arrow)
private static void enableUnsafeMemoryAccess() {
Expand Down
Loading