Merged
53 commits
b33aec4
Core, Data: File Format API interfaces
pvary Apr 11, 2025
5b79dbd
Renjie's comments
pvary Apr 29, 2025
a1daced
registerObjectModel exception handling fix
pvary Apr 30, 2025
79d7703
Removing the need for data.AppenderBuilder
pvary May 6, 2025
8404aa7
Fixed Renjie findings
pvary May 7, 2025
a00540d
Cosmetic changes
pvary May 15, 2025
2a4816a
Steven's comments
pvary May 19, 2025
ba16b59
ObjectModelRegistry->FileAccessFactory, and AppenederBuilder->WriteBu…
pvary May 20, 2025
9976bfb
Review comments by Steven and Russell (some javadoc, and a few method…
pvary May 21, 2025
62ea041
Added generic for the input/output of the reader/writer - like 'FileA…
pvary May 22, 2025
22ca3d7
New classes for implementation allows for absolutely new interfaces
pvary May 26, 2025
598f46d
Default methods for setting multiple config/meta values
pvary May 28, 2025
5ce49dc
Return FileReader instead of CloseableIterable from the ReadBuilder
pvary Jun 16, 2025
46a1742
Revert "Return FileReader instead of CloseableIterable from the ReadB…
pvary Jun 24, 2025
026e5e9
Ryan's comments
pvary Jun 24, 2025
3149874
Move interface classes to core
pvary Jun 25, 2025
4659adf
Rename FileAccessFactory to ObjectModelFactory
pvary Jun 25, 2025
40ec8bb
Ryan's next round of comments
pvary Jun 27, 2025
f20bb4e
Separate input conversion from writers
pvary Jul 22, 2025
7e91a40
Eduard's comments
pvary Jul 24, 2025
a957c47
Fixing parameter names
pvary Jul 24, 2025
8ce2f2e
Ryan's comments
pvary Aug 6, 2025
2b1b10b
Remove builder parameter from data file writers
pvary Aug 6, 2025
26e03b7
Remove parametrization as much as possible
pvary Aug 13, 2025
ef41daa
ContentFileWriteBuilder needs a generic parameter
pvary Aug 14, 2025
acb2254
Revert transformers, and used engine specific type setting for writer…
pvary Aug 25, 2025
3efd188
Steven's and Russel's comments
pvary Sep 11, 2025
e5611f4
Move to writeBuilder/positionDeleteWriteBuilder solution, and depreca…
pvary Sep 15, 2025
3a6a5ed
Use a specific FormatModel to write PositionDeletes
pvary Sep 24, 2025
fad5e07
Changes discussed on the sync
pvary Oct 2, 2025
790e282
Steven's comments
pvary Oct 8, 2025
224070e
Remove the FormatModelRegistry.writeBuilder method
pvary Oct 13, 2025
0c439c1
Eduard's comment
pvary Oct 17, 2025
ddcf866
Addressing Amogh's, Steven's and Gabor's comments
pvary Oct 20, 2025
2fa0c5f
Removing ReadBuilder.outputSchema per Steven's and Renjie's comments
pvary Oct 21, 2025
8f58a9e
Aihua's comments
pvary Oct 26, 2025
b22a91a
Move back to parametrized writers
pvary Nov 5, 2025
8085a4b
Encryption handling
pvary Nov 5, 2025
af4f8a3
Revert back to OutputFile in the FormatModel
pvary Nov 8, 2025
c17c9ca
Synchronized register method, and only provide writeBuilder, location…
pvary Nov 9, 2025
7c175ea
constantValues -> idToConstant
pvary Nov 9, 2025
7ed56a5
Remove new from the API parameter names
pvary Nov 9, 2025
69461a2
Move to EncryptedOutputFile from OutputFile FormatModel.writeBuilder
pvary Nov 10, 2025
fdbb6c0
Ryan's comments
pvary Nov 11, 2025
4ae6ba0
Fix typo in comment
pvary Nov 20, 2025
fe743e1
Revert back to prevent re-registering models
pvary Nov 20, 2025
b66e891
Ryan's comments
pvary Nov 21, 2025
d450ec6
Steven's comment to change the log message for failed registration
pvary Nov 21, 2025
564ba8c
Create a marker class for the Comet reader and a few extra nits
pvary Dec 9, 2025
17f030f
Added back the ReadBuilder.outputSchema method as the attribute is us…
pvary Jan 19, 2026
74ca9f7
Changes from #12298 based on Ryan's comments
pvary Feb 5, 2026
6cf720a
Last fixes
pvary Feb 6, 2026
7f6cb33
Javadoc comment fixes
pvary Feb 6, 2026
@@ -50,7 +50,7 @@ public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, De
private static final Set<Integer> FILE_AND_POS_FIELD_IDS =
ImmutableSet.of(DELETE_FILE_PATH.fieldId(), DELETE_FILE_POS.fieldId());

- private final FileAppender<StructLike> appender;
+ private final FileAppender<PositionDelete<T>> appender;
private final FileFormat format;
private final String location;
private final PartitionSpec spec;
@@ -59,14 +59,22 @@ public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, De
private final CharSequenceSet referencedDataFiles;
private DeleteFile deleteFile = null;

+ /**
+ * Creates a new position delete writer.
+ *
+ * @deprecated since 1.11.0, will be updated in 1.12.0 to accept {@code
+ * FileAppender<PositionDelete<T>>} instead of {@code FileAppender<? extends StructLike>}.
+ */
+ @Deprecated
+ @SuppressWarnings("unchecked")
public PositionDeleteWriter(
- FileAppender<StructLike> appender,
+ FileAppender<? extends StructLike> appender,
FileFormat format,
String location,
PartitionSpec spec,
StructLike partition,
EncryptionKeyMetadata keyMetadata) {
- this.appender = appender;
+ this.appender = (FileAppender<PositionDelete<T>>) appender;
this.format = format;
this.location = location;
this.spec = spec;
Expand Down
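The constructor in this hunk accepts `FileAppender<? extends StructLike>` and narrows it to `FileAppender<PositionDelete<T>>` with an unchecked cast. The sketch below illustrates what such a cast does and does not verify. All types in it (`Row`, `PosDelete`, `OtherRow`, `Appender`, `Writer`) are hypothetical stand-ins, not Iceberg classes, and it assumes nothing about Iceberg's appenders beyond the generic shape shown in the diff.

```java
final class UncheckedAppenderCastSketch {
  /** Hypothetical stand-in for StructLike. */
  interface Row {}

  /** Hypothetical stand-in for PositionDelete<T>. */
  static final class PosDelete<T> implements Row {}

  /** Hypothetical row type that is unrelated to PosDelete. */
  static final class OtherRow implements Row {}

  /** Hypothetical stand-in for FileAppender<D>. */
  interface Appender<D> {
    void add(D datum);
  }

  /** Accepts any Row, so handing it PosDelete instances is safe. */
  static final class AnyRowAppender implements Appender<Row> {
    int count = 0;

    @Override
    public void add(Row datum) {
      count++;
    }
  }

  /** Accepts only OtherRow; the compiler-generated bridge method casts its argument. */
  static final class OtherRowAppender implements Appender<OtherRow> {
    @Override
    public void add(OtherRow datum) {}
  }

  /** Same shape as the constructor in the diff: wildcard in, unchecked cast to the field type. */
  static final class Writer<T> {
    private final Appender<PosDelete<T>> appender;

    @SuppressWarnings("unchecked")
    Writer(Appender<? extends Row> appender) {
      // Type erasure means nothing is verified here; a mismatch surfaces only when add() runs.
      this.appender = (Appender<PosDelete<T>>) appender;
    }

    void write(PosDelete<T> delete) {
      appender.add(delete);
    }
  }

  /** Returns true if writing through a mismatched appender fails with ClassCastException. */
  static boolean mismatchFailsOnWrite() {
    Writer<String> writer = new Writer<>(new OtherRowAppender()); // construction succeeds
    try {
      writer.write(new PosDelete<>());
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    AnyRowAppender safe = new AnyRowAppender();
    new Writer<String>(safe).write(new PosDelete<>());
    System.out.println("rows accepted by AnyRowAppender: " + safe.count);
    System.out.println("mismatch fails on write: " + mismatchFailsOnWrite());
  }
}
```

In this sketch the widened signature keeps callers that pass a supertype appender compiling, while a mismatched appender is detected only on the first write. That trade-off is consistent with the deprecation note announcing a stricter parameter type later.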
132 changes: 132 additions & 0 deletions core/src/main/java/org/apache/iceberg/formats/BaseFormatModel.java
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.formats;

import java.util.Map;
import org.apache.iceberg.Schema;

/**
* Base implementation of {@link FormatModel} that provides common functionality for format models.
*
* <p>This abstract class serves as a foundation for creating format-specific models that handle
* reading and writing data in various file formats.
*
* @param <D> output type used for reading data, and input type for writing data and deletes
* @param <S> the engine-specific schema type describing the input/output data
* @param <W> the file format specific writer type produced by the writer function
* @param <R> the file format specific reader type produced by the reader function
* @param <F> the file schema type used by the underlying file format
*/
public abstract class BaseFormatModel<D, S, W, R, F> implements FormatModel<D, S> {
private final Class<? extends D> type;
private final Class<S> schemaType;
private final WriterFunction<W, S, F> writerFunction;
private final ReaderFunction<R, S, F> readerFunction;

/**
* Constructs a new BaseFormatModel with the specified configuration.
*
* @param type the row type class for the object model implementation processed by this factory.
* @param schemaType the schema type class for the object model implementation processed by this
* factory.
* @param writerFunction the function used to create writers for this format
* @param readerFunction the function used to create readers for this format
*/
protected BaseFormatModel(
Class<? extends D> type,
Class<S> schemaType,
WriterFunction<W, S, F> writerFunction,
ReaderFunction<R, S, F> readerFunction) {
this.type = type;
this.schemaType = schemaType;
this.writerFunction = writerFunction;
this.readerFunction = readerFunction;
}

@Override
public Class<? extends D> type() {
return type;
}

@Override
public Class<S> schemaType() {
return schemaType;
}

/**
* Returns the writer function used to create writers for this format.
*
* @return the writer function
*/
protected WriterFunction<W, S, F> writerFunction() {
return writerFunction;
}

/**
* Returns the reader function used to create readers for this format.
*
* @return the reader function
*/
protected ReaderFunction<R, S, F> readerFunction() {
return readerFunction;
}

/**
* A functional interface for creating writers that can write data in a specific format.
*
* @param <W> the file format specific writer type to be created
* @param <S> the engine-specific schema type describing the input data
* @param <F> the file schema type used by the underlying file format
*/
@FunctionalInterface
public interface WriterFunction<W, S, F> {
/**
* Creates a writer for the given schemas.
*
* @param icebergSchema the Iceberg schema defining the table structure
* @param fileSchema the file format specific target schema for the output files
Contributor

Not a blocker, but I want to note it somewhere: the file schema will be converted from the Iceberg Schema and will directly match in structure, names, and equivalent types. We should probably document this and make sure that all of the format builders follow that rule.

Similarly, we should also think about guarantees and/or requirements of the engine schema. I think this is dependent on the engine, though. If an engine builds its integration so that names must match or positions must match, that's up to the engine.

Contributor Author

Do you think we should add it here too, or is it enough that we describe it at the writer and the reader?

  • FileWriterBuilder.engineSchema
   * <p>The engine schema must be aligned with the Iceberg schema, but may include representation
   * details that Iceberg considers equivalent.
  • ReadBuilder.engineProjection
   * <p>When provided, this schema should be consistent with the requested Iceberg projection, while
   * allowing representation differences. Examples include:

Contributor

I think this should state how the file schema is derived because it is always a direct translation from the Iceberg schema and the structure and names match.

For the engine schema, I think mentioning that it is engine-specific is the right thing to do.

* @param engineSchema the engine-specific schema for the input data (optional)
* @return a writer configured for the given schemas
*/
W write(Schema icebergSchema, F fileSchema, S engineSchema);
}

/**
* A functional interface for creating readers that can read data from a specific format.
*
* @param <R> the file format specific reader type to be created
* @param <S> the engine-specific schema type describing the output data
* @param <F> the file schema type used by the underlying file format
*/
@FunctionalInterface
public interface ReaderFunction<R, S, F> {
/**
* Creates a reader for the given schemas.
*
* @param icebergSchema the Iceberg schema defining the table structure
* @param fileSchema the schema that the file was written with, although it can be <code>null
* </code> in the case of Avro files because it is passed in later
* @param engineSchema the engine-specific schema for the output data (optional)
* @param idToConstant a map of field IDs to constant values for partition columns and other
* fields not stored in data files
* @return a reader configured for the given schemas
*/
R read(Schema icebergSchema, F fileSchema, S engineSchema, Map<Integer, ?> idToConstant);
}
}
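As a rough illustration of how a concrete model might plug writer and reader functions into `BaseFormatModel`, here is a self-contained sketch. Everything in it is hypothetical: `BaseModel` is a trimmed-down analogue of the class above, the Iceberg schema is represented as a plain `List<String>` of column names, and `EngineSchema`, `ToyModel`, `toFileSchema`, `newWriter`, and `newReader` are invented for the example. Following the review discussion, it assumes the file schema is a direct, name-preserving translation of the Iceberg schema.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class FormatModelSketch {
  /** Hypothetical engine schema type; a real engine would supply its own class. */
  static final class EngineSchema {}

  @FunctionalInterface
  interface WriterFunction<W, S, F> {
    W write(List<String> icebergSchema, F fileSchema, S engineSchema);
  }

  @FunctionalInterface
  interface ReaderFunction<R, S, F> {
    R read(List<String> icebergSchema, F fileSchema, S engineSchema, Map<Integer, ?> idToConstant);
  }

  /** Trimmed-down analogue of BaseFormatModel: it only stores the types and the two functions. */
  abstract static class BaseModel<D, S, W, R, F> {
    private final Class<? extends D> type;
    private final Class<S> schemaType;
    private final WriterFunction<W, S, F> writerFunction;
    private final ReaderFunction<R, S, F> readerFunction;

    protected BaseModel(
        Class<? extends D> type,
        Class<S> schemaType,
        WriterFunction<W, S, F> writerFunction,
        ReaderFunction<R, S, F> readerFunction) {
      this.type = type;
      this.schemaType = schemaType;
      this.writerFunction = writerFunction;
      this.readerFunction = readerFunction;
    }

    Class<? extends D> type() {
      return type;
    }

    Class<S> schemaType() {
      return schemaType;
    }

    protected WriterFunction<W, S, F> writerFunction() {
      return writerFunction;
    }

    protected ReaderFunction<R, S, F> readerFunction() {
      return readerFunction;
    }
  }

  /** Toy format: rows are comma-separated strings and the file schema is a comma-joined header. */
  static final class ToyModel
      extends BaseModel<String, EngineSchema, Function<String, String>, Function<String, String>, String> {
    ToyModel() {
      super(
          String.class,
          EngineSchema.class,
          // Writer: prefix every row with the file schema.
          (icebergSchema, fileSchema, engineSchema) -> row -> fileSchema + "|" + row,
          // Reader: strip the prefix, then append constants that are not stored in the file.
          (icebergSchema, fileSchema, engineSchema, idToConstant) ->
              line -> {
                StringBuilder row = new StringBuilder(line.substring(line.indexOf('|') + 1));
                idToConstant.values().forEach(value -> row.append(',').append(value));
                return row.toString();
              });
    }

    /** Assumption: the file schema mirrors the Iceberg schema's names and order. */
    static String toFileSchema(List<String> icebergSchema) {
      return String.join(",", icebergSchema);
    }

    Function<String, String> newWriter(List<String> icebergSchema, EngineSchema engineSchema) {
      return writerFunction().write(icebergSchema, toFileSchema(icebergSchema), engineSchema);
    }

    Function<String, String> newReader(List<String> icebergSchema, Map<Integer, ?> idToConstant) {
      // The engine schema is optional, so this sketch passes null on the read path.
      return readerFunction().read(icebergSchema, toFileSchema(icebergSchema), null, idToConstant);
    }
  }

  /** Writes one row, reads it back, and appends one constant column. */
  static String roundTrip() {
    ToyModel model = new ToyModel();
    List<String> schema = Arrays.asList("id", "name");
    String written = model.newWriter(schema, new EngineSchema()).apply("1,a");
    return model.newReader(schema, Collections.singletonMap(3, "p0")).apply(written);
  }

  public static void main(String[] args) {
    System.out.println(roundTrip()); // prints "1,a,p0"
  }
}
```

The base class holds no format logic of its own. The subclass decides how the file schema is obtained and when each function is invoked, which is why the two accessors are `protected`.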
159 changes: 159 additions & 0 deletions core/src/main/java/org/apache/iceberg/formats/FileWriterBuilder.java
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.formats;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.util.ArrayUtil;

/**
* A generic builder interface for creating specialized file writers in the Iceberg ecosystem.
*
* <p>This builder provides a unified configuration API for generating various types of content
* writers:
*
* <ul>
* <li>{@link DataWriter} for creating data files with table records
* <li>{@link EqualityDeleteWriter} for creating files with equality-based delete records
* <li>{@link PositionDeleteWriter} for creating files with position-based delete records
* </ul>
*
* <p>Each concrete implementation configures the underlying file format writer while adding
* content-specific metadata and behaviors.
*
* @param <W> the concrete writer type the builder produces
* @param <S> the type of the engine schema for the input data
*/
public interface FileWriterBuilder<W extends FileWriter<?, ?>, S> {

/**
* Set a writer configuration property which affects the writer behavior.
*
* @param property a writer config property name
* @param value config value
* @return this for method chaining
*/
FileWriterBuilder<W, S> set(String property, String value);

/**
* Adds the new properties to the writer configuration.
*
* @param properties a map of writer config properties
* @return this for method chaining
*/
default FileWriterBuilder<W, S> setAll(Map<String, String> properties) {
properties.forEach(this::set);
return this;
}

/**
* Set a file metadata property in the created file.
*
* @param property a file metadata property name
* @param value config value
* @return this for method chaining
*/
FileWriterBuilder<W, S> meta(String property, String value);

/**
* Add the new properties to file metadata for the created file.
*
* @param properties a map of file metadata properties
* @return this for method chaining
*/
default FileWriterBuilder<W, S> meta(Map<String, String> properties) {
properties.forEach(this::meta);
return this;
}

/** Sets the metrics configuration used for collecting column metrics for the created file. */
FileWriterBuilder<W, S> metricsConfig(MetricsConfig metricsConfig);

/** Overwrite the file if it already exists. By default, overwrite is disabled. */
FileWriterBuilder<W, S> overwrite();

/**
* Sets the encryption key used for writing the file. If the writer does not support encryption,
* then an exception should be thrown.
*/
FileWriterBuilder<W, S> withFileEncryptionKey(ByteBuffer encryptionKey);

/**
* Sets the additional authentication data (AAD) prefix used for writing the file. If the writer
* does not support encryption, then an exception should be thrown.
*/
FileWriterBuilder<W, S> withAADPrefix(ByteBuffer aadPrefix);

/** Sets the partition specification for the Iceberg metadata. */
FileWriterBuilder<W, S> spec(PartitionSpec newSpec);

/** Sets the partition value for the Iceberg metadata. */
FileWriterBuilder<W, S> partition(StructLike partition);

/** Sets the encryption key metadata for Iceberg metadata. */
FileWriterBuilder<W, S> keyMetadata(EncryptionKeyMetadata keyMetadata);

/** Sets the sort order for the Iceberg metadata. */
FileWriterBuilder<W, S> sortOrder(SortOrder sortOrder);

/** Set the file schema. */
FileWriterBuilder<W, S> schema(Schema schema);

/**
* Sets the engine-specific schema that describes records accepted by the writer.
*
* <p>Some data types require additional type information from the engine schema that cannot be
* fully expressed by the Iceberg schema or the data itself. For example, an engine's tinyint or
* smallint types are mapped to Iceberg's integer type, but the writer may need the original type
* for proper serialization. Similarly, a variant type may use a shredded representation that
* relies on engine-specific metadata to map back to the Iceberg schema.
*
* <p>The engine schema must be aligned with the Iceberg schema, but may include representation
* details that Iceberg considers equivalent.
Contributor

This is a good way to state the requirement.

Contributor Author

Is it enough here, or shall we add it somewhere else too?

Contributor

I think it's fine here. Engine schema is really a contract between the engine and its registered object model.

*/
FileWriterBuilder<W, S> engineSchema(S schema);

/**
* Sets the equality field ids for the equality delete writer. Only applicable when building an
* {@link EqualityDeleteWriter}.
*/
default FileWriterBuilder<W, S> equalityFieldIds(List<Integer> fieldIds) {
return equalityFieldIds(ArrayUtil.toIntArray(fieldIds));
}

/**
* Sets the equality field ids for the equality delete writer. Only applicable when building an
* {@link EqualityDeleteWriter}.
*/
FileWriterBuilder<W, S> equalityFieldIds(int... fieldIds);

W build() throws IOException;
}
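The default methods in `FileWriterBuilder` (`setAll` and the map-based `meta`) delegate to their single-property counterparts, so an implementation only needs to supply the primitive setters. The sketch below shows that pattern on a reduced interface. `Builder` is a cut-down analogue with only a few of the methods above, while `DescribingBuilder`, the property names, and the string it "builds" are hypothetical and exist only to make the chaining observable.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WriterBuilderSketch {
  /** Reduced analogue of FileWriterBuilder with only a few of its methods. */
  interface Builder<W, S> {
    Builder<W, S> set(String property, String value);

    /** Delegates each entry to set(String, String). */
    default Builder<W, S> setAll(Map<String, String> properties) {
      properties.forEach(this::set);
      return this;
    }

    Builder<W, S> meta(String property, String value);

    /** Delegates each entry to meta(String, String). */
    default Builder<W, S> meta(Map<String, String> properties) {
      properties.forEach(this::meta);
      return this;
    }

    Builder<W, S> engineSchema(S schema);

    W build();
  }

  /** Hypothetical implementation that "builds" a String describing what was configured. */
  static final class DescribingBuilder implements Builder<String, String> {
    private final Map<String, String> config = new LinkedHashMap<>();
    private final Map<String, String> metadata = new LinkedHashMap<>();
    private String engineSchema = null;

    @Override
    public Builder<String, String> set(String property, String value) {
      config.put(property, value);
      return this;
    }

    @Override
    public Builder<String, String> meta(String property, String value) {
      metadata.put(property, value);
      return this;
    }

    @Override
    public Builder<String, String> engineSchema(String schema) {
      this.engineSchema = schema;
      return this;
    }

    @Override
    public String build() {
      return "config=" + config + " meta=" + metadata + " engineSchema=" + engineSchema;
    }
  }

  /** Chains single-property and bulk setters; the property names are made up for the example. */
  static String demo() {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("compression", "zstd");
    props.put("page-size", "1024");
    return new DescribingBuilder()
        .set("overwrite-hint", "false")
        .setAll(props)
        .meta("created-by", "sketch")
        .engineSchema("id:tinyint")
        .build();
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because the bulk methods are defaults, a format-specific builder that validates or translates properties in `set` and `meta` gets the same behavior for bulk calls without extra code.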