diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java index 55f3b5701e0b..444c0d0226bd 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java @@ -40,7 +40,13 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -/** A base writer factory to be extended by query engine integrations. */ +/** + * A base writer factory to be extended by query engine integrations. + * + * @deprecated since version 1.11.0 and will be removed in 1.12.0. Use {@link + * RegistryBasedFileWriterFactory} + */ +@Deprecated public abstract class BaseFileWriterFactory implements FileWriterFactory, Serializable { private final Table table; private final FileFormat dataFileFormat; @@ -75,13 +81,6 @@ protected BaseFileWriterFactory( this.positionDeleteRowSchema = null; } - /** - * @deprecated This constructor is deprecated as of version 1.11.0 and will be removed in 1.12.0. - * Position deletes that include row data are no longer supported. Use {@link - * #BaseFileWriterFactory(Table, FileFormat, Schema, SortOrder, FileFormat, int[], Schema, - * SortOrder, Map)} instead. - */ - @Deprecated protected BaseFileWriterFactory( Table table, FileFormat dataFileFormat, diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java index e6872cc6e136..1e75b9eda961 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java @@ -22,21 +22,37 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.avro.DataWriter; import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class GenericFileWriterFactory extends BaseFileWriterFactory { +public class GenericFileWriterFactory extends RegistryBasedFileWriterFactory { + private static final Logger LOG = LoggerFactory.getLogger(GenericFileWriterFactory.class); + + private Table table; + private FileFormat format; + private Schema positionDeleteRowSchema; + private Map writerProperties; GenericFileWriterFactory( Table table, @@ -50,13 +66,16 @@ public class GenericFileWriterFactory extends BaseFileWriterFactory { super( table, dataFileFormat, + Record.class, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, - ImmutableMap.of()); + ImmutableMap.of(), + null, + null); } /** @@ -80,14 +99,20 @@ public class GenericFileWriterFactory extends BaseFileWriterFactory { super( table, dataFileFormat, + Record.class, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, - positionDeleteRowSchema, - writerProperties); + writerProperties, + null, + null); + this.table = table; + this.format = dataFileFormat; + this.positionDeleteRowSchema = positionDeleteRowSchema; + this.writerProperties = writerProperties != null ? writerProperties : ImmutableMap.of(); } /** @@ -107,64 +132,168 @@ public class GenericFileWriterFactory extends BaseFileWriterFactory { super( table, dataFileFormat, + Record.class, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, - positionDeleteRowSchema); + ImmutableMap.of(), + dataSchema, + equalityDeleteRowSchema); + this.table = table; + this.format = dataFileFormat; + this.positionDeleteRowSchema = positionDeleteRowSchema; } static Builder builderFor(Table table) { return new Builder(table); } - @Override + /** + * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as + * the configuration is done by the {@link FormatModelRegistry}. + */ + @Deprecated protected void configureDataWrite(Avro.DataWriteBuilder builder) { builder.createWriterFunc(DataWriter::create); } - @Override + /** + * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as + * the configuration is done by the {@link FormatModelRegistry}. + */ + @Deprecated protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { builder.createWriterFunc(DataWriter::create); } - @Override + /** + * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as + * the configuration is done by the {@link FormatModelRegistry}. + */ + @Deprecated protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { builder.createWriterFunc(DataWriter::create); } - @Override + /** + * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as + * the configuration is done by the {@link FormatModelRegistry}. + */ + @Deprecated protected void configureDataWrite(Parquet.DataWriteBuilder builder) { builder.createWriterFunc(GenericParquetWriter::create); } - @Override + /** + * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as + * the configuration is done by the {@link FormatModelRegistry}. + */ + @Deprecated protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { builder.createWriterFunc(GenericParquetWriter::create); } - @Override + /** + * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as + * the configuration is done by the {@link FormatModelRegistry}. + */ + @Deprecated protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { builder.createWriterFunc(GenericParquetWriter::create); } - @Override + /** + * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as + * the configuration is done by the {@link FormatModelRegistry}. + */ + @Deprecated protected void configureDataWrite(ORC.DataWriteBuilder builder) { builder.createWriterFunc(GenericOrcWriter::buildWriter); } - @Override + /** + * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as + * the configuration is done by the {@link FormatModelRegistry}. + */ + @Deprecated protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) { builder.createWriterFunc(GenericOrcWriter::buildWriter); } - @Override + /** + * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as + * the configuration is done by the {@link FormatModelRegistry}. + */ + @Deprecated protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) { builder.createWriterFunc(GenericOrcWriter::buildWriter); } + @Override + public PositionDeleteWriter newPositionDeleteWriter( + EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + if (positionDeleteRowSchema == null) { + return super.newPositionDeleteWriter(file, spec, partition); + } else { + LOG.warn( + "Deprecated feature used. Position delete row schema is used to create the position delete writer."); + Map properties = table == null ? ImmutableMap.of() : table.properties(); + MetricsConfig metricsConfig = + table == null + ? MetricsConfig.forPositionDelete() + : MetricsConfig.forPositionDelete(table); + + try { + return switch (format) { + case AVRO -> + Avro.writeDeletes(file) + .setAll(properties) + .setAll(writerProperties) + .metricsConfig(metricsConfig) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .overwrite() + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); + case ORC -> + ORC.writeDeletes(file) + .setAll(properties) + .setAll(writerProperties) + .metricsConfig(metricsConfig) + .createWriterFunc(GenericOrcWriter::buildWriter) + .withPartition(partition) + .overwrite() + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); + case PARQUET -> + Parquet.writeDeletes(file) + .setAll(properties) + .setAll(writerProperties) + .metricsConfig(metricsConfig) + .createWriterFunc(GenericParquetWriter::create) + .withPartition(partition) + .overwrite() + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); + default -> + throw new UnsupportedOperationException( + "Cannot write pos-deletes for unsupported file format: " + format); + }; + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new position delete writer", e); + } + } + } + public static class Builder { private final Table table; private FileFormat dataFileFormat; diff --git a/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java new file mode 100644 index 000000000000..868b41f5840b --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.formats.FileWriterBuilder; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileWriterFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +/** + * A base writer factory to be extended by query engine integrations. + * + * @param row type + */ +public abstract class RegistryBasedFileWriterFactory + implements FileWriterFactory, Serializable { + private final Table table; + private final FileFormat dataFileFormat; + private final Class inputType; + private final Schema dataSchema; + private final SortOrder dataSortOrder; + private final FileFormat deleteFileFormat; + private final int[] equalityFieldIds; + private final Schema equalityDeleteRowSchema; + private final SortOrder equalityDeleteSortOrder; + private final Map writerProperties; + private final S inputSchema; + private final S equalityDeleteInputSchema; + + protected RegistryBasedFileWriterFactory( + Table table, + FileFormat dataFileFormat, + Class inputType, + Schema dataSchema, + SortOrder dataSortOrder, + FileFormat deleteFileFormat, + int[] equalityFieldIds, + Schema equalityDeleteRowSchema, + SortOrder equalityDeleteSortOrder, + Map writerProperties, + S inputSchema, + S equalityDeleteInputSchema) { + this.table = table; + this.dataFileFormat = dataFileFormat; + this.inputType = inputType; + this.dataSchema = dataSchema; + this.dataSortOrder = dataSortOrder; + this.deleteFileFormat = deleteFileFormat; + this.equalityFieldIds = equalityFieldIds; + this.equalityDeleteRowSchema = equalityDeleteRowSchema; + this.equalityDeleteSortOrder = equalityDeleteSortOrder; + this.writerProperties = writerProperties != null ? writerProperties : ImmutableMap.of(); + this.inputSchema = inputSchema; + this.equalityDeleteInputSchema = equalityDeleteInputSchema; + } + + protected S inputSchema() { + return inputSchema; + } + + protected S equalityDeleteInputSchema() { + return equalityDeleteInputSchema; + } + + @Override + public DataWriter newDataWriter( + EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + Preconditions.checkArgument(dataSchema != null, "Invalid data schema: null"); + EncryptionKeyMetadata keyMetadata = file.keyMetadata(); + Map properties = table != null ? table.properties() : ImmutableMap.of(); + MetricsConfig metricsConfig = + table != null ? MetricsConfig.forTable(table) : MetricsConfig.getDefault(); + + try { + FileWriterBuilder, S> builder = + FormatModelRegistry.dataWriteBuilder(dataFileFormat, inputType, file); + return builder + .schema(dataSchema) + .engineSchema(inputSchema()) + .setAll(properties) + .setAll(writerProperties) + .metricsConfig(metricsConfig) + .spec(spec) + .partition(partition) + .keyMetadata(keyMetadata) + .sortOrder(dataSortOrder) + .overwrite() + .build(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new data writer", e); + } + } + + @Override + public EqualityDeleteWriter newEqualityDeleteWriter( + EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + Preconditions.checkArgument(equalityDeleteRowSchema != null, "Invalid delete schema: null"); + + EncryptionKeyMetadata keyMetadata = file.keyMetadata(); + Map properties = table != null ? table.properties() : ImmutableMap.of(); + MetricsConfig metricsConfig = + table != null ? MetricsConfig.forTable(table) : MetricsConfig.getDefault(); + + try { + FileWriterBuilder, S> builder = + FormatModelRegistry.equalityDeleteWriteBuilder(deleteFileFormat, inputType, file); + return builder + .setAll(properties) + .setAll(writerProperties) + .metricsConfig(metricsConfig) + .schema(equalityDeleteRowSchema) + .engineSchema(equalityDeleteInputSchema()) + .equalityFieldIds(equalityFieldIds) + .spec(spec) + .partition(partition) + .keyMetadata(keyMetadata) + .sortOrder(equalityDeleteSortOrder) + .overwrite() + .build(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new equality delete writer", e); + } + } + + @Override + public PositionDeleteWriter newPositionDeleteWriter( + EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + EncryptionKeyMetadata keyMetadata = file.keyMetadata(); + Map properties = table != null ? table.properties() : ImmutableMap.of(); + MetricsConfig metricsConfig = + table != null ? MetricsConfig.forPositionDelete(table) : MetricsConfig.forPositionDelete(); + + try { + FileWriterBuilder, ?> builder = + FormatModelRegistry.positionDeleteWriteBuilder(deleteFileFormat, file); + return builder + .setAll(properties) + .setAll(writerProperties) + .metricsConfig(metricsConfig) + .spec(spec) + .partition(partition) + .keyMetadata(keyMetadata) + .overwrite() + .build(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new position delete writer", e); + } + } +}