diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java index 6fde8bbebaff..af7176f7a1f6 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java @@ -21,9 +21,12 @@ import org.apache.iceberg.avro.AvroFormatModel; import org.apache.iceberg.data.avro.DataWriter; import org.apache.iceberg.data.avro.PlannedDataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.orc.ORCFormatModel; import org.apache.iceberg.parquet.ParquetFormatModel; public class GenericFormatModels { @@ -48,6 +51,17 @@ public static void register() { GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant))); FormatModelRegistry.register(ParquetFormatModel.forPositionDeletes()); + + FormatModelRegistry.register( + ORCFormatModel.create( + Record.class, + Void.class, + (icebergSchema, fileSchema, engineSchema) -> + GenericOrcWriter.buildWriter(icebergSchema, fileSchema), + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + GenericOrcReader.buildReader(icebergSchema, fileSchema, idToConstant))); + + FormatModelRegistry.register(ORCFormatModel.forPositionDeletes()); } private GenericFormatModels() {} diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java index ab5968da8b09..d2bf1cb3e88d 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java +++ b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java @@ -55,7 +55,7 @@ public class TestGenericFormatModels { 
RandomGenericData.generate(TestBase.SCHEMA, 10, 1L); private static final FileFormat[] FILE_FORMATS = - new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET}; + new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC}; @TempDir protected Path temp; diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 5559c0041b75..2c8fd6e436b2 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -45,6 +45,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; @@ -52,6 +53,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -79,7 +81,10 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.ArrayUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.orc.CompressionKind; @@ -179,9 +184,8 @@ public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) { return this; } - // Supposed to always be a private method used strictly by data and delete write builders - private WriteBuilder createContextFunc( - Function, Context> newCreateContextFunc) { + // supposed to always be a private method used strictly by data and delete write builders + 
WriteBuilder createContextFunc(Function, Context> newCreateContextFunc) { this.createContextFunc = newCreateContextFunc; return this; } @@ -219,7 +223,7 @@ public FileAppender build() { metricsConfig); } - private static class Context { + static class Context { private final long stripeSize; private final long blockSize; private final int vectorizedRowBatchSize; @@ -699,6 +703,7 @@ public static class ReadBuilder { private Function> readerFunc; private Function> batchedReaderFunc; private int recordsPerBatch = VectorizedRowBatch.DEFAULT_SIZE; + private Set constantFieldIds = ImmutableSet.of(); private ReadBuilder(InputFile file) { Preconditions.checkNotNull(file, "Input file cannot be null"); @@ -775,12 +780,18 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) { return this; } + ReadBuilder constantFieldIds(Set newConstantFieldIds) { + this.constantFieldIds = newConstantFieldIds; + return this; + } + public CloseableIterable build() { Preconditions.checkNotNull(schema, "Schema is required"); return new OrcIterable<>( file, conf, - schema, + TypeUtil.selectNot( + schema, Sets.union(constantFieldIds, MetadataColumns.metadataFieldIds())), nameMapping, start, length, diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCFormatModel.java b/orc/src/main/java/org/apache/iceberg/orc/ORCFormatModel.java new file mode 100644 index 000000000000..ed5d734ef9ac --- /dev/null +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCFormatModel.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.orc; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.function.Function; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.data.orc.GenericOrcWriters; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.formats.BaseFormatModel; +import org.apache.iceberg.formats.ModelWriteBuilder; +import org.apache.iceberg.formats.ReadBuilder; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.orc.TypeDescription; + +public class ORCFormatModel + extends BaseFormatModel, R, TypeDescription> { + private final boolean isBatchReader; + + public static ORCFormatModel, Void, Object> forPositionDeletes() { + return new ORCFormatModel<>(PositionDelete.deleteClass(), Void.class, null, null, false); + } + + public static ORCFormatModel> create( + Class type, + Class schemaType, + WriterFunction, S, TypeDescription> writerFunction, + ReaderFunction, S, TypeDescription> readerFunction) { + return new 
ORCFormatModel<>(type, schemaType, writerFunction, readerFunction, false); + } + + public static ORCFormatModel> create( + Class type, + Class schemaType, + ReaderFunction, S, TypeDescription> batchReaderFunction) { + return new ORCFormatModel<>(type, schemaType, null, batchReaderFunction, true); + } + + private ORCFormatModel( + Class type, + Class schemaType, + WriterFunction, S, TypeDescription> writerFunction, + ReaderFunction readerFunction, + boolean isBatchReader) { + super(type, schemaType, writerFunction, readerFunction); + this.isBatchReader = isBatchReader; + } + + @Override + public FileFormat format() { + return FileFormat.ORC; + } + + @Override + public ModelWriteBuilder writeBuilder(EncryptedOutputFile outputFile) { + return new WriteBuilderWrapper<>(outputFile, writerFunction()); + } + + @Override + public ReadBuilder readBuilder(InputFile inputFile) { + return new ReadBuilderWrapper<>(inputFile, readerFunction(), isBatchReader); + } + + private static class WriteBuilderWrapper implements ModelWriteBuilder { + private final ORC.WriteBuilder internal; + private final WriterFunction, S, TypeDescription> writerFunction; + private Schema schema; + private S engineSchema; + + private FileContent content; + + private WriteBuilderWrapper( + EncryptedOutputFile outputFile, + WriterFunction, S, TypeDescription> writerFunction) { + this.internal = ORC.write(outputFile); + this.writerFunction = writerFunction; + } + + @Override + public ModelWriteBuilder schema(Schema newSchema) { + this.schema = newSchema; + internal.schema(newSchema); + return this; + } + + @Override + public ModelWriteBuilder engineSchema(S newSchema) { + this.engineSchema = newSchema; + return this; + } + + @Override + public ModelWriteBuilder set(String property, String value) { + internal.set(property, value); + return this; + } + + @Override + public ModelWriteBuilder setAll(Map properties) { + internal.setAll(properties); + return this; + } + + @Override + public ModelWriteBuilder 
meta(String property, String value) { + internal.metadata(property, value); + return this; + } + + @Override + public ModelWriteBuilder content(FileContent newContent) { + this.content = newContent; + return this; + } + + @Override + public ModelWriteBuilder metricsConfig(MetricsConfig metricsConfig) { + internal.metricsConfig(metricsConfig); + return this; + } + + @Override + public ModelWriteBuilder overwrite() { + internal.overwrite(); + return this; + } + + @Override + public ModelWriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { + // ORC doesn't support file encryption + throw new UnsupportedOperationException("ORC does not support file encryption keys"); + } + + @Override + public ModelWriteBuilder withAADPrefix(ByteBuffer aadPrefix) { + // ORC doesn't support file encryption + throw new UnsupportedOperationException("ORC does not support AAD prefix"); + } + + @Override + public FileAppender build() { + switch (content) { + case DATA: + internal.createContextFunc(ORC.WriteBuilder.Context::dataContext); + internal.createWriterFunc( + (icebergSchema, typeDescription) -> + writerFunction.write(icebergSchema, typeDescription, engineSchema)); + break; + case EQUALITY_DELETES: + internal.createContextFunc(ORC.WriteBuilder.Context::deleteContext); + internal.createWriterFunc( + (icebergSchema, typeDescription) -> + writerFunction.write(icebergSchema, typeDescription, engineSchema)); + break; + case POSITION_DELETES: + Preconditions.checkState( + schema == null, + "Invalid schema: %s. Position deletes with schema are not supported by the API.", + schema); + Preconditions.checkState( + engineSchema == null, + "Invalid engineSchema: %s. 
Position deletes with schema are not supported by the API.", + engineSchema); + + internal.createContextFunc(ORC.WriteBuilder.Context::deleteContext); + internal.createWriterFunc( + (icebergSchema, typeDescription) -> + GenericOrcWriters.positionDelete( + GenericOrcWriter.buildWriter(icebergSchema, typeDescription), + Function.identity())); + internal.schema(DeleteSchemaUtil.pathPosSchema()); + break; + default: + throw new IllegalArgumentException("Unknown file content: " + content); + } + + return internal.build(); + } + } + + private static class ReadBuilderWrapper implements ReadBuilder { + private final ORC.ReadBuilder internal; + private final ReaderFunction readerFunction; + private final boolean isBatchReader; + private S engineSchema; + private boolean reuseContainers = false; + private Schema icebergSchema; + private Map idToConstant = ImmutableMap.of(); + + private ReadBuilderWrapper( + InputFile inputFile, + ReaderFunction readerFunction, + boolean isBatchReader) { + this.internal = ORC.read(inputFile); + this.readerFunction = readerFunction; + this.isBatchReader = isBatchReader; + } + + @Override + public ReadBuilder split(long newStart, long newLength) { + internal.split(newStart, newLength); + return this; + } + + @Override + public ReadBuilder project(Schema schema) { + this.icebergSchema = schema; + internal.project(schema); + return this; + } + + @Override + public ReadBuilder engineProjection(S schema) { + this.engineSchema = schema; + return this; + } + + @Override + public ReadBuilder caseSensitive(boolean caseSensitive) { + internal.caseSensitive(caseSensitive); + return this; + } + + @Override + public ReadBuilder filter(Expression filter) { + internal.filter(filter); + return this; + } + + @Override + public ReadBuilder set(String key, String value) { + internal.config(key, value); + return this; + } + + @Override + public ReadBuilder reuseContainers() { + this.reuseContainers = true; + return this; + } + + @Override + public ReadBuilder 
recordsPerBatch(int numRowsPerBatch) { +      internal.recordsPerBatch(numRowsPerBatch); +      return this; +    } + +    @Override +    public ReadBuilder idToConstant(Map newIdToConstant) { +      internal.constantFieldIds(newIdToConstant.keySet()); +      this.idToConstant = newIdToConstant; +      return this; +    } + +    @Override +    public ReadBuilder withNameMapping(NameMapping nameMapping) { +      internal.withNameMapping(nameMapping); +      return this; +    } + +    @Override +    public org.apache.iceberg.io.CloseableIterable build() { +      Preconditions.checkArgument(reuseContainers, "Reuse containers is required for ORC read"); +      if (isBatchReader) { +        return internal +            .createBatchedReaderFunc( +                typeDescription -> +                    (OrcBatchReader) +                        readerFunction.read( +                            icebergSchema, typeDescription, engineSchema, idToConstant)) +            .build(); +      } else { +        return internal +            .createReaderFunc( +                typeDescription -> +                    (OrcRowReader) +                        readerFunction.read( +                            icebergSchema, typeDescription, engineSchema, idToConstant)) +            .build(); +      } +    } +  } +}