From 068d8584b098f1a3482f19952808165d08b931ae Mon Sep 17 00:00:00 2001 From: Herbert Wang Date: Wed, 25 Mar 2026 17:25:29 +0000 Subject: [PATCH] Flink Sink V2: Add PostCommitHook plugin interface --- .../iceberg/flink/sink/IcebergCommitter.java | 34 +++++++++++++++ .../iceberg/flink/sink/IcebergSink.java | 19 +++++++-- .../iceberg/flink/sink/PostCommitHook.java | 42 +++++++++++++++++++ 3 files changed, 92 insertions(+), 3 deletions(-) create mode 100644 flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/PostCommitHook.java diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java index 8e45a2db30b2..623a9f427de0 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java @@ -26,12 +26,14 @@ import java.util.NavigableMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.flink.TableLoader; @@ -79,6 +81,7 @@ class IcebergCommitter implements Committer { private ExecutorService workerPool; private int continuousEmptyCheckpoints = 0; private final boolean tableMaintenanceEnabled; + @Nullable private final PostCommitHook postCommitHook; IcebergCommitter( TableLoader tableLoader, @@ -89,6 +92,28 @@ class IcebergCommitter implements Committer { String sinkId, IcebergFilesCommitterMetrics committerMetrics, boolean tableMaintenanceEnabled) { + 
this( + tableLoader, + branch, + snapshotProperties, + replacePartitions, + workerPoolSize, + sinkId, + committerMetrics, + tableMaintenanceEnabled, + null); + } + + IcebergCommitter( + TableLoader tableLoader, + String branch, + Map snapshotProperties, + boolean replacePartitions, + int workerPoolSize, + String sinkId, + IcebergFilesCommitterMetrics committerMetrics, + boolean tableMaintenanceEnabled, + @Nullable PostCommitHook postCommitHook) { this.branch = branch; this.snapshotProperties = snapshotProperties; this.replacePartitions = replacePartitions; @@ -108,6 +133,7 @@ class IcebergCommitter implements Committer { "iceberg-committer-pool-" + table.name() + "-" + sinkId, workerPoolSize); this.continuousEmptyCheckpoints = 0; this.tableMaintenanceEnabled = tableMaintenanceEnabled; + this.postCommitHook = postCommitHook; } @Override @@ -307,6 +333,14 @@ private void commitOperation( if (committerMetrics != null) { committerMetrics.commitDuration(durationMs); } + + if (postCommitHook != null) { + table.refresh(); + Snapshot snapshot = table.currentSnapshot(); + if (snapshot != null) { + postCommitHook.afterCommit(snapshot.snapshotId(), snapshot.summary()); + } + } } @Override diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index eaaf4ea6e4e3..379c4a3d1027 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.UUID; import java.util.function.Function; +import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; @@ -171,6 +172,8 @@ public class IcebergSink // equalityFieldIds instead. 
private final Set equalityFieldColumns; + @Nullable private final PostCommitHook postCommitHook; + private final transient List> maintenanceTasks; private final transient FlinkMaintenanceConfig flinkMaintenanceConfig; @@ -188,7 +191,8 @@ private IcebergSink( boolean overwriteMode, List> maintenanceTasks, FlinkMaintenanceConfig flinkMaintenanceConfig, - Set equalityFieldColumns) { + Set equalityFieldColumns, + @Nullable PostCommitHook postCommitHook) { this.tableLoader = tableLoader; this.snapshotProperties = snapshotProperties; this.uidSuffix = uidSuffix; @@ -212,6 +216,7 @@ private IcebergSink( this.maintenanceTasks = maintenanceTasks; this.flinkMaintenanceConfig = flinkMaintenanceConfig; this.equalityFieldColumns = equalityFieldColumns; + this.postCommitHook = postCommitHook; } @Override @@ -247,7 +252,8 @@ public Committer createCommitter(CommitterInitContext contex workerPoolSize, sinkId, metrics, - maintenanceEnabled); + maintenanceEnabled, + postCommitHook); } @Override @@ -349,6 +355,7 @@ public static class Builder implements IcebergSinkBuilder { private ReadableConfig readableConfig = new Configuration(); private List equalityFieldColumns = null; private final List> maintenanceTasks = Lists.newArrayList(); + @Nullable private PostCommitHook postCommitHook; private Builder() {} @@ -716,6 +723,11 @@ public Builder toBranch(String branch) { return this; } + public Builder postCommitHook(PostCommitHook hook) { + this.postCommitHook = hook; + return this; + } + IcebergSink build() { Preconditions.checkArgument( @@ -806,7 +818,8 @@ IcebergSink build() { overwriteMode, maintenanceTasks, flinkMaintenanceConfig, - equalityFieldColumnsSet); + equalityFieldColumnsSet, + postCommitHook); } /** diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/PostCommitHook.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/PostCommitHook.java new file mode 100644 index 000000000000..b485a267b27c --- /dev/null +++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/PostCommitHook.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Callback invoked after each successful Iceberg snapshot commit.
+ *
+ * <p>Implementations can react to committed snapshots for use cases such as:
+ *
+ * <ul>
+ *   <li>Audit logging of committed snapshots
+ *   <li>Synchronizing metadata to external catalogs (e.g., Hive Metastore, Nessie)
+ *   <li>Triggering compaction or maintenance workflows
+ *   <li>Tracking custom metadata across commits
+ * </ul>
+ *
+ * <p>The hook is called after the Iceberg commit succeeds but before the Flink checkpoint
+ * completes. Exceptions thrown by the hook will propagate to the committer.
+ */
+@FunctionalInterface
+public interface PostCommitHook extends Serializable {
+  /**
+   * Reacts to a committed snapshot.
+   *
+   * <p>The committer refreshes the table after a commit and passes the ID and summary of the
+   * table's current snapshot; the hook is not invoked when the table has no current snapshot.
+   *
+   * <p>NOTE(review): confirm that the current snapshot is the one just committed when the sink
+   * writes to a non-default branch or when other writers commit concurrently.
+   *
+   * @param snapshotId ID of the snapshot reported by the committer
+   * @param snapshotSummary summary of that snapshot, as returned by {@code Snapshot#summary()}
+   */
+  void afterCommit(long snapshotId, Map<String, String> snapshotSummary);
+}