From 1c532541b440d6a03f32a439dce1e00ea2718031 Mon Sep 17 00:00:00 2001 From: Herbert Wang Date: Fri, 13 Feb 2026 23:09:59 +0000 Subject: [PATCH] Flink: Add extensibility support to IcebergSink for downstream composition (#15315) Add CommittableMetadata framework (marker interface, serializer, registry) allowing downstream to attach custom metadata to committables. Widen access modifiers on sink pipeline classes to enable downstream connector implementations to compose with Iceberg's existing sink infrastructure. All changes are additive with no behavioral changes to existing code paths. --- .../flink/sink/CachingTableSupplier.java | 4 +- .../flink/sink/CommittableMetadata.java | 36 +++++++++ .../sink/CommittableMetadataRegistry.java | 58 ++++++++++++++ .../sink/CommittableMetadataSerializer.java | 54 +++++++++++++ .../flink/sink/IcebergCommittable.java | 35 +++++++-- .../sink/IcebergCommittableSerializer.java | 21 ++++- .../iceberg/flink/sink/IcebergCommitter.java | 4 +- .../iceberg/flink/sink/IcebergSink.java | 77 ++++++++++++++++++- .../flink/sink/IcebergSinkBuilder.java | 2 +- .../iceberg/flink/sink/IcebergSinkWriter.java | 4 +- .../flink/sink/IcebergWriteAggregator.java | 5 +- .../flink/sink/RowDataTaskWriterFactory.java | 20 +++++ .../apache/iceberg/flink/sink/SinkUtil.java | 3 +- 13 files changed, 300 insertions(+), 23 deletions(-) create mode 100644 flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadata.java create mode 100644 flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataRegistry.java create mode 100644 flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataSerializer.java diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java index 0afc07cc1977..d14c6ec9f10f 100644 --- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java @@ -33,7 +33,7 @@ * table loader should be used carefully when used with writer tasks. It could result in heavy load * on a catalog for jobs with many writers. */ -class CachingTableSupplier implements SerializableSupplier { +public class CachingTableSupplier implements SerializableSupplier
{ private static final Logger LOG = LoggerFactory.getLogger(CachingTableSupplier.class); @@ -43,7 +43,7 @@ class CachingTableSupplier implements SerializableSupplier
{ private long lastLoadTimeMillis; private transient Table table; - CachingTableSupplier( + public CachingTableSupplier( SerializableTable initialTable, TableLoader tableLoader, Duration tableRefreshInterval) { Preconditions.checkArgument(initialTable != null, "initialTable cannot be null"); Preconditions.checkArgument(tableLoader != null, "tableLoader cannot be null"); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadata.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadata.java new file mode 100644 index 000000000000..26d20b661d4a --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadata.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.io.Serializable; + +/** + * Marker interface for custom metadata that flows through the Iceberg sink pipeline. + * + *

This interface allows users to attach arbitrary metadata to committables as they flow from + * writers through aggregators to committers. Implementations can carry custom information such as + * watermarks or other application-specific data. + * + *

Metadata serialization is handled by {@link CommittableMetadataSerializer} implementations + * registered via {@link CommittableMetadataRegistry}. + * + * @see CommittableMetadataSerializer + * @see CommittableMetadataRegistry + */ +public interface CommittableMetadata extends Serializable {} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataRegistry.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataRegistry.java new file mode 100644 index 000000000000..93ea4e5b9129 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataRegistry.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import javax.annotation.Nullable; + +/** + * Global registry for {@link CommittableMetadataSerializer} implementations. + * + *

This registry provides a location to register custom serializers for {@link + * CommittableMetadata}. The registered serializer is used by {@link IcebergCommittableSerializer} + * to serialize/deserialize metadata flowing through the pipeline. + * + * @see CommittableMetadata + * @see CommittableMetadataSerializer + */ +public class CommittableMetadataRegistry { + private static volatile CommittableMetadataSerializer serializer = null; + + private CommittableMetadataRegistry() {} + + /** + * Register a metadata serializer. + * + *

This should be called before any Iceberg sinks are created. + * + * @param metadataSerializer The serializer to register (can be null to clear registration) + */ + public static void register(@Nullable CommittableMetadataSerializer metadataSerializer) { + serializer = metadataSerializer; + } + + /** + * Get the registered metadata serializer. + * + * @return The registered serializer, or null if none is registered + */ + @Nullable + public static CommittableMetadataSerializer get() { + return serializer; + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataSerializer.java new file mode 100644 index 000000000000..ed5c02661d58 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataSerializer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import java.io.IOException; +import java.io.Serializable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +/** + * Serializer for {@link CommittableMetadata} implementations. + * + *

Responsible for serializing and deserializing custom metadata that flows through the Iceberg + * sink pipeline. The serializer must be registered via {@link CommittableMetadataRegistry} to be + * used by the sink's serialization infrastructure. + * + * @see CommittableMetadata + * @see CommittableMetadataRegistry + */ +public interface CommittableMetadataSerializer extends Serializable { + /** + * Serialize the given metadata to the output stream. + * + * @param metadata The metadata to serialize (never null) + * @param out The output stream to write to + * @throws IOException If serialization fails + */ + void write(CommittableMetadata metadata, DataOutputView out) throws IOException; + + /** + * Deserialize metadata from the input stream. + * + * @param in The input stream to read from + * @return The deserialized metadata (never null) + * @throws IOException If deserialization fails + */ + CommittableMetadata read(DataInputView in) throws IOException; +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java index 408c3e9a9d5f..52ff70591393 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Objects; +import javax.annotation.Nullable; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; /** @@ -31,41 +32,58 @@ *

{@link IcebergCommittableSerializer} is used for serializing the objects between the Writer * and the Aggregator operator and between the Aggregator and the Committer as well. */ -class IcebergCommittable implements Serializable { +public class IcebergCommittable implements Serializable { private final byte[] manifest; private final String jobId; private final String operatorId; private final long checkpointId; + @Nullable private final CommittableMetadata metadata; - IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) { + public IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) { + this(manifest, jobId, operatorId, checkpointId, null); + } + + public IcebergCommittable( + byte[] manifest, + String jobId, + String operatorId, + long checkpointId, + @Nullable CommittableMetadata metadata) { this.manifest = manifest; this.jobId = jobId; this.operatorId = operatorId; this.checkpointId = checkpointId; + this.metadata = metadata; } - byte[] manifest() { + public byte[] manifest() { return manifest; } - String jobId() { + public String jobId() { return jobId; } - String operatorId() { + public String operatorId() { return operatorId; } - Long checkpointId() { + public Long checkpointId() { return checkpointId; } + @Nullable + public CommittableMetadata metadata() { + return metadata; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) .add("jobId", jobId) .add("checkpointId", checkpointId) .add("operatorId", operatorId) + .add("metadata", metadata) .toString(); } @@ -83,12 +101,13 @@ public boolean equals(Object o) { return checkpointId == that.checkpointId && Arrays.equals(manifest, that.manifest) && Objects.equals(jobId, that.jobId) - && Objects.equals(operatorId, that.operatorId); + && Objects.equals(operatorId, that.operatorId) + && Objects.equals(metadata, that.metadata); } @Override public int hashCode() { - int result = Objects.hash(jobId, operatorId, 
checkpointId); + int result = Objects.hash(jobId, operatorId, checkpointId, metadata); result = 31 * result + Arrays.hashCode(manifest); return result; } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java index 1d83c211e001..690b2822a8f3 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java @@ -47,6 +47,16 @@ public byte[] serialize(IcebergCommittable committable) throws IOException { view.writeLong(committable.checkpointId()); view.writeInt(committable.manifest().length); view.write(committable.manifest()); + + boolean hasMetadata = committable.metadata() != null; + view.writeBoolean(hasMetadata); + if (hasMetadata) { + CommittableMetadataSerializer metadataSerializer = CommittableMetadataRegistry.get(); + if (metadataSerializer != null) { + metadataSerializer.write(committable.metadata(), view); + } + } + return out.toByteArray(); } @@ -61,7 +71,16 @@ public IcebergCommittable deserialize(int version, byte[] serialized) throws IOE byte[] manifestBuf; manifestBuf = new byte[manifestLen]; view.read(manifestBuf); - return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId); + + CommittableMetadata metadata = null; + if (view.readBoolean()) { // hasMetadata + CommittableMetadataSerializer metadataSerializer = CommittableMetadataRegistry.get(); + if (metadataSerializer != null) { + metadata = metadataSerializer.read(view); + } + } + + return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId, metadata); } throw new IOException("Unrecognized version or corrupt state: " + version); } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java index c05e7d918093..45f591f4d759 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java @@ -57,7 +57,7 @@ * same jobId-operatorId-checkpointId triplet * */ -class IcebergCommitter implements Committer { +public class IcebergCommitter implements Committer { private static final Logger LOG = LoggerFactory.getLogger(IcebergCommitter.class); private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; public static final WriteResult EMPTY_WRITE_RESULT = @@ -80,7 +80,7 @@ class IcebergCommitter implements Committer { private int continuousEmptyCheckpoints = 0; private boolean compactMode = false; - IcebergCommitter( + public IcebergCommitter( TableLoader tableLoader, String branch, Map snapshotProperties, diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index 65b6f6252727..3db8b99083e2 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -167,7 +167,7 @@ public class IcebergSink // equalityFieldIds instead. 
private final Set equalityFieldColumns; - private IcebergSink( + protected IcebergSink( TableLoader tableLoader, Table table, Map snapshotProperties, @@ -205,6 +205,75 @@ private IcebergSink( this.equalityFieldColumns = equalityFieldColumns; } + // Protected getters for subclass access + protected TableLoader getTableLoader() { + return tableLoader; + } + + protected Table getTable() { + return table; + } + + protected Map getSnapshotProperties() { + return snapshotProperties; + } + + protected String getUidSuffix() { + return uidSuffix; + } + + protected String getSinkId() { + return sinkId; + } + + protected Map getWriteProperties() { + return writeProperties; + } + + protected RowType getFlinkRowType() { + return flinkRowType; + } + + protected SerializableSupplier

getTableSupplier() { + return tableSupplier; + } + + protected FlinkWriteConf getFlinkWriteConf() { + return flinkWriteConf; + } + + protected Set getEqualityFieldIds() { + return equalityFieldIds; + } + + protected boolean isUpsertMode() { + return upsertMode; + } + + protected FileFormat getDataFileFormat() { + return dataFileFormat; + } + + protected long getTargetDataFileSize() { + return targetDataFileSize; + } + + protected String getBranch() { + return branch; + } + + protected boolean isOverwriteMode() { + return overwriteMode; + } + + protected int getWorkerPoolSize() { + return workerPoolSize; + } + + protected boolean isCompactMode() { + return compactMode; + } + @Override public SinkWriter createWriter(WriterInitContext context) { RowDataTaskWriterFactory taskWriterFactory = @@ -339,7 +408,7 @@ public static class Builder implements IcebergSinkBuilder { private ReadableConfig readableConfig = new Configuration(); private List equalityFieldColumns = null; - private Builder() {} + protected Builder() {} private Builder forRowData(DataStream newRowDataInput) { this.inputCreator = ignored -> newRowDataInput; @@ -732,7 +801,7 @@ private static String defaultSuffix(String uidSuffix, String defaultSuffix) { return uidSuffix; } - private static SerializableTable checkAndGetTable(TableLoader tableLoader, Table table) { + public static SerializableTable checkAndGetTable(TableLoader tableLoader, Table table) { if (table == null) { if (!tableLoader.isOpen()) { tableLoader.open(); @@ -851,7 +920,7 @@ private DataStream distributeDataStreamByHashDistributionMode( } } - private int resolveWriterParallelism(DataStream input) { + protected int resolveWriterParallelism(DataStream input) { // if the writeParallelism is not specified, we set the default to the input parallelism to // encourage chaining. 
return Optional.ofNullable(flinkWriteConf.writeParallelism()).orElseGet(input::getParallelism); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java index 577b2b9a4227..255a347c4d4c 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java @@ -39,7 +39,7 @@ * be replaced by direct {@link IcebergSink} usage. */ @Internal -interface IcebergSinkBuilder> { +public interface IcebergSinkBuilder> { /** * @deprecated Use {@link #resolvedSchema(ResolvedSchema)} instead. diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java index 7234cf74020e..5bb1581d0e42 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java @@ -37,7 +37,7 @@ * emits a single {@link WriteResult} at every checkpoint for every data/delete file created by this * writer. 
*/ -class IcebergSinkWriter implements CommittingSinkWriter { +public class IcebergSinkWriter implements CommittingSinkWriter { private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkWriter.class); private final String fullTableName; @@ -47,7 +47,7 @@ class IcebergSinkWriter implements CommittingSinkWriter { private final int subTaskId; private final int attemptId; - IcebergSinkWriter( + public IcebergSinkWriter( String fullTableName, TaskWriterFactory taskWriterFactory, IcebergStreamWriterMetrics metrics, diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java index 4a4a789bf9ef..dd82303a193a 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java @@ -43,7 +43,8 @@ * IcebergCommittable} per checkpoint (storing the serialized {@link * org.apache.iceberg.flink.sink.DeltaManifests}, jobId, operatorId, checkpointId) */ -class IcebergWriteAggregator extends AbstractStreamOperator> +public class IcebergWriteAggregator + extends AbstractStreamOperator> implements OneInputStreamOperator< CommittableMessage, CommittableMessage> { @@ -58,7 +59,7 @@ class IcebergWriteAggregator extends AbstractStreamOperator getTableSupplier() { + return tableSupplier; + } + + protected PartitionSpec getSpec() { + return spec; + } + + protected FileFormat getFormat() { + return format; + } + + protected OutputFileFactory getOutputFileFactory() { + return outputFileFactory; + } + + protected void setOutputFileFactory(OutputFileFactory outputFileFactory) { + this.outputFileFactory = outputFileFactory; + } + private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter { private final PartitionKey partitionKey; diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java index b3a9ac6ba2eb..a94a20d707d8 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java @@ -54,7 +54,8 @@ private SinkUtil() {} private static final Logger LOG = LoggerFactory.getLogger(SinkUtil.class); - static Set checkAndGetEqualityFieldIds(Table table, List equalityFieldColumns) { + public static Set checkAndGetEqualityFieldIds( + Table table, List equalityFieldColumns) { Set equalityFieldIds = Sets.newHashSet(table.schema().identifierFieldIds()); if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) { Set equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());