-
Notifications
You must be signed in to change notification settings - Fork 3.1k
Data: Moving GenericFileWriterFactory to the new FormatModel API #15334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
39f5a8e
91c331f
05c4db7
a8c8500
9c3d144
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -22,21 +22,37 @@ | |||||
| import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; | ||||||
| import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; | ||||||
|
|
||||||
| import java.io.IOException; | ||||||
| import java.io.UncheckedIOException; | ||||||
| import java.util.Map; | ||||||
| import org.apache.iceberg.FileFormat; | ||||||
| import org.apache.iceberg.MetricsConfig; | ||||||
| import org.apache.iceberg.PartitionSpec; | ||||||
| import org.apache.iceberg.Schema; | ||||||
| import org.apache.iceberg.SortOrder; | ||||||
| import org.apache.iceberg.StructLike; | ||||||
| import org.apache.iceberg.Table; | ||||||
| import org.apache.iceberg.avro.Avro; | ||||||
| import org.apache.iceberg.data.avro.DataWriter; | ||||||
| import org.apache.iceberg.data.orc.GenericOrcWriter; | ||||||
| import org.apache.iceberg.data.parquet.GenericParquetWriter; | ||||||
| import org.apache.iceberg.deletes.PositionDeleteWriter; | ||||||
| import org.apache.iceberg.encryption.EncryptedOutputFile; | ||||||
| import org.apache.iceberg.formats.FormatModelRegistry; | ||||||
| import org.apache.iceberg.orc.ORC; | ||||||
| import org.apache.iceberg.parquet.Parquet; | ||||||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||||||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; | ||||||
| import org.slf4j.Logger; | ||||||
| import org.slf4j.LoggerFactory; | ||||||
|
|
||||||
| public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> { | ||||||
| public class GenericFileWriterFactory extends RegistryBasedFileWriterFactory<Record, Schema> { | ||||||
| private static final Logger LOG = LoggerFactory.getLogger(GenericFileWriterFactory.class); | ||||||
|
|
||||||
| private Table table; | ||||||
| private FileFormat format; | ||||||
| private Schema positionDeleteRowSchema; | ||||||
| private Map<String, String> writerProperties; | ||||||
|
|
||||||
| GenericFileWriterFactory( | ||||||
| Table table, | ||||||
|
|
@@ -50,13 +66,16 @@ public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> { | |||||
| super( | ||||||
| table, | ||||||
| dataFileFormat, | ||||||
| Record.class, | ||||||
| dataSchema, | ||||||
| dataSortOrder, | ||||||
| deleteFileFormat, | ||||||
| equalityFieldIds, | ||||||
| equalityDeleteRowSchema, | ||||||
| equalityDeleteSortOrder, | ||||||
| ImmutableMap.of()); | ||||||
| ImmutableMap.of(), | ||||||
| null, | ||||||
| null); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
|
|
@@ -80,14 +99,20 @@ public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> { | |||||
| super( | ||||||
| table, | ||||||
| dataFileFormat, | ||||||
| Record.class, | ||||||
| dataSchema, | ||||||
| dataSortOrder, | ||||||
| deleteFileFormat, | ||||||
| equalityFieldIds, | ||||||
| equalityDeleteRowSchema, | ||||||
| equalityDeleteSortOrder, | ||||||
| positionDeleteRowSchema, | ||||||
| writerProperties); | ||||||
| writerProperties, | ||||||
| null, | ||||||
| null); | ||||||
| this.table = table; | ||||||
| this.format = dataFileFormat; | ||||||
| this.positionDeleteRowSchema = positionDeleteRowSchema; | ||||||
| this.writerProperties = writerProperties != null ? writerProperties : ImmutableMap.of(); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
|
|
@@ -107,64 +132,168 @@ public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> { | |||||
| super( | ||||||
| table, | ||||||
| dataFileFormat, | ||||||
| Record.class, | ||||||
| dataSchema, | ||||||
| dataSortOrder, | ||||||
| deleteFileFormat, | ||||||
| equalityFieldIds, | ||||||
| equalityDeleteRowSchema, | ||||||
| equalityDeleteSortOrder, | ||||||
| positionDeleteRowSchema); | ||||||
| ImmutableMap.of(), | ||||||
| dataSchema, | ||||||
| equalityDeleteRowSchema); | ||||||
| this.table = table; | ||||||
| this.format = dataFileFormat; | ||||||
| this.positionDeleteRowSchema = positionDeleteRowSchema; | ||||||
| } | ||||||
|
|
||||||
| static Builder builderFor(Table table) { | ||||||
| return new Builder(table); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| /** | ||||||
| * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as | ||||||
| * the configuration is done by the {@link FormatModelRegistry}. | ||||||
| */ | ||||||
| @Deprecated | ||||||
| protected void configureDataWrite(Avro.DataWriteBuilder builder) { | ||||||
| builder.createWriterFunc(DataWriter::create); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| /** | ||||||
| * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as | ||||||
| * the configuration is done by the {@link FormatModelRegistry}. | ||||||
| */ | ||||||
| @Deprecated | ||||||
| protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { | ||||||
| builder.createWriterFunc(DataWriter::create); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| /** | ||||||
| * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as | ||||||
| * the configuration is done by the {@link FormatModelRegistry}. | ||||||
| */ | ||||||
| @Deprecated | ||||||
| protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { | ||||||
| builder.createWriterFunc(DataWriter::create); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| /** | ||||||
| * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as | ||||||
| * the configuration is done by the {@link FormatModelRegistry}. | ||||||
| */ | ||||||
| @Deprecated | ||||||
| protected void configureDataWrite(Parquet.DataWriteBuilder builder) { | ||||||
| builder.createWriterFunc(GenericParquetWriter::create); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| /** | ||||||
| * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as | ||||||
| * the configuration is done by the {@link FormatModelRegistry}. | ||||||
| */ | ||||||
| @Deprecated | ||||||
| protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { | ||||||
| builder.createWriterFunc(GenericParquetWriter::create); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| /** | ||||||
| * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as | ||||||
| * the configuration is done by the {@link FormatModelRegistry}. | ||||||
| */ | ||||||
| @Deprecated | ||||||
| protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { | ||||||
| builder.createWriterFunc(GenericParquetWriter::create); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| /** | ||||||
| * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as | ||||||
| * the configuration is done by the {@link FormatModelRegistry}. | ||||||
| */ | ||||||
| @Deprecated | ||||||
| protected void configureDataWrite(ORC.DataWriteBuilder builder) { | ||||||
| builder.createWriterFunc(GenericOrcWriter::buildWriter); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| /** | ||||||
| * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as | ||||||
| * the configuration is done by the {@link FormatModelRegistry}. | ||||||
| */ | ||||||
| @Deprecated | ||||||
| protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) { | ||||||
| builder.createWriterFunc(GenericOrcWriter::buildWriter); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| /** | ||||||
| * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as | ||||||
| * the configuration is done by the {@link FormatModelRegistry}. | ||||||
| */ | ||||||
| @Deprecated | ||||||
| protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) { | ||||||
| builder.createWriterFunc(GenericOrcWriter::buildWriter); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public PositionDeleteWriter<Record> newPositionDeleteWriter( | ||||||
| EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { | ||||||
| if (positionDeleteRowSchema == null) { | ||||||
| return super.newPositionDeleteWriter(file, spec, partition); | ||||||
| } else { | ||||||
| LOG.warn( | ||||||
| "Deprecated feature used. Position delete row schema is used to create the position delete writer."); | ||||||
| Map<String, String> properties = table == null ? ImmutableMap.of() : table.properties(); | ||||||
| MetricsConfig metricsConfig = | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. should this part of the code actually mimic
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I originally misunderstood this comment. |
||||||
| table == null | ||||||
| ? MetricsConfig.forPositionDelete() | ||||||
| : MetricsConfig.forPositionDelete(table); | ||||||
|
|
||||||
| try { | ||||||
| return switch (format) { | ||||||
| case AVRO -> | ||||||
| Avro.writeDeletes(file) | ||||||
| .setAll(properties) | ||||||
| .setAll(writerProperties) | ||||||
| .metricsConfig(metricsConfig) | ||||||
| .createWriterFunc(DataWriter::create) | ||||||
| .withPartition(partition) | ||||||
| .overwrite() | ||||||
| .rowSchema(positionDeleteRowSchema) | ||||||
| .withSpec(spec) | ||||||
| .withKeyMetadata(file.keyMetadata()) | ||||||
| .buildPositionWriter(); | ||||||
| case ORC -> | ||||||
| ORC.writeDeletes(file) | ||||||
| .setAll(properties) | ||||||
| .setAll(writerProperties) | ||||||
| .metricsConfig(metricsConfig) | ||||||
| .createWriterFunc(GenericOrcWriter::buildWriter) | ||||||
| .withPartition(partition) | ||||||
| .overwrite() | ||||||
| .rowSchema(positionDeleteRowSchema) | ||||||
| .withSpec(spec) | ||||||
| .withKeyMetadata(file.keyMetadata()) | ||||||
| .buildPositionWriter(); | ||||||
| case PARQUET -> | ||||||
| Parquet.writeDeletes(file) | ||||||
| .setAll(properties) | ||||||
| .setAll(writerProperties) | ||||||
| .metricsConfig(metricsConfig) | ||||||
| .createWriterFunc(GenericParquetWriter::create) | ||||||
| .withPartition(partition) | ||||||
| .overwrite() | ||||||
| .rowSchema(positionDeleteRowSchema) | ||||||
| .withSpec(spec) | ||||||
| .withKeyMetadata(file.keyMetadata()) | ||||||
| .buildPositionWriter(); | ||||||
| default -> | ||||||
| throw new UnsupportedOperationException( | ||||||
| "Cannot write pos-deletes for unsupported file format: " + format); | ||||||
| }; | ||||||
| } catch (IOException e) { | ||||||
| throw new UncheckedIOException("Failed to create new position delete writer", e); | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| public static class Builder { | ||||||
| private final Table table; | ||||||
| private FileFormat dataFileFormat; | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still true? I noticed it's missing from the new deprecation message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the deprecation comment here because this method was introduced in the current release cycle, before we decided to deprecate the entire class. Now that the class itself is deprecated, and the replacement class doesn’t provide a way to set the position‑delete row‑data schema, the constructor is effectively deprecated as well. However, since there’s no new equivalent method, we don’t really have a suitable place to add a dedicated deprecation message.