diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java index c8193755f5ba..125bf8a03bfc 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java @@ -50,7 +50,7 @@ public class PositionDeleteWriter implements FileWriter, De private static final Set FILE_AND_POS_FIELD_IDS = ImmutableSet.of(DELETE_FILE_PATH.fieldId(), DELETE_FILE_POS.fieldId()); - private final FileAppender appender; + private final FileAppender> appender; private final FileFormat format; private final String location; private final PartitionSpec spec; @@ -59,14 +59,22 @@ public class PositionDeleteWriter implements FileWriter, De private final CharSequenceSet referencedDataFiles; private DeleteFile deleteFile = null; + /** + * Creates a new position delete writer. + * + * @deprecated since 1.11.0, will be updated in 1.12.0 to accept {@code + * FileAppender>} instead of {@code FileAppender}. + */ + @Deprecated + @SuppressWarnings("unchecked") public PositionDeleteWriter( - FileAppender appender, + FileAppender appender, FileFormat format, String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) { - this.appender = appender; + this.appender = (FileAppender>) appender; this.format = format; this.location = location; this.spec = spec; diff --git a/core/src/main/java/org/apache/iceberg/formats/BaseFormatModel.java b/core/src/main/java/org/apache/iceberg/formats/BaseFormatModel.java new file mode 100644 index 000000000000..7cba465670d9 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/BaseFormatModel.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import java.util.Map; +import org.apache.iceberg.Schema; + +/** + * Base implementation of {@link FormatModel} that provides common functionality for format models. + * + *

This abstract class serves as a foundation for creating format-specific models that handle + * reading and writing data in various file formats. + * + * @param output type used for reading data, and input type for writing data and deletes + * @param the engine-specific schema type describing the input/output data + * @param the file format specific writer type produced by the writer function + * @param the file format specific reader type produced by the reader function + * @param the file schema type used by the underlying file format + */ +public abstract class BaseFormatModel implements FormatModel { + private final Class type; + private final Class schemaType; + private final WriterFunction writerFunction; + private final ReaderFunction readerFunction; + + /** + * Constructs a new BaseFormatModel with the specified configuration. + * + * @param type the row type class for the object model implementation processed by this factory. + * @param schemaType the schema type class for the object model implementation processed by this + * factory. + * @param writerFunction the function used to create writers for this format + * @param readerFunction the function used to create readers for this format + */ + protected BaseFormatModel( + Class type, + Class schemaType, + WriterFunction writerFunction, + ReaderFunction readerFunction) { + this.type = type; + this.schemaType = schemaType; + this.writerFunction = writerFunction; + this.readerFunction = readerFunction; + } + + @Override + public Class type() { + return type; + } + + @Override + public Class schemaType() { + return schemaType; + } + + /** + * Returns the writer function used to create writers for this format. + * + * @return the writer function + */ + protected WriterFunction writerFunction() { + return writerFunction; + } + + /** + * Returns the reader function used to create readers for this format. 
+ * + * @return the reader function + */ + protected ReaderFunction readerFunction() { + return readerFunction; + } + + /** + * A functional interface for creating writers that can write data in a specific format. + * + * @param the file format specific writer type to be created + * @param the engine-specific schema type describing the input data + * @param the file schema type used by the underlying file format + */ + @FunctionalInterface + public interface WriterFunction { + /** + * Creates a writer for the given schemas. + * + * @param icebergSchema the Iceberg schema defining the table structure + * @param fileSchema the file format specific target schema for the output files + * @param engineSchema the engine-specific schema for the input data (optional) + * @return a writer configured for the given schemas + */ + W write(Schema icebergSchema, F fileSchema, S engineSchema); + } + + /** + * A functional interface for creating readers that can read data from a specific format. + * + * @param the file format specific reader type to be created + * @param the engine-specific schema type describing the output data + * @param the file schema type used by the underlying file format + */ + @FunctionalInterface + public interface ReaderFunction { + /** + * Creates a reader for the given schemas. 
+ * + * @param icebergSchema the Iceberg schema defining the table structure + * @param fileSchema the schema that the file was written with, although it can be null + * in the case of Avro files because it is passed in later + * @param engineSchema the engine-specific schema for the output data (optional) + * @param idToConstant a map of field IDs to constant values for partition columns and other + * fields not stored in data files + * @return a reader configured for the given schemas + */ + R read(Schema icebergSchema, F fileSchema, S engineSchema, Map idToConstant); + } +} diff --git a/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilder.java b/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilder.java new file mode 100644 index 000000000000..3eaba6ab3bc4 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilder.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.formats; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileWriter; +import org.apache.iceberg.util.ArrayUtil; + +/** + * A generic builder interface for creating specialized file writers in the Iceberg ecosystem. + * + *

This builder provides a unified configuration API for generating various types of content + * writers: + * + *

    + *
  • {@link DataWriter} for creating data files with table records + *
  • {@link EqualityDeleteWriter} for creating files with equality-based delete records + *
  • {@link PositionDeleteWriter} for creating files with position-based delete records + *
+ * + *

Each concrete implementation configures the underlying file format writer while adding + * content-specific metadata and behaviors. + * + * @param the concrete writer type the builder produces + * @param the type of the engine schema for the input data + */ +public interface FileWriterBuilder, S> { + + /** + * Set a writer configuration property which affects the writer behavior. + * + * @param property a writer config property name + * @param value config value + * @return this for method chaining + */ + FileWriterBuilder set(String property, String value); + + /** + * Adds the new properties to the writer configuration. + * + * @param properties a map of writer config properties + * @return this for method chaining + */ + default FileWriterBuilder setAll(Map properties) { + properties.forEach(this::set); + return this; + } + + /** + * Set a file metadata property in the created file. + * + * @param property a file metadata property name + * @param value config value + * @return this for method chaining + */ + FileWriterBuilder meta(String property, String value); + + /** + * Add the new properties to file metadata for the created file. + * + * @param properties a map of file metadata properties + * @return this for method chaining + */ + default FileWriterBuilder meta(Map properties) { + properties.forEach(this::meta); + return this; + } + + /** Sets the metrics configuration used for collecting column metrics for the created file. */ + FileWriterBuilder metricsConfig(MetricsConfig metricsConfig); + + /** Overwrite the file if it already exists. By default, overwrite is disabled. */ + FileWriterBuilder overwrite(); + + /** + * Sets the encryption key used for writing the file. If the writer does not support encryption, + * then an exception should be thrown. + */ + FileWriterBuilder withFileEncryptionKey(ByteBuffer encryptionKey); + + /** + * Sets the additional authentication data (AAD) prefix used for writing the file. 
If the writer + * does not support encryption, then an exception should be thrown. + */ + FileWriterBuilder withAADPrefix(ByteBuffer aadPrefix); + + /** Sets the partition specification for the Iceberg metadata. */ + FileWriterBuilder spec(PartitionSpec newSpec); + + /** Sets the partition value for the Iceberg metadata. */ + FileWriterBuilder partition(StructLike partition); + + /** Sets the encryption key metadata for Iceberg metadata. */ + FileWriterBuilder keyMetadata(EncryptionKeyMetadata keyMetadata); + + /** Sets the sort order for the Iceberg metadata. */ + FileWriterBuilder sortOrder(SortOrder sortOrder); + + /** Set the file schema. */ + FileWriterBuilder schema(Schema schema); + + /** + * Sets the engine-specific schema that describes records accepted by the writer. + * + *

Some data types require additional type information from the engine schema that cannot be + * fully expressed by the Iceberg schema or the data itself. For example, an engine's tinyint or + * smallint types are mapped to Iceberg's integer type, but the writer may need the original type + * for proper serialization. Similarly, a variant type may use a shredded representation that + * relies on engine-specific metadata to map back to the Iceberg schema. + * + *

The engine schema must be aligned with the Iceberg schema, but may include representation + * details that Iceberg considers equivalent. + */ + FileWriterBuilder engineSchema(S schema); + + /** + * Sets the equality field ids for the equality delete writer. Only applicable when building an + * {@link EqualityDeleteWriter}. + */ + default FileWriterBuilder equalityFieldIds(List fieldIds) { + return equalityFieldIds(ArrayUtil.toIntArray(fieldIds)); + } + + /** + * Sets the equality field ids for the equality delete writer. Only applicable when building an + * {@link EqualityDeleteWriter}. + */ + FileWriterBuilder equalityFieldIds(int... fieldIds); + + W build() throws IOException; +} diff --git a/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java b/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java new file mode 100644 index 000000000000..d0f9c6da2fbf --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.formats; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +abstract class FileWriterBuilderImpl, D, S> + implements FileWriterBuilder { + private final ModelWriteBuilder modelWriteBuilder; + private final String location; + private final FileFormat format; + private final FileContent content; + private Schema schema = null; + private PartitionSpec spec = null; + private StructLike partition = null; + private EncryptionKeyMetadata keyMetadata = null; + private SortOrder sortOrder = null; + private int[] equalityFieldIds = null; + + /** Creates a builder for {@link DataWriter} instances for writing data files. */ + static FileWriterBuilder, S> forDataFile( + FormatModel model, EncryptedOutputFile outputFile) { + return new DataFileWriterBuilder<>(model, outputFile); + } + + /** + * Creates a builder for {@link EqualityDeleteWriter} instances for writing equality delete files. 
+ */ + static FileWriterBuilder, S> forEqualityDelete( + FormatModel model, EncryptedOutputFile outputFile) { + return new EqualityDeleteWriterBuilder<>(model, outputFile); + } + + /** + * Creates a builder for {@link PositionDeleteWriter} instances for writing position delete files. + */ + static FileWriterBuilder, S> forPositionDelete( + FormatModel, S> model, EncryptedOutputFile outputFile) { + return new PositionDeleteWriterBuilder<>(model, outputFile); + } + + private FileWriterBuilderImpl( + FormatModel model, EncryptedOutputFile outputFile, FileContent content) { + this.modelWriteBuilder = model.writeBuilder(outputFile).content(content); + this.location = outputFile.encryptingOutputFile().location(); + this.format = model.format(); + this.content = content; + } + + @Override + public FileWriterBuilderImpl set(String property, String value) { + modelWriteBuilder.set(property, value); + return this; + } + + @Override + public FileWriterBuilderImpl meta(String property, String value) { + modelWriteBuilder.meta(property, value); + return this; + } + + @Override + public FileWriterBuilderImpl metricsConfig(MetricsConfig metricsConfig) { + modelWriteBuilder.metricsConfig(metricsConfig); + return this; + } + + @Override + public FileWriterBuilderImpl overwrite() { + modelWriteBuilder.overwrite(); + return this; + } + + @Override + public FileWriterBuilderImpl withFileEncryptionKey(ByteBuffer encryptionKey) { + modelWriteBuilder.withFileEncryptionKey(encryptionKey); + return this; + } + + @Override + public FileWriterBuilderImpl withAADPrefix(ByteBuffer aadPrefix) { + modelWriteBuilder.withAADPrefix(aadPrefix); + return this; + } + + @Override + public FileWriterBuilderImpl schema(Schema newSchema) { + modelWriteBuilder.schema(newSchema); + this.schema = newSchema; + return this; + } + + @Override + public FileWriterBuilderImpl engineSchema(S newSchema) { + modelWriteBuilder.engineSchema(newSchema); + return this; + } + + @Override + public FileWriterBuilderImpl 
spec(PartitionSpec newSpec) { + this.spec = newSpec; + return this; + } + + @Override + public FileWriterBuilderImpl partition(StructLike newPartition) { + this.partition = newPartition; + return this; + } + + @Override + public FileWriterBuilderImpl keyMetadata(EncryptionKeyMetadata newKeyMetadata) { + this.keyMetadata = newKeyMetadata; + return this; + } + + @Override + public FileWriterBuilderImpl sortOrder(SortOrder newSortOrder) { + this.sortOrder = newSortOrder; + return this; + } + + @Override + public FileWriterBuilderImpl equalityFieldIds(int... fieldIds) { + if (content != FileContent.EQUALITY_DELETES) { + throw new UnsupportedOperationException( + "Equality field ids not supported for this writer type"); + } + + this.equalityFieldIds = fieldIds; + return this; + } + + ModelWriteBuilder modelWriteBuilder() { + return modelWriteBuilder; + } + + String location() { + return location; + } + + FileFormat format() { + return format; + } + + Schema schema() { + return schema; + } + + PartitionSpec spec() { + return spec; + } + + StructLike partition() { + return partition; + } + + EncryptionKeyMetadata keyMetadata() { + return keyMetadata; + } + + SortOrder sortOrder() { + return sortOrder; + } + + int[] equalityFieldIds() { + return equalityFieldIds; + } + + protected void validate() { + Preconditions.checkState( + content != FileContent.EQUALITY_DELETES || equalityFieldIds != null, + "Invalid delete field ids for equality delete writer: null"); + Preconditions.checkState( + content == FileContent.POSITION_DELETES || schema != null, "Invalid schema: null"); + Preconditions.checkArgument(spec != null, "Invalid partition spec: null"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Invalid partition, does not match spec: %s", + spec); + } + + /** Builder for creating {@link DataWriter} instances for writing data files. 
*/ + private static class DataFileWriterBuilder + extends FileWriterBuilderImpl, D, S> { + + private DataFileWriterBuilder(FormatModel model, EncryptedOutputFile outputFile) { + super(model, outputFile, FileContent.DATA); + } + + @Override + public DataWriter build() throws IOException { + validate(); + + return new DataWriter<>( + modelWriteBuilder().build(), + format(), + location(), + spec(), + partition(), + keyMetadata(), + sortOrder()); + } + } + + /** + * Builder for creating {@link EqualityDeleteWriter} instances for writing equality delete files. + */ + private static class EqualityDeleteWriterBuilder + extends FileWriterBuilderImpl, D, S> { + + private EqualityDeleteWriterBuilder(FormatModel model, EncryptedOutputFile outputFile) { + super(model, outputFile, FileContent.EQUALITY_DELETES); + } + + @Override + public EqualityDeleteWriter build() throws IOException { + validate(); + + return new EqualityDeleteWriter<>( + modelWriteBuilder() + .schema(schema()) + .meta("delete-type", "equality") + .meta( + "delete-field-ids", + IntStream.of(equalityFieldIds()) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))) + .build(), + format(), + location(), + spec(), + partition(), + keyMetadata(), + sortOrder(), + equalityFieldIds()); + } + } + + /** + * Builder for creating {@link PositionDeleteWriter} instances for writing position delete files. 
+ */ + private static class PositionDeleteWriterBuilder + extends FileWriterBuilderImpl, PositionDelete, S> { + + private PositionDeleteWriterBuilder( + FormatModel, S> model, EncryptedOutputFile outputFile) { + super(model, outputFile, FileContent.POSITION_DELETES); + } + + @Override + public PositionDeleteWriter build() throws IOException { + validate(); + + return new PositionDeleteWriter<>( + modelWriteBuilder().meta("delete-type", "position").build(), + format(), + location(), + spec(), + partition(), + keyMetadata()); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModel.java b/core/src/main/java/org/apache/iceberg/formats/FormatModel.java new file mode 100644 index 000000000000..307a12625cd3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/FormatModel.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.InputFile; + +/** + * Interface that provides a unified abstraction for converting between data file formats and + * input/output data representations. + * + *

{@link FormatModel} serves as a bridge between storage formats ({@link FileFormat}) and + * expected input/output data structures, optimizing performance through direct conversion without + * intermediate representations. File format implementations handle the low-level parsing details + * while the object model determines the in-memory representation used for the parsed data. + * Together, these provide a consistent API for consuming data files while optimizing for specific + * processing engines. + * + *

Iceberg provides some built-in object models and processing engines can implement custom + * object models to integrate with Iceberg's file reading and writing capabilities. + * + * @param output type used for reading data, and input type for writing data and deletes + * @param the type of the schema for the input/output data + */ +public interface FormatModel { + /** The file format which is read/written by the object model. */ + FileFormat format(); + + /** + * Return the row type class for the object model implementation processed by this factory. + * + *

The model types act as a contract specifying the expected data structures for both reading + * (converting file formats into output objects) and writing (converting input objects into file + * formats). This ensures proper integration between Iceberg's storage layer and processing + * engines. + * + *

Processing engines can define their own object models by implementing this interface and + * using their own model name. They can register these models with Iceberg by using the {@link + * FormatModelRegistry}. This allows custom data representations to be seamlessly integrated with + * Iceberg's file format handlers. + * + * @return the type of the data structures handled by this model implementation + */ + Class type(); + + /** + * Return the schema type class for the object model implementation processed by this factory. + * + * @return the type of the schema for the data structures handled by this model implementation + */ + Class schemaType(); + + /** + * Creates a writer builder for data files. + * + *

The returned {@link ModelWriteBuilder} configures and creates a writer that converts input + * objects into the file format supported by this factory. + * + * @param outputFile destination for the written data + * @return configured writer builder + */ + ModelWriteBuilder writeBuilder(EncryptedOutputFile outputFile); + + /** + * Creates a file reader builder for the specified input file. + * + *

The returned {@link ReadBuilder} configures and creates a reader that converts data from the + * file format into output objects supported by this factory. + * + * @param inputFile source file to read from + * @return configured reader builder for the specified input + */ + ReadBuilder readBuilder(InputFile inputFile); +} diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java new file mode 100644 index 000000000000..a15aab323c01 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.formats; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A registry that manages file-format-specific readers and writers through a unified object model + * factory interface. + * + *

This registry provides access to {@link ReadBuilder}s for data consumption and {@link + * FileWriterBuilder}s for writing various types of Iceberg content files. The appropriate builder + * is selected based on {@link FileFormat} and object model name. + * + *

{@link FormatModel} objects are registered through {@link #register(FormatModel)} and used for + * creating readers and writers. Read builders are returned directly from the factory. Write + * builders may be wrapped in specialized content file writer implementations depending on the + * requested builder type. + */ +public final class FormatModelRegistry { + private FormatModelRegistry() {} + + private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class); + // The list of classes which are used for registering the reader and writer builders + private static final List CLASSES_TO_REGISTER = ImmutableList.of(); + + // Format models indexed by file format and object model class + private static final Map>, FormatModel> MODELS = + Maps.newConcurrentMap(); + + static { + registerSupportedFormats(); + } + + /** + * Registers an {@link FormatModel} in this registry. + * + *

The {@link FormatModel} creates readers and writers for a specific combinations of file + * format (Parquet, ORC, Avro) and object model (for example: "generic", "spark", "flink", etc.). + * Registering custom factories allows integration of new data processing engines for the + * supported file formats with Iceberg's file access mechanisms. + * + *

+   * <p>Each factory must be uniquely identified by its combination of file format and object model
+   * name. This uniqueness constraint prevents ambiguity when selecting factories for read and
+   * write operations.
+   *
+   * @param formatModel the factory implementation to register
+   * @throws IllegalArgumentException if a factory is already registered for the combination of
+   *     {@link FormatModel#format()} and {@link FormatModel#type()}
+   */
+  public static synchronized void register(FormatModel<?, ?> formatModel) {
+    // Registrations are keyed by (file format, object model class); see modelFor for lookups.
+    Pair<FileFormat, Class<?>> key = Pair.of(formatModel.format(), formatModel.type());
+
+    FormatModel<?, ?> existing = MODELS.get(key);
+    Preconditions.checkArgument(
+        existing == null,
+        "Cannot register %s: %s is registered for format=%s type=%s schemaType=%s",
+        formatModel.getClass(),
+        existing == null ? null : existing.getClass(),
+        key.first(),
+        key.second(),
+        existing == null ? null : existing.schemaType());
+
+    MODELS.put(key, formatModel);
+  }
+
+  /**
+   * Returns a reader builder for the specified file format and object model.
+   *
+   * <p>The returned {@link ReadBuilder} provides a fluent interface for configuring how data is
+   * read from the input file and converted to the output objects.
+   *
+   * @param format the file format (Parquet, Avro, ORC) that determines the parsing implementation
+   * @param type the output type
+   * @param inputFile source file to read data from
+   * @param <D> the type of data records the reader will produce
+   * @param <S> the type of the output schema for the reader
+   * @return a configured reader builder for the specified format and object model
+   */
+  public static <D, S> ReadBuilder<D, S> readBuilder(
+      FileFormat format, Class<D> type, InputFile inputFile) {
+    FormatModel<D, S> model = modelFor(format, type);
+    return model.readBuilder(inputFile);
+  }
+
+  /**
+   * Returns a writer builder for generating a {@link DataFile}.
+   *
+   * <p>The returned builder produces a writer that accepts records defined by the specified object
+   * model and persists them using the provided file format. Unlike basic writers, this writer
+   * collects file metadata during the writing process and generates a {@link DataFile} that can be
+   * used for table operations.
+   *
+   * @param format the file format used for writing
+   * @param type the input type
+   * @param outputFile destination for the written data
+   * @param <D> the type of data records the writer will accept
+   * @param <S> the type of the input schema for the writer
+   * @return a configured data write builder for creating a {@link DataWriter}
+   */
+  public static <D, S> FileWriterBuilder<D, DataWriter<D>, S> dataWriteBuilder(
+      FileFormat format, Class<D> type, EncryptedOutputFile outputFile) {
+    FormatModel<D, S> model = modelFor(format, type);
+    return FileWriterBuilderImpl.forDataFile(model, outputFile);
+  }
+
+  /**
+   * Creates a writer builder for generating a {@link DeleteFile} with equality deletes.
+   *
+   * <p>The returned builder produces a writer that accepts records defined by the specified object
+   * model and persists them using the given file format. The writer persists equality delete
+   * records that identify rows to be deleted based on the configured equality fields, producing a
+   * {@link DeleteFile} that can be used for table operations.
+   *
+   * @param format the file format used for writing
+   * @param type the input type
+   * @param outputFile destination for the written data
+   * @param <D> the type of data records the writer will accept
+   * @param <S> the type of the input schema for the writer
+   * @return a configured delete write builder for creating an {@link EqualityDeleteWriter}
+   */
+  public static <D, S> FileWriterBuilder<D, EqualityDeleteWriter<D>, S> equalityDeleteWriteBuilder(
+      FileFormat format, Class<D> type, EncryptedOutputFile outputFile) {
+    FormatModel<D, S> model = modelFor(format, type);
+    return FileWriterBuilderImpl.forEqualityDelete(model, outputFile);
+  }
+
+  /**
+   * Creates a writer builder for generating a {@link DeleteFile} with position-based deletes.
+   *
+   * <p>The returned builder produces a writer that accepts records defined by the specified object
+   * model and persists them using the given file format. The writer accepts {@link PositionDelete}
+   * records that identify rows to be deleted by file path and position, producing a {@link
+   * DeleteFile} that can be used for table operations.
+   *
+   * @param format the file format used for writing
+   * @param outputFile destination for the written data
+   * @return a configured delete write builder for creating a {@link PositionDeleteWriter}
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static <D> FileWriterBuilder<PositionDelete<D>, PositionDeleteWriter<D>, ?>
+      positionDeleteWriteBuilder(FileFormat format, EncryptedOutputFile outputFile) {
+    // PositionDelete is looked up by its raw class; the double cast restores the generic type.
+    Class<PositionDelete<D>> deleteClass = (Class<PositionDelete<D>>) (Class) PositionDelete.class;
+    FormatModel<PositionDelete<D>, ?> model = FormatModelRegistry.modelFor(format, deleteClass);
+    return FileWriterBuilderImpl.forPositionDelete(model, outputFile);
+  }
+
+  @VisibleForTesting
+  static Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> models() {
+    return MODELS;
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <D, S> FormatModel<D, S> modelFor(FileFormat format, Class<D> type) {
+    FormatModel<D, S> model = (FormatModel<D, S>) MODELS.get(Pair.of(format, type));
+    Preconditions.checkArgument(
+        model != null, "Format model is not registered for format %s and type %s", format, type);
+    return model;
+  }
+
+  @SuppressWarnings("CatchBlockLogException")
+  private static void registerSupportedFormats() {
+    // Uses dynamic methods to call the `register` for the listed classes
+    for (String classToRegister : CLASSES_TO_REGISTER) {
+      try {
+        DynMethods.builder("register").impl(classToRegister).buildStaticChecked().invoke();
+      } catch (NoSuchMethodException e) {
+        // failing to register a factory is normal and does not require a stack trace
+        LOG.info(
+            "Unable to call register for ({}). Check for missing jars on the classpath: {}",
+            classToRegister,
+            e.getMessage());
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/formats/ModelWriteBuilder.java b/core/src/main/java/org/apache/iceberg/formats/ModelWriteBuilder.java
new file mode 100644
index 000000000000..1a65305b2a3a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/formats/ModelWriteBuilder.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.formats;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+
+/**
+ * Builder interface for creating file writers across supported data file formats. The {@link
+ * FormatModel} implementations provide the appropriate {@link ModelWriteBuilder} instances.
+ *

+ * <p>The {@link ModelWriteBuilder} follows the builder pattern to configure and create {@link
+ * FileAppender} instances that write data to the target output files.
+ *
+ * <p>This interface is directly exposed to users for parameterizing when only an appender is
+ * required.
+ *
+ * @param <D> the input data type accepted by the writer
+ * @param <S> the type of the schema for the input data type
+ */
+public interface ModelWriteBuilder<D, S> {
+  /** Set the file schema. */
+  ModelWriteBuilder<D, S> schema(Schema schema);
+
+  /**
+   * Sets the engine's representation accepted by the writer.
+   *
+   * <p>Some data types require additional type information from the engine schema that cannot be
+   * fully expressed by the Iceberg schema or the data itself. For example, a variant type may use a
+   * shredded representation that relies on engine-specific metadata to map back to the Iceberg
+   * schema.
+   *
+   * <p>The engine schema must be aligned with the Iceberg schema, but may include representation
+   * details that Iceberg considers equivalent.
+   */
+  ModelWriteBuilder<D, S> engineSchema(S schema);
+
+  /**
+   * Set a writer configuration property which affects the writer behavior. Writer builders should
+   * ignore configuration keys not known for them.
+   *
+   * @param property a writer config property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  ModelWriteBuilder<D, S> set(String property, String value);
+
+  /**
+   * Sets multiple writer configuration properties that affect the writer behavior. Writer builders
+   * should ignore configuration keys not known for them.
+   *
+   * @param properties writer config properties to set
+   * @return this for method chaining
+   */
+  default ModelWriteBuilder<D, S> setAll(Map<String, String> properties) {
+    properties.forEach(this::set);
+    return this;
+  }
+
+  /**
+   * Set a file metadata property in the created file.
+   *
+   * @param property a file metadata property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  ModelWriteBuilder<D, S> meta(String property, String value);
+
+  /**
+   * Sets multiple file metadata properties in the created file.
+   *
+   * @param properties file metadata properties to set
+   * @return this for method chaining
+   */
+  default ModelWriteBuilder<D, S> meta(Map<String, String> properties) {
+    properties.forEach(this::meta);
+    return this;
+  }
+
+  /**
+   * Based on the target file content the generated {@link FileAppender} needs different
+   * configuration.
+   */
+  ModelWriteBuilder<D, S> content(FileContent content);
+
+  /** Sets the metrics configuration used for collecting column metrics for the created file. */
+  ModelWriteBuilder<D, S> metricsConfig(MetricsConfig metricsConfig);
+
+  /** Overwrite the file if it already exists. By default, overwrite is disabled. */
+  ModelWriteBuilder<D, S> overwrite();
+
+  /**
+   * Sets the encryption key used for writing the file. If the writer does not support encryption,
+   * then an exception should be thrown.
+   */
+  ModelWriteBuilder<D, S> withFileEncryptionKey(ByteBuffer encryptionKey);
+
+  /**
+   * Sets the additional authentication data (AAD) prefix used for writing the file. If the writer
+   * does not support encryption, then an exception should be thrown.
+   */
+  ModelWriteBuilder<D, S> withAADPrefix(ByteBuffer aadPrefix);
+
+  /** Finalizes the configuration and builds the {@link FileAppender}. */
+  FileAppender<D> build() throws IOException;
+}
diff --git a/core/src/main/java/org/apache/iceberg/formats/ReadBuilder.java b/core/src/main/java/org/apache/iceberg/formats/ReadBuilder.java
new file mode 100644
index 000000000000..2809750970a7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/formats/ReadBuilder.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.formats;
+
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.mapping.NameMapping;
+
+/**
+ * Builder interface for creating file readers across supported data file formats. The {@link
+ * FormatModel} implementations provide the appropriate {@link ReadBuilder} instances.
+ *
+ * <p>The {@link ReadBuilder} follows the builder pattern to configure and create {@link
+ * CloseableIterable} instances that read data from source files. Configuration options include
+ * schema projection, predicate filtering, record batching, and encryption settings.
+ *
+ * <p>This interface is directly exposed to users for parameterizing readers.
+ *
+ * @param <D> the output data type produced by the reader
+ * @param <S> the type of the schema for the output data type
+ */
+public interface ReadBuilder<D, S> {
+  /**
+   * Restricts the read to the given range: [start, start + length).
+   *
+   * @param start the start position for this read
+   * @param length the length of the range this read should scan
+   */
+  ReadBuilder<D, S> split(long start, long length);
+
+  /** Set the projection schema. This must be set before the reader is instantiated. */
+  ReadBuilder<D, S> project(Schema schema);
+
+  /**
+   * Sets the engine's representation of the projected schema.
+   *
+   * <p>When provided, this schema should be consistent with the requested Iceberg projection, while
+   * allowing representation differences. Examples include:
+   *
+   * <ul>
+   *   <li>using a {@code long} to represent an Iceberg {@code int} column,
+   *   <li>requesting a shredded representation for a variant type, or
+   *   <li>selecting specific concrete classes for Iceberg structs.
+   * </ul>
+   */
+  ReadBuilder<D, S> engineProjection(S schema);
+
+  /**
+   * Configures whether filtering should be case-sensitive. If the reader supports filtering, it
+   * must respect this setting. The default value is true.
+   *
+   * @param caseSensitive indicates if filtering is case-sensitive
+   */
+  ReadBuilder<D, S> caseSensitive(boolean caseSensitive);
+
+  /**
+   * Pushes down the {@link Expression} filter for the reader to prevent reading unnecessary
+   * records. Some readers may not support filtering, or may only support filtering for certain
+   * expressions. In this case the reader might return unfiltered or partially filtered rows. It is
+   * the caller's responsibility to apply the filter again.
+   *
+   * @param filter the filter to set
+   */
+  ReadBuilder<D, S> filter(Expression filter);
+
+  /**
+   * Set a reader configuration property which affects the reader behavior. Reader builders should
+   * ignore configuration keys not known for them.
+   *
+   * @param key a reader config property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  ReadBuilder<D, S> set(String key, String value);
+
+  /**
+   * Sets multiple reader configuration properties that affect the reader behavior. Reader builders
+   * should ignore configuration keys not known for them.
+   *
+   * @param properties reader config properties to set
+   * @return this for method chaining
+   */
+  default ReadBuilder<D, S> setAll(Map<String, String> properties) {
+    properties.forEach(this::set);
+    return this;
+  }
+
+  /** Enables reusing the containers returned by the reader. Decreases pressure on GC. */
+  ReadBuilder<D, S> reuseContainers();
+
+  /** Sets the batch size for vectorized readers. */
+  ReadBuilder<D, S> recordsPerBatch(int rowsPerBatch);
+
+  /**
+   * Contains the values in the result objects which are coming from metadata and not coming from
+   * the data files themselves. The keys of the map are the column ids, the values are the constant
+   * values to be used in the result.
+   */
+  ReadBuilder<D, S> idToConstant(Map<Integer, ?> idToConstant);
+
+  /** Sets a mapping from external schema names to Iceberg type IDs. */
+  ReadBuilder<D, S> withNameMapping(NameMapping nameMapping);
+
+  /** Builds the reader. */
+  CloseableIterable<D> build();
+}
diff --git a/core/src/test/java/org/apache/iceberg/formats/TestFormatModelRegistry.java b/core/src/test/java/org/apache/iceberg/formats/TestFormatModelRegistry.java
new file mode 100644
index 000000000000..fe7a4d96f612
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/formats/TestFormatModelRegistry.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.iceberg.formats; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.util.Pair; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class TestFormatModelRegistry { + + @BeforeEach + void clearRegistry() { + FormatModelRegistry.models().clear(); + } + + @Test + void testSuccessfulRegister() { + FormatModel model = new DummyParquetFormatModel(Object.class, Object.class); + FormatModelRegistry.register(model); + assertThat(FormatModelRegistry.models()) + .containsEntry(Pair.of(FileFormat.PARQUET, Object.class), model); + } + + /** Tests that registering the same class with the same configuration updates the registration. */ + @Test + void testRegistrationForDifferentType() { + FormatModel model1 = new DummyParquetFormatModel(Object.class, Object.class); + FormatModel model2 = new DummyParquetFormatModel(Long.class, Object.class); + FormatModelRegistry.register(model1); + assertThat(FormatModelRegistry.models().get(Pair.of(FileFormat.PARQUET, model1.type()))) + .isSameAs(model1); + + // Registering a new model with the different format will succeed + FormatModelRegistry.register(model2); + assertThat(FormatModelRegistry.models().get(Pair.of(FileFormat.PARQUET, model1.type()))) + .isSameAs(model1); + assertThat(FormatModelRegistry.models().get(Pair.of(FileFormat.PARQUET, model2.type()))) + .isSameAs(model2); + } + + /** + * Tests that registering different classes, or different schema type for the same file format and + * type is failing. 
+ */ + @Test + void testFailingReRegistrations() { + FormatModel model = new DummyParquetFormatModel(Object.class, Object.class); + FormatModelRegistry.register(model); + assertThat(FormatModelRegistry.models()) + .containsEntry(Pair.of(FileFormat.PARQUET, Object.class), model); + + // Registering a new model with different schema type should fail + assertThatThrownBy( + () -> + FormatModelRegistry.register( + new DummyParquetFormatModel(Object.class, String.class))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot register class"); + + // Registering a new model with null schema type should fail + assertThatThrownBy( + () -> FormatModelRegistry.register(new DummyParquetFormatModel(Object.class, null))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot register class"); + } + + private static class DummyParquetFormatModel implements FormatModel { + private final Class type; + private final Class schemaType; + + private DummyParquetFormatModel(Class type, Class schemaType) { + this.type = type; + this.schemaType = schemaType; + } + + @Override + public FileFormat format() { + return FileFormat.PARQUET; + } + + @Override + @SuppressWarnings("unchecked") + public Class type() { + return (Class) type; + } + + @Override + @SuppressWarnings("unchecked") + public Class schemaType() { + return (Class) schemaType; + } + + @Override + public ModelWriteBuilder writeBuilder(EncryptedOutputFile outputFile) { + return null; + } + + @Override + public ReadBuilder readBuilder(InputFile inputFile) { + return null; + } + } +}