From ba81eb11e2cd37226c2a4b3ac60e2eb44a8a43f6 Mon Sep 17 00:00:00 2001 From: Herbert Wang Date: Wed, 25 Mar 2026 15:46:42 +0000 Subject: [PATCH] Flink Sink V2: Add OutputFileFactoryProvider plugin interface --- .../iceberg/flink/sink/IcebergSink.java | 21 +++++++-- .../flink/sink/OutputFileFactoryProvider.java | 45 +++++++++++++++++++ .../flink/sink/RowDataTaskWriterFactory.java | 44 +++++++++++++++--- 3 files changed, 101 insertions(+), 9 deletions(-) create mode 100644 flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/OutputFileFactoryProvider.java diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index eaaf4ea6e4e3..48ae0c7e3e00 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.UUID; import java.util.function.Function; +import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; @@ -171,6 +172,8 @@ public class IcebergSink // equalityFieldIds instead. private final Set equalityFieldColumns; + @Nullable private final OutputFileFactoryProvider outputFileFactoryProvider; + private final transient List> maintenanceTasks; private final transient FlinkMaintenanceConfig flinkMaintenanceConfig; @@ -188,7 +191,8 @@ private IcebergSink( boolean overwriteMode, List> maintenanceTasks, FlinkMaintenanceConfig flinkMaintenanceConfig, - Set equalityFieldColumns) { + Set equalityFieldColumns, + @Nullable OutputFileFactoryProvider outputFileFactoryProvider) { this.tableLoader = tableLoader; this.snapshotProperties = snapshotProperties; this.uidSuffix = uidSuffix; @@ -212,6 +216,7 @@ private IcebergSink( this.maintenanceTasks = maintenanceTasks; this.flinkMaintenanceConfig = flinkMaintenanceConfig; this.equalityFieldColumns = equalityFieldColumns; + this.outputFileFactoryProvider = outputFileFactoryProvider; } @Override @@ -224,7 +229,10 @@ public SinkWriter createWriter(WriterInitContext context) { dataFileFormat, writeProperties, equalityFieldIds, - upsertMode); + upsertMode, + table.schema(), + table.spec(), + outputFileFactoryProvider); IcebergStreamWriterMetrics metrics = new IcebergStreamWriterMetrics(context.metricGroup(), table.name()); return new IcebergSinkWriter( @@ -349,6 +357,7 @@ public static class Builder implements IcebergSinkBuilder { private ReadableConfig readableConfig = new Configuration(); private List equalityFieldColumns = null; private final List> maintenanceTasks = Lists.newArrayList(); + @Nullable private OutputFileFactoryProvider outputFileFactoryProvider; private Builder() {} @@ -716,6 +725,11 @@ public Builder toBranch(String branch) { return this; } + public Builder outputFileFactoryProvider(OutputFileFactoryProvider provider) { + this.outputFileFactoryProvider = provider; + return this; + } + IcebergSink build() { Preconditions.checkArgument( @@ -806,7 +820,8 @@ IcebergSink build() { overwriteMode, maintenanceTasks, flinkMaintenanceConfig, - equalityFieldColumnsSet); + equalityFieldColumnsSet, + outputFileFactoryProvider); } /** diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/OutputFileFactoryProvider.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/OutputFileFactoryProvider.java new file mode 100644 index 000000000000..92e0c36d618e --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/OutputFileFactoryProvider.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.io.Serializable; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.OutputFileFactory; + +/** + * Plugin interface for providing a custom {@link OutputFileFactory}. + * + *

Implementations can customize how output files are created, enabling use cases such as: + * + *

    + *
  • Custom file naming or path rewriting + *
  • Routing writes to non-standard storage endpoints + *
  • Region-specific or environment-specific file placement + *
+ * + *

When set on {@link IcebergSink.Builder#outputFileFactoryProvider}, the provided factory + * replaces the default {@link OutputFileFactory} created by {@link RowDataTaskWriterFactory}. + */ +@FunctionalInterface +public interface OutputFileFactoryProvider extends Serializable { + OutputFileFactory create( + Table table, int taskId, int attemptId, FileFormat format, PartitionSpec spec); +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index bc3bc51cedc4..92d172545017 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; +import javax.annotation.Nullable; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; @@ -53,6 +54,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory { private final Set equalityFieldIds; private final boolean upsert; private final FileWriterFactory fileWriterFactory; + @Nullable private final OutputFileFactoryProvider outputFileFactoryProvider; private boolean useDv; private transient OutputFileFactory outputFileFactory; @@ -105,7 +107,32 @@ public RowDataTaskWriterFactory( boolean upsert, Schema schema, PartitionSpec spec) { + this( + tableSupplier, + flinkSchema, + targetFileSizeBytes, + format, + writeProperties, + equalityFieldIds, + upsert, + schema, + spec, + null); + } + + public RowDataTaskWriterFactory( + SerializableSupplier tableSupplier, + RowType flinkSchema, + long targetFileSizeBytes, + FileFormat format, + Map writeProperties, + Collection equalityFieldIds, + boolean upsert, + Schema schema, + PartitionSpec spec, + @Nullable OutputFileFactoryProvider outputFileFactoryProvider) { this.tableSupplier = tableSupplier; + this.outputFileFactoryProvider = outputFileFactoryProvider; Table table; if (tableSupplier instanceof CachingTableSupplier) { @@ -174,12 +201,17 @@ public void initialize(int taskId, int attemptId) { refreshTable(); this.useDv = TableUtil.formatVersion(table) > 2; - this.outputFileFactory = - OutputFileFactory.builderFor(table, taskId, attemptId) - .format(format) - .ioSupplier(() -> tableSupplier.get().io()) - .defaultSpec(spec) - .build(); + if (outputFileFactoryProvider != null) { + this.outputFileFactory = + outputFileFactoryProvider.create(table, taskId, attemptId, format, spec); + } else { + this.outputFileFactory = + OutputFileFactory.builderFor(table, taskId, attemptId) + .format(format) + .ioSupplier(() -> tableSupplier.get().io()) + .defaultSpec(spec) + .build(); + } } @Override