diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java index e86dd9f97aa1..7e944510a85e 100644 --- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java +++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java @@ -57,7 +57,8 @@ private FormatModelRegistry() {} private static final List CLASSES_TO_REGISTER = ImmutableList.of( "org.apache.iceberg.data.GenericFormatModels", - "org.apache.iceberg.arrow.vectorized.ArrowFormatModels"); + "org.apache.iceberg.arrow.vectorized.ArrowFormatModels", + "org.apache.iceberg.flink.data.FlinkFormatModels"); // Format models indexed by file format and object model class private static final Map>, FormatModel> MODELS = diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java index 55f3b5701e0b..444c0d0226bd 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java @@ -40,7 +40,13 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -/** A base writer factory to be extended by query engine integrations. */ +/** + * A base writer factory to be extended by query engine integrations. + * + * @deprecated since version 1.11.0 and will be removed in 1.12.0. Use {@link + * RegistryBasedFileWriterFactory} + */ +@Deprecated public abstract class BaseFileWriterFactory implements FileWriterFactory, Serializable { private final Table table; private final FileFormat dataFileFormat; @@ -75,13 +81,6 @@ protected BaseFileWriterFactory( this.positionDeleteRowSchema = null; } - /** - * @deprecated This constructor is deprecated as of version 1.11.0 and will be removed in 1.12.0. - * Position deletes that include row data are no longer supported. Use {@link - * #BaseFileWriterFactory(Table, FileFormat, Schema, SortOrder, FileFormat, int[], Schema, - * SortOrder, Map)} instead. - */ - @Deprecated protected BaseFileWriterFactory( Table table, FileFormat dataFileFormat, diff --git a/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java new file mode 100644 index 000000000000..868b41f5840b --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.formats.FileWriterBuilder; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileWriterFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +/** + * A base writer factory to be extended by query engine integrations. + * + * @param row type + */ +public abstract class RegistryBasedFileWriterFactory + implements FileWriterFactory, Serializable { + private final Table table; + private final FileFormat dataFileFormat; + private final Class inputType; + private final Schema dataSchema; + private final SortOrder dataSortOrder; + private final FileFormat deleteFileFormat; + private final int[] equalityFieldIds; + private final Schema equalityDeleteRowSchema; + private final SortOrder equalityDeleteSortOrder; + private final Map writerProperties; + private final S inputSchema; + private final S equalityDeleteInputSchema; + + protected RegistryBasedFileWriterFactory( + Table table, + FileFormat dataFileFormat, + Class inputType, + Schema dataSchema, + SortOrder dataSortOrder, + FileFormat deleteFileFormat, + int[] equalityFieldIds, + Schema equalityDeleteRowSchema, + SortOrder equalityDeleteSortOrder, + Map writerProperties, + S inputSchema, + S equalityDeleteInputSchema) { + this.table = table; + this.dataFileFormat = dataFileFormat; + this.inputType = inputType; + this.dataSchema = dataSchema; + this.dataSortOrder = dataSortOrder; + this.deleteFileFormat = deleteFileFormat; + this.equalityFieldIds = equalityFieldIds; + this.equalityDeleteRowSchema = equalityDeleteRowSchema; + this.equalityDeleteSortOrder = equalityDeleteSortOrder; + this.writerProperties = writerProperties != null ? writerProperties : ImmutableMap.of(); + this.inputSchema = inputSchema; + this.equalityDeleteInputSchema = equalityDeleteInputSchema; + } + + protected S inputSchema() { + return inputSchema; + } + + protected S equalityDeleteInputSchema() { + return equalityDeleteInputSchema; + } + + @Override + public DataWriter newDataWriter( + EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + Preconditions.checkArgument(dataSchema != null, "Invalid data schema: null"); + EncryptionKeyMetadata keyMetadata = file.keyMetadata(); + Map properties = table != null ? table.properties() : ImmutableMap.of(); + MetricsConfig metricsConfig = + table != null ? MetricsConfig.forTable(table) : MetricsConfig.getDefault(); + + try { + FileWriterBuilder, S> builder = + FormatModelRegistry.dataWriteBuilder(dataFileFormat, inputType, file); + return builder + .schema(dataSchema) + .engineSchema(inputSchema()) + .setAll(properties) + .setAll(writerProperties) + .metricsConfig(metricsConfig) + .spec(spec) + .partition(partition) + .keyMetadata(keyMetadata) + .sortOrder(dataSortOrder) + .overwrite() + .build(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new data writer", e); + } + } + + @Override + public EqualityDeleteWriter newEqualityDeleteWriter( + EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + Preconditions.checkArgument(equalityDeleteRowSchema != null, "Invalid delete schema: null"); + + EncryptionKeyMetadata keyMetadata = file.keyMetadata(); + Map properties = table != null ? table.properties() : ImmutableMap.of(); + MetricsConfig metricsConfig = + table != null ? MetricsConfig.forTable(table) : MetricsConfig.getDefault(); + + try { + FileWriterBuilder, S> builder = + FormatModelRegistry.equalityDeleteWriteBuilder(deleteFileFormat, inputType, file); + return builder + .setAll(properties) + .setAll(writerProperties) + .metricsConfig(metricsConfig) + .schema(equalityDeleteRowSchema) + .engineSchema(equalityDeleteInputSchema()) + .equalityFieldIds(equalityFieldIds) + .spec(spec) + .partition(partition) + .keyMetadata(keyMetadata) + .sortOrder(equalityDeleteSortOrder) + .overwrite() + .build(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new equality delete writer", e); + } + } + + @Override + public PositionDeleteWriter newPositionDeleteWriter( + EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + EncryptionKeyMetadata keyMetadata = file.keyMetadata(); + Map properties = table != null ? table.properties() : ImmutableMap.of(); + MetricsConfig metricsConfig = + table != null ? MetricsConfig.forPositionDelete(table) : MetricsConfig.forPositionDelete(); + + try { + FileWriterBuilder, ?> builder = + FormatModelRegistry.positionDeleteWriteBuilder(deleteFileFormat, file); + return builder + .setAll(properties) + .setAll(writerProperties) + .metricsConfig(metricsConfig) + .spec(spec) + .partition(partition) + .keyMetadata(keyMetadata) + .overwrite() + .build(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new position delete writer", e); + } + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java new file mode 100644 index 000000000000..0026c8a3021d --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.avro.AvroFormatModel; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.orc.ORCFormatModel; +import org.apache.iceberg.parquet.ParquetFormatModel; + +public class FlinkFormatModels { + public static void register() { + FormatModelRegistry.register( + ParquetFormatModel.create( + RowData.class, + RowType.class, + (icebergSchema, fileSchema, engineSchema) -> + FlinkParquetWriters.buildWriter(engineSchema, fileSchema), + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant))); + + FormatModelRegistry.register( + AvroFormatModel.create( + RowData.class, + RowType.class, + (icebergSchema, fileSchema, engineSchema) -> new FlinkAvroWriter(engineSchema), + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + FlinkPlannedAvroReader.create(icebergSchema, idToConstant))); + + FormatModelRegistry.register( + ORCFormatModel.create( + RowData.class, + RowType.class, + (icebergSchema, fileSchema, engineSchema) -> + FlinkOrcWriter.buildWriter(engineSchema, icebergSchema), + (icebergSchema, fileSchema, engineSchema, idToConstant) -> + new FlinkOrcReader(icebergSchema, fileSchema, idToConstant))); + } + + private FlinkFormatModels() {} +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java index b3ada41737bc..d5247941d863 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java @@ -25,28 +25,19 @@ import java.io.Serializable; import java.util.Map; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.BaseFileWriterFactory; +import org.apache.iceberg.data.RegistryBasedFileWriterFactory; import org.apache.iceberg.flink.FlinkSchemaUtil; -import org.apache.iceberg.flink.data.FlinkAvroWriter; -import org.apache.iceberg.flink.data.FlinkOrcWriter; -import org.apache.iceberg.flink.data.FlinkParquetWriters; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -public class FlinkFileWriterFactory extends BaseFileWriterFactory implements Serializable { - private RowType dataFlinkType; - private RowType equalityDeleteFlinkType; - - private FlinkFileWriterFactory( +public class FlinkFileWriterFactory extends RegistryBasedFileWriterFactory + implements Serializable { + FlinkFileWriterFactory( Table table, FileFormat dataFileFormat, Schema dataSchema, @@ -62,85 +53,30 @@ private FlinkFileWriterFactory( super( table, dataFileFormat, + RowData.class, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, - writeProperties); - - this.dataFlinkType = dataFlinkType; - this.equalityDeleteFlinkType = equalityDeleteFlinkType; - } - - static Builder builderFor(Table table) { - return new Builder(table); - } - - @Override - protected void configureDataWrite(Avro.DataWriteBuilder builder) { - builder.createWriterFunc(ignore -> new FlinkAvroWriter(dataFlinkType())); - } - - @Override - protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { - builder.createWriterFunc(ignored -> new FlinkAvroWriter(equalityDeleteFlinkType())); - } - - @Override - protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {} - - @Override - protected void configureDataWrite(Parquet.DataWriteBuilder builder) { - builder.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(dataFlinkType(), msgType)); - } - - @Override - protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { - builder.createWriterFunc( - msgType -> FlinkParquetWriters.buildWriter(equalityDeleteFlinkType(), msgType)); - } - - @Override - protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { - builder.transformPaths(path -> StringData.fromString(path.toString())); - } - - @Override - protected void configureDataWrite(ORC.DataWriteBuilder builder) { - builder.createWriterFunc( - (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(dataFlinkType(), iSchema)); - } - - @Override - protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) { - builder.createWriterFunc( - (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(equalityDeleteFlinkType(), iSchema)); + writeProperties, + dataFlinkType == null ? FlinkSchemaUtil.convert(dataSchema) : dataFlinkType, + equalityDeleteInputSchema(equalityDeleteFlinkType, equalityDeleteRowSchema)); } - @Override - protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) { - builder.transformPaths(path -> StringData.fromString(path.toString())); - } - - private RowType dataFlinkType() { - if (dataFlinkType == null) { - Preconditions.checkNotNull(dataSchema(), "Data schema must not be null"); - this.dataFlinkType = FlinkSchemaUtil.convert(dataSchema()); + private static RowType equalityDeleteInputSchema(RowType rowType, Schema rowSchema) { + if (rowType != null) { + return rowType; + } else if (rowSchema != null) { + return FlinkSchemaUtil.convert(rowSchema); + } else { + return null; } - - return dataFlinkType; } - private RowType equalityDeleteFlinkType() { - if (equalityDeleteFlinkType == null) { - Preconditions.checkNotNull( - equalityDeleteRowSchema(), "Equality delete schema must not be null"); - this.equalityDeleteFlinkType = FlinkSchemaUtil.convert(equalityDeleteRowSchema()); - } - - return equalityDeleteFlinkType; + static Builder builderFor(Table table) { + return new Builder(table); } public static class Builder { diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java index b8fb1ba32edf..ee4aaf4a3da1 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java @@ -24,10 +24,8 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.encryption.InputFilesDecryptor; import org.apache.iceberg.expressions.Expression; @@ -35,19 +33,14 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.FlinkSourceFilter; import org.apache.iceberg.flink.RowDataWrapper; -import org.apache.iceberg.flink.data.FlinkOrcReader; -import org.apache.iceberg.flink.data.FlinkParquetReaders; -import org.apache.iceberg.flink.data.FlinkPlannedAvroReader; import org.apache.iceberg.flink.data.RowDataProjection; import org.apache.iceberg.flink.data.RowDataUtil; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMappingParser; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.PartitionUtil; @Internal @@ -73,8 +66,7 @@ public RowDataFileScanTaskReader( if (filters != null && !filters.isEmpty()) { Expression combinedExpression = filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and); - this.rowFilter = - new FlinkSourceFilter(this.projectedSchema, combinedExpression, this.caseSensitive); + this.rowFilter = new FlinkSourceFilter(projectedSchema, combinedExpression, caseSensitive); } else { this.rowFilter = null; } @@ -112,23 +104,23 @@ private CloseableIterable newIterable( if (task.isDataTask()) { throw new UnsupportedOperationException("Cannot read data task."); } else { - switch (task.file().format()) { - case PARQUET: - iter = newParquetIterable(task, schema, idToConstant, inputFilesDecryptor); - break; - - case AVRO: - iter = newAvroIterable(task, schema, idToConstant, inputFilesDecryptor); - break; - - case ORC: - iter = newOrcIterable(task, schema, idToConstant, inputFilesDecryptor); - break; - - default: - throw new UnsupportedOperationException( - "Cannot read unknown format: " + task.file().format()); + ReadBuilder builder = + FormatModelRegistry.readBuilder( + task.file().format(), RowData.class, inputFilesDecryptor.getInputFile(task)); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); } + + iter = + builder + .project(schema) + .idToConstant(idToConstant) + .split(task.start(), task.length()) + .caseSensitive(caseSensitive) + .filter(task.residual()) + .reuseContainers() + .build(); } if (rowFilter != null) { @@ -137,72 +129,6 @@ private CloseableIterable newIterable( return iter; } - private CloseableIterable newAvroIterable( - FileScanTask task, - Schema schema, - Map idToConstant, - InputFilesDecryptor inputFilesDecryptor) { - Avro.ReadBuilder builder = - Avro.read(inputFilesDecryptor.getInputFile(task)) - .reuseContainers() - .project(schema) - .split(task.start(), task.length()) - .createReaderFunc(readSchema -> FlinkPlannedAvroReader.create(schema, idToConstant)); - - if (nameMapping != null) { - builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); - } - - return builder.build(); - } - - private CloseableIterable newParquetIterable( - FileScanTask task, - Schema schema, - Map idToConstant, - InputFilesDecryptor inputFilesDecryptor) { - Parquet.ReadBuilder builder = - Parquet.read(inputFilesDecryptor.getInputFile(task)) - .split(task.start(), task.length()) - .project(schema) - .createReaderFunc( - fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema, idToConstant)) - .filter(task.residual()) - .caseSensitive(caseSensitive) - .reuseContainers(); - - if (nameMapping != null) { - builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); - } - - return builder.build(); - } - - private CloseableIterable newOrcIterable( - FileScanTask task, - Schema schema, - Map idToConstant, - InputFilesDecryptor inputFilesDecryptor) { - Schema readSchemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot( - schema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); - - ORC.ReadBuilder builder = - ORC.read(inputFilesDecryptor.getInputFile(task)) - .project(readSchemaWithoutConstantAndMetadataFields) - .split(task.start(), task.length()) - .createReaderFunc( - readOrcSchema -> new FlinkOrcReader(schema, readOrcSchema, idToConstant)) - .filter(task.residual()) - .caseSensitive(caseSensitive); - - if (nameMapping != null) { - builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); - } - - return builder.build(); - } - private static class FlinkDeleteFilter extends DeleteFilter { private final RowType requiredRowType; private final RowDataWrapper asStructLike; diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java index da5b5f6c28f0..339cd0510efb 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java @@ -35,7 +35,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynFields; -import org.apache.iceberg.data.BaseFileWriterFactory; +import org.apache.iceberg.data.RegistryBasedFileWriterFactory; import org.apache.iceberg.flink.FlinkWriteConf; import org.apache.iceberg.flink.FlinkWriteOptions; import org.apache.iceberg.flink.SimpleDataUtil; @@ -238,21 +238,21 @@ private static Map appenderProperties( testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); testHarness.prepareSnapshotPreBarrier(1L); - DynFields.BoundField operatorField = + DynFields.BoundField> operatorField = DynFields.builder() .hiddenImpl(testHarness.getOperatorFactory().getClass(), "operator") .build(testHarness.getOperatorFactory()); - DynFields.BoundField writerField = + DynFields.BoundField> writerField = DynFields.builder() .hiddenImpl(IcebergStreamWriter.class, "writer") .build(operatorField.get()); - DynFields.BoundField writerFactoryField = + DynFields.BoundField> writerFactoryField = DynFields.builder() .hiddenImpl(BaseTaskWriter.class, "writerFactory") .build(writerField.get()); DynFields.BoundField> propsField = DynFields.builder() - .hiddenImpl(BaseFileWriterFactory.class, "writerProperties") + .hiddenImpl(RegistryBasedFileWriterFactory.class, "writerProperties") .build(writerFactoryField.get()); return propsField.get(); } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java index 8e346cd8a145..f604f639f217 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java @@ -34,7 +34,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.common.DynFields; -import org.apache.iceberg.data.BaseFileWriterFactory; +import org.apache.iceberg.data.RegistryBasedFileWriterFactory; import org.apache.iceberg.flink.FlinkWriteOptions; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase; @@ -365,13 +365,13 @@ private Map properties(DynamicWriter dynamicWriter) { DynFields.BoundField>> writerField = DynFields.builder().hiddenImpl(dynamicWriter.getClass(), "writers").build(dynamicWriter); - DynFields.BoundField writerFactoryField = + DynFields.BoundField> writerFactoryField = DynFields.builder() .hiddenImpl(BaseTaskWriter.class, "writerFactory") .build(writerField.get().values().iterator().next()); DynFields.BoundField> propsField = DynFields.builder() - .hiddenImpl(BaseFileWriterFactory.class, "writerProperties") + .hiddenImpl(RegistryBasedFileWriterFactory.class, "writerProperties") .build(writerFactoryField.get()); return propsField.get(); }