-
Notifications
You must be signed in to change notification settings - Fork 3.1k
Core, Data: File Format API interfaces #12774
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b33aec4
5b79dbd
a1daced
79d7703
8404aa7
a00540d
2a4816a
ba16b59
9976bfb
62ea041
22ca3d7
598f46d
5ce49dc
46a1742
026e5e9
3149874
4659adf
40ec8bb
f20bb4e
7e91a40
a957c47
8ce2f2e
2b1b10b
26e03b7
ef41daa
acb2254
3efd188
e5611f4
3a6a5ed
fad5e07
790e282
224070e
0c439c1
ddcf866
2fa0c5f
8f58a9e
b22a91a
8085a4b
af4f8a3
c17c9ca
7c175ea
7ed56a5
69461a2
fdbb6c0
4ae6ba0
fe743e1
b66e891
d450ec6
564ba8c
17f030f
74ca9f7
6cf720a
7f6cb33
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.formats; | ||
|
|
||
| import java.util.Map; | ||
| import org.apache.iceberg.Schema; | ||
|
|
||
| /** | ||
| * Base implementation of {@link FormatModel} that provides common functionality for format models. | ||
| * | ||
| * <p>This abstract class serves as a foundation for creating format-specific models that handle | ||
| * reading and writing data in various file formats. | ||
| * | ||
| * @param <D> output type used for reading data, and input type for writing data and deletes | ||
| * @param <S> the engine-specific schema type describing the input/output data | ||
| * @param <W> the file format specific writer type produced by the writer function | ||
| * @param <R> the file format specific reader type produced by the reader function | ||
| * @param <F> the file schema type used by the underlying file format | ||
| */ | ||
| public abstract class BaseFormatModel<D, S, W, R, F> implements FormatModel<D, S> { | ||
| private final Class<? extends D> type; | ||
| private final Class<S> schemaType; | ||
| private final WriterFunction<W, S, F> writerFunction; | ||
| private final ReaderFunction<R, S, F> readerFunction; | ||
|
|
||
| /** | ||
| * Constructs a new BaseFormatModel with the specified configuration. | ||
| * | ||
| * <p>All arguments are stored as given; this constructor performs no validation or copying. | ||
| * | ||
| * @param type the row type class for the object model implementation handled by this format | ||
| * model; returned by {@link #type()} | ||
| * @param schemaType the engine schema type class for the object model implementation handled by | ||
| * this format model; returned by {@link #schemaType()} | ||
| * @param writerFunction the function used to create writers for this format; returned by {@link | ||
| * #writerFunction()} | ||
| * @param readerFunction the function used to create readers for this format; returned by {@link | ||
| * #readerFunction()} | ||
| */ | ||
| protected BaseFormatModel( | ||
| Class<? extends D> type, | ||
| Class<S> schemaType, | ||
| WriterFunction<W, S, F> writerFunction, | ||
| ReaderFunction<R, S, F> readerFunction) { | ||
| this.type = type; | ||
| this.schemaType = schemaType; | ||
| this.writerFunction = writerFunction; | ||
| this.readerFunction = readerFunction; | ||
| } | ||
|
|
||
| @Override | ||
| public Class<? extends D> type() { | ||
| return type; | ||
| } | ||
|
|
||
| @Override | ||
| public Class<S> schemaType() { | ||
| return schemaType; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the writer function supplied at construction, so that subclasses can use it to create | ||
| * writers for this format. | ||
| * | ||
| * @return the writer function | ||
| */ | ||
| protected WriterFunction<W, S, F> writerFunction() { | ||
| return writerFunction; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the reader function supplied at construction, so that subclasses can use it to create | ||
| * readers for this format. | ||
| * | ||
| * @return the reader function | ||
| */ | ||
| protected ReaderFunction<R, S, F> readerFunction() { | ||
| return readerFunction; | ||
| } | ||
|
|
||
| /** | ||
| * A functional interface for creating writers that can write data in a specific format. | ||
| * | ||
| * @param <W> the file format specific writer type to be created | ||
| * @param <S> the engine-specific schema type describing the input data | ||
| * @param <F> the file schema type used by the underlying file format | ||
| */ | ||
| @FunctionalInterface | ||
| public interface WriterFunction<W, S, F> { | ||
| /** | ||
| * Creates a writer for the given schemas. | ||
| * | ||
| * <p>NOTE(review): "optional" below presumably means the argument may be {@code null}; confirm | ||
| * and state it explicitly. | ||
| * | ||
| * @param icebergSchema the Iceberg schema defining the table structure | ||
| * @param fileSchema the file format specific target schema for the output files | ||
| * @param engineSchema the engine-specific schema for the input data (optional) | ||
| * @return a writer configured for the given schemas | ||
| */ | ||
| W write(Schema icebergSchema, F fileSchema, S engineSchema); | ||
| } | ||
|
|
||
| /** | ||
| * A functional interface for creating readers that can read data from a specific format. | ||
| * | ||
| * @param <R> the file format specific reader type to be created | ||
| * @param <S> the engine-specific schema type describing the output data | ||
| * @param <F> the file schema type used by the underlying file format | ||
| */ | ||
| @FunctionalInterface | ||
| public interface ReaderFunction<R, S, F> { | ||
| /** | ||
| * Creates a reader for the given schemas. | ||
| * | ||
| * <p>NOTE(review): "optional" below presumably means the argument may be {@code null}; confirm | ||
| * and state it explicitly. | ||
| * | ||
| * @param icebergSchema the Iceberg schema defining the table structure | ||
| * @param fileSchema the schema that the file was written with, although it can be {@code null} | ||
| * in the case of Avro files because it is passed in later | ||
| * @param engineSchema the engine-specific schema for the output data (optional) | ||
| * @param idToConstant a map of field IDs to constant values for partition columns and other | ||
| * fields not stored in data files | ||
| * @return a reader configured for the given schemas | ||
| */ | ||
| R read(Schema icebergSchema, F fileSchema, S engineSchema, Map<Integer, ?> idToConstant); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.formats; | ||
|
|
||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import org.apache.iceberg.MetricsConfig; | ||
| import org.apache.iceberg.PartitionSpec; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.SortOrder; | ||
| import org.apache.iceberg.StructLike; | ||
| import org.apache.iceberg.deletes.EqualityDeleteWriter; | ||
| import org.apache.iceberg.deletes.PositionDeleteWriter; | ||
| import org.apache.iceberg.encryption.EncryptionKeyMetadata; | ||
| import org.apache.iceberg.io.DataWriter; | ||
| import org.apache.iceberg.io.FileWriter; | ||
| import org.apache.iceberg.util.ArrayUtil; | ||
|
|
||
| /** | ||
| * A generic builder interface for creating specialized file writers in the Iceberg ecosystem. | ||
| * | ||
| * <p>This builder provides a unified configuration API for generating various types of content | ||
| * writers: | ||
| * | ||
| * <ul> | ||
| * <li>{@link DataWriter} for creating data files with table records | ||
| * <li>{@link EqualityDeleteWriter} for creating files with equality-based delete records | ||
| * <li>{@link PositionDeleteWriter} for creating files with position-based delete records | ||
| * </ul> | ||
| * | ||
| * <p>Each concrete implementation configures the underlying file format writer while adding | ||
| * content-specific metadata and behaviors. | ||
| * | ||
| * @param <W> the concrete writer type the builder produces | ||
| * @param <S> the type of the engine schema for the input data | ||
| */ | ||
| public interface FileWriterBuilder<W extends FileWriter<?, ?>, S> { | ||
|
|
||
| /** | ||
| * Sets a writer configuration property which affects the writer behavior. | ||
| * | ||
| * @param property a writer config property name | ||
| * @param value config value | ||
| * @return this for method chaining | ||
| */ | ||
| FileWriterBuilder<W, S> set(String property, String value); | ||
|
|
||
| /** | ||
| * Adds the new properties to the writer configuration. | ||
| * | ||
| * <p>The default implementation calls {@link #set(String, String)} once for each entry of the | ||
| * given map. | ||
| * | ||
| * @param properties a map of writer config properties | ||
| * @return this for method chaining | ||
| */ | ||
| default FileWriterBuilder<W, S> setAll(Map<String, String> properties) { | ||
| properties.forEach(this::set); | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets a file metadata property in the created file. | ||
| * | ||
| * @param property a file metadata property name | ||
| * @param value config value | ||
| * @return this for method chaining | ||
| */ | ||
| FileWriterBuilder<W, S> meta(String property, String value); | ||
|
|
||
| /** | ||
| * Adds the new properties to file metadata for the created file. | ||
| * | ||
| * <p>The default implementation calls {@link #meta(String, String)} once for each entry of the | ||
| * given map. | ||
| * | ||
| * @param properties a map of file metadata properties | ||
| * @return this for method chaining | ||
| */ | ||
| default FileWriterBuilder<W, S> meta(Map<String, String> properties) { | ||
| properties.forEach(this::meta); | ||
| return this; | ||
| } | ||
|
|
||
| /** Sets the metrics configuration used for collecting column metrics for the created file. */ | ||
| FileWriterBuilder<W, S> metricsConfig(MetricsConfig metricsConfig); | ||
|
|
||
| /** Overwrites the file if it already exists. By default, overwrite is disabled. */ | ||
| FileWriterBuilder<W, S> overwrite(); | ||
|
|
||
| /** | ||
| * Sets the encryption key used for writing the file. If the writer does not support encryption, | ||
| * then an exception should be thrown. | ||
| */ | ||
| FileWriterBuilder<W, S> withFileEncryptionKey(ByteBuffer encryptionKey); | ||
|
|
||
| /** | ||
| * Sets the additional authentication data (AAD) prefix used for writing the file. If the writer | ||
| * does not support encryption, then an exception should be thrown. | ||
| */ | ||
| FileWriterBuilder<W, S> withAADPrefix(ByteBuffer aadPrefix); | ||
|
|
||
| /** Sets the partition specification for the Iceberg metadata. */ | ||
| FileWriterBuilder<W, S> spec(PartitionSpec newSpec); | ||
|
|
||
| /** Sets the partition value for the Iceberg metadata. */ | ||
| FileWriterBuilder<W, S> partition(StructLike partition); | ||
|
|
||
| /** Sets the encryption key metadata for Iceberg metadata. */ | ||
| FileWriterBuilder<W, S> keyMetadata(EncryptionKeyMetadata keyMetadata); | ||
|
|
||
| /** Sets the sort order for the Iceberg metadata. */ | ||
| FileWriterBuilder<W, S> sortOrder(SortOrder sortOrder); | ||
|
|
||
| /** | ||
| * Sets the file schema, expressed as an Iceberg {@link Schema}. | ||
| * | ||
| * <p>NOTE(review): per the review discussion on this change, the format-specific file schema is | ||
| * expected to be a direct translation of this Iceberg schema, matching it in structure, names, | ||
| * and equivalent types. Confirm that every format builder follows that rule. | ||
| */ | ||
| FileWriterBuilder<W, S> schema(Schema schema); | ||
|
|
||
| /** | ||
| * Sets the engine-specific schema that describes records accepted by the writer. | ||
| * | ||
| * <p>Some data types require additional type information from the engine schema that cannot be | ||
| * fully expressed by the Iceberg schema or the data itself. For example, an engine's tinyint or | ||
| * smallint types are mapped to Iceberg's integer type, but the writer may need the original type | ||
| * for proper serialization. Similarly, a variant type may use a shredded representation that | ||
| * relies on engine-specific metadata to map back to the Iceberg schema. | ||
| * | ||
| * <p>The engine schema must be aligned with the Iceberg schema, but may include representation | ||
| * details that Iceberg considers equivalent. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a good way to state the requirement.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it enough here, or shall we add it somewhere else too?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it's fine here. Engine schema is really a contract between the engine and its registered object model. |
||
| */ | ||
| FileWriterBuilder<W, S> engineSchema(S schema); | ||
|
|
||
| /** | ||
| * Sets the equality field ids for the equality delete writer. Only applicable when building an | ||
| * {@link EqualityDeleteWriter}. | ||
| * | ||
| * <p>The default implementation converts the list with {@link ArrayUtil#toIntArray} and | ||
| * delegates to {@link #equalityFieldIds(int...)}. | ||
| */ | ||
| default FileWriterBuilder<W, S> equalityFieldIds(List<Integer> fieldIds) { | ||
| return equalityFieldIds(ArrayUtil.toIntArray(fieldIds)); | ||
| } | ||
|
|
||
| /** | ||
| * Sets the equality field ids for the equality delete writer. Only applicable when building an | ||
| * {@link EqualityDeleteWriter}. | ||
| */ | ||
| FileWriterBuilder<W, S> equalityFieldIds(int... fieldIds); | ||
|
|
||
| W build() throws IOException; | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a blocker, but I want to note it somewhere: the file schema will be converted from the Iceberg Schema and will directly match in structure, names, and equivalent types. We should probably document this and make sure that all of the format builders follow that rule.
Similarly, we should also think about guarantees and/or requirements of the engine schema. I think this is dependent on the engine, though. If an engine builds its integration so that names must match or positions must match, that's up to the engine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think we should add it here too, or it is enough as we describe it at the writer and the reader?
`FileWriterBuilder.engineSchema`, `ReadBuilder.engineProjection`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should state how the file schema is derived because it is always a direct translation from the Iceberg schema and the structure and names match.
For the engine schema, I think mentioning that it is engine-specific is the right thing to do.