diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java index 0afc07cc1977..d14c6ec9f10f 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java @@ -33,7 +33,7 @@ * table loader should be used carefully when used with writer tasks. It could result in heavy load * on a catalog for jobs with many writers. */ -class CachingTableSupplier implements SerializableSupplier<Table> { +public class CachingTableSupplier implements SerializableSupplier<Table>
{ private static final Logger LOG = LoggerFactory.getLogger(CachingTableSupplier.class); @@ -43,7 +43,7 @@ class CachingTableSupplier implements SerializableSupplier<Table>
{ private long lastLoadTimeMillis; private transient Table table; - CachingTableSupplier( + public CachingTableSupplier( SerializableTable initialTable, TableLoader tableLoader, Duration tableRefreshInterval) { Preconditions.checkArgument(initialTable != null, "initialTable cannot be null"); Preconditions.checkArgument(tableLoader != null, "tableLoader cannot be null"); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadata.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadata.java new file mode 100644 index 000000000000..26d20b661d4a --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadata.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.io.Serializable; + +/** + * Marker interface for custom metadata that flows through the Iceberg sink pipeline. + * + *
<p>
This interface allows users to attach arbitrary metadata to committables as they flow from + * writers through aggregators to committers. Implementations can carry custom information such as + * watermarks or other application-specific data. + * + *
<p>
Metadata serialization is handled by {@link CommittableMetadataSerializer} implementations + * registered via {@link CommittableMetadataRegistry}. + * + * @see CommittableMetadataSerializer + * @see CommittableMetadataRegistry + */ +public interface CommittableMetadata extends Serializable {} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataRegistry.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataRegistry.java new file mode 100644 index 000000000000..93ea4e5b9129 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataRegistry.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import javax.annotation.Nullable; + +/** + * Global registry for {@link CommittableMetadataSerializer} implementations. + * + *
<p>
This registry provides a location to register custom serializers for {@link + * CommittableMetadata}. The registered serializer is used by {@link IcebergCommittableSerializer} + * and {@link WriteResultSerializer} to serialize/deserialize metadata flowing through the pipeline. + * + * @see CommittableMetadata + * @see CommittableMetadataSerializer + */ +public class CommittableMetadataRegistry { + private static volatile CommittableMetadataSerializer serializer = null; + + private CommittableMetadataRegistry() {} + + /** + * Register a metadata serializer. + * + *
<p>
This should be called before any Iceberg sinks are created. + * + * @param metadataSerializer The serializer to register (can be null to clear registration) + */ + public static void register(@Nullable CommittableMetadataSerializer metadataSerializer) { + serializer = metadataSerializer; + } + + /** + * Get the registered metadata serializer. + * + * @return The registered serializer, or null if none is registered + */ + @Nullable + public static CommittableMetadataSerializer get() { + return serializer; + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataSerializer.java new file mode 100644 index 000000000000..ed5c02661d58 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableMetadataSerializer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.io.IOException; +import java.io.Serializable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +/** + * Serializer for {@link CommittableMetadata} implementations. + * + *
<p>
Responsible for serializing and deserializing custom metadata that flows through the Iceberg + * sink pipeline. The serializer must be registered via {@link CommittableMetadataRegistry} to be + * used by the sink's serialization infrastructure. + * + * @see CommittableMetadata + * @see CommittableMetadataRegistry + */ +public interface CommittableMetadataSerializer extends Serializable { + /** + * Serialize the given metadata to the output stream. + * + * @param metadata The metadata to serialize (never null) + * @param out The output stream to write to + * @throws IOException If serialization fails + */ + void write(CommittableMetadata metadata, DataOutputView out) throws IOException; + + /** + * Deserialize metadata from the input stream. + * + * @param in The input stream to read from + * @return The deserialized metadata (never null) + * @throws IOException If deserialization fails + */ + CommittableMetadata read(DataInputView in) throws IOException; +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java index 408c3e9a9d5f..52ff70591393 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Objects; +import javax.annotation.Nullable; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; /** @@ -31,41 +32,58 @@ *
<p>
{@link IcebergCommittableSerializer} is used for serializing the objects between the Writer * and the Aggregator operator and between the Aggregator and the Committer as well. */ -class IcebergCommittable implements Serializable { +public class IcebergCommittable implements Serializable { private final byte[] manifest; private final String jobId; private final String operatorId; private final long checkpointId; + @Nullable private final CommittableMetadata metadata; - IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) { + public IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) { + this(manifest, jobId, operatorId, checkpointId, null); + } + + public IcebergCommittable( + byte[] manifest, + String jobId, + String operatorId, + long checkpointId, + @Nullable CommittableMetadata metadata) { this.manifest = manifest; this.jobId = jobId; this.operatorId = operatorId; this.checkpointId = checkpointId; + this.metadata = metadata; } - byte[] manifest() { + public byte[] manifest() { return manifest; } - String jobId() { + public String jobId() { return jobId; } - String operatorId() { + public String operatorId() { return operatorId; } - Long checkpointId() { + public Long checkpointId() { return checkpointId; } + @Nullable + public CommittableMetadata metadata() { + return metadata; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) .add("jobId", jobId) .add("checkpointId", checkpointId) .add("operatorId", operatorId) + .add("metadata", metadata) .toString(); } @@ -83,12 +101,13 @@ public boolean equals(Object o) { return checkpointId == that.checkpointId && Arrays.equals(manifest, that.manifest) && Objects.equals(jobId, that.jobId) - && Objects.equals(operatorId, that.operatorId); + && Objects.equals(operatorId, that.operatorId) + && Objects.equals(metadata, that.metadata); } @Override public int hashCode() { - int result = Objects.hash(jobId, operatorId, checkpointId); + int result = Objects.hash(jobId, operatorId, checkpointId, metadata); result = 31 * result + Arrays.hashCode(manifest); return result; } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java index 1d83c211e001..690b2822a8f3 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java @@ -47,6 +47,16 @@ public byte[] serialize(IcebergCommittable committable) throws IOException { view.writeLong(committable.checkpointId()); view.writeInt(committable.manifest().length); view.write(committable.manifest()); + + boolean hasMetadata = committable.metadata() != null; + view.writeBoolean(hasMetadata); + if (hasMetadata) { + CommittableMetadataSerializer metadataSerializer = CommittableMetadataRegistry.get(); + if (metadataSerializer != null) { + metadataSerializer.write(committable.metadata(), view); + } + } + return out.toByteArray(); } @@ -61,7 +71,16 @@ public IcebergCommittable deserialize(int version, byte[] serialized) throws IOE byte[] manifestBuf; manifestBuf = new byte[manifestLen]; view.read(manifestBuf); - return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId); + + CommittableMetadata metadata = null; + if (view.readBoolean()) { // hasMetadata + CommittableMetadataSerializer metadataSerializer = 
CommittableMetadataRegistry.get(); + if (metadataSerializer != null) { + metadata = metadataSerializer.read(view); + } + } + + return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId, metadata); } throw new IOException("Unrecognized version or corrupt state: " + version); }
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java index c05e7d918093..45f591f4d759 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java @@ -57,7 +57,7 @@ * same jobId-operatorId-checkpointId triplet * </ul> */ -class IcebergCommitter implements Committer<IcebergCommittable> { +public class IcebergCommitter implements Committer<IcebergCommittable> { private static final Logger LOG = LoggerFactory.getLogger(IcebergCommitter.class); private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; public static final WriteResult EMPTY_WRITE_RESULT = @@ -80,7 +80,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> { private int continuousEmptyCheckpoints = 0; private boolean compactMode = false; - IcebergCommitter( + public IcebergCommitter( TableLoader tableLoader, String branch, Map<String, String> snapshotProperties,
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index 65b6f6252727..3db8b99083e2 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -167,7 +167,7 @@ public class IcebergSink // equalityFieldIds instead. private final Set<String> equalityFieldColumns; - private IcebergSink( + protected IcebergSink( TableLoader tableLoader, Table table, Map<String, String> snapshotProperties, @@ -205,6 +205,75 @@ private IcebergSink( this.equalityFieldColumns = equalityFieldColumns; } + // Protected getters for subclass access + protected TableLoader getTableLoader() { + return tableLoader; + } + + protected Table getTable() { + return table; + } + + protected Map<String, String> getSnapshotProperties() { + return snapshotProperties; + } + + protected String getUidSuffix() { + return uidSuffix; + } + + protected String getSinkId() { + return sinkId; + } + + protected Map<String, String> getWriteProperties() { + return writeProperties; + } + + protected RowType getFlinkRowType() { + return flinkRowType; + } + + protected SerializableSupplier<Table> getTableSupplier() { + return tableSupplier; + } + + protected FlinkWriteConf getFlinkWriteConf() { + return flinkWriteConf; + } + + protected Set<Integer> getEqualityFieldIds() { + return equalityFieldIds; + } + + protected boolean isUpsertMode() { + return upsertMode; + } + + protected FileFormat getDataFileFormat() { + return dataFileFormat; + } + + protected long getTargetDataFileSize() { + return targetDataFileSize; + } + + protected String getBranch() { + return branch; + } + + protected boolean isOverwriteMode() { + return overwriteMode; + } + + protected int getWorkerPoolSize() { + return workerPoolSize; + } + + protected boolean isCompactMode() { + return compactMode; + } + @Override public SinkWriter<RowData> createWriter(WriterInitContext context) { RowDataTaskWriterFactory taskWriterFactory = @@ -339,7 +408,7 @@ public static class Builder implements IcebergSinkBuilder<Builder> { private ReadableConfig readableConfig = new Configuration(); private List<String> equalityFieldColumns = null; - private Builder() {} + protected Builder() {} private Builder forRowData(DataStream<RowData> newRowDataInput) { this.inputCreator = ignored -> newRowDataInput; @@ -732,7 +801,7 @@ private static String defaultSuffix(String uidSuffix, String defaultSuffix) { return uidSuffix; } - private static SerializableTable checkAndGetTable(TableLoader tableLoader, Table table) { + public static SerializableTable checkAndGetTable(TableLoader tableLoader, Table table) { if (table == null) { if (!tableLoader.isOpen()) { tableLoader.open(); @@ -851,7 +920,7 @@ private DataStream<RowData> distributeDataStreamByHashDistributionMode( } } - private int resolveWriterParallelism(DataStream<RowData> input) { + protected int resolveWriterParallelism(DataStream<RowData> input) { // if the writeParallelism is not specified, we set the default to the input parallelism to // encourage chaining. return Optional.ofNullable(flinkWriteConf.writeParallelism()).orElseGet(input::getParallelism);
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java index 577b2b9a4227..255a347c4d4c 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java @@ -39,7 +39,7 @@ * be replaced by direct {@link IcebergSink} usage. */ @Internal -interface IcebergSinkBuilder<T extends IcebergSinkBuilder<T>> { +public interface IcebergSinkBuilder<T extends IcebergSinkBuilder<T>> { /** * @deprecated Use {@link #resolvedSchema(ResolvedSchema)} instead.
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java index 7234cf74020e..5bb1581d0e42 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java @@ -37,7 +37,7 @@ * emits a single {@link WriteResult} at every checkpoint for every data/delete file created by this * writer.
*/ -class IcebergSinkWriter implements CommittingSinkWriter<RowData, WriteResult> { +public class IcebergSinkWriter implements CommittingSinkWriter<RowData, WriteResult> { private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkWriter.class); private final String fullTableName; @@ -47,7 +47,7 @@ class IcebergSinkWriter implements CommittingSinkWriter<RowData, WriteResult> { private final int subTaskId; private final int attemptId; - IcebergSinkWriter( + public IcebergSinkWriter( String fullTableName, TaskWriterFactory<RowData> taskWriterFactory, IcebergStreamWriterMetrics metrics,
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java index 4a4a789bf9ef..dd82303a193a 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java @@ -43,7 +43,8 @@ * IcebergCommittable} per checkpoint (storing the serialized {@link * org.apache.iceberg.flink.sink.DeltaManifests}, jobId, operatorId, checkpointId) */ -class IcebergWriteAggregator extends AbstractStreamOperator<CommittableMessage<IcebergCommittable>> +public class IcebergWriteAggregator + extends AbstractStreamOperator<CommittableMessage<IcebergCommittable>> implements OneInputStreamOperator< CommittableMessage<WriteResult>, CommittableMessage<IcebergCommittable>> { @@ -58,7 +59,7 @@ class IcebergWriteAggregator extends AbstractStreamOperator getTableSupplier() { + return tableSupplier; + } + + protected PartitionSpec getSpec() { + return spec; + } + + protected FileFormat getFormat() { + return format; + } + + protected OutputFileFactory getOutputFileFactory() { + return outputFileFactory; + } + + protected void setOutputFileFactory(OutputFileFactory outputFileFactory) { + this.outputFileFactory = outputFileFactory; + } + private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter<RowData> { private final PartitionKey partitionKey;
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java index b3a9ac6ba2eb..a94a20d707d8 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java @@ -54,7 +54,8 @@ private SinkUtil() {} private static final Logger LOG = LoggerFactory.getLogger(SinkUtil.class); - static Set<Integer> checkAndGetEqualityFieldIds(Table table, List<String> equalityFieldColumns) { + public static Set<Integer> checkAndGetEqualityFieldIds( + Table table, List<String> equalityFieldColumns) { Set<Integer> equalityFieldIds = Sets.newHashSet(table.schema().identifierFieldIds()); if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) { Set<String> equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
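
Usage sketch (illustrative, not part of this patch): the metadata hooks above are meant to be wired up by the job before the sink is built. A minimal example under the API introduced in this diff (CommittableMetadata, CommittableMetadataSerializer, CommittableMetadataRegistry); the WatermarkMetadata class, its package, and its single field are hypothetical names chosen for the example:

package org.apache.iceberg.flink.example;

import java.io.IOException;
import java.util.Objects;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.iceberg.flink.sink.CommittableMetadata;
import org.apache.iceberg.flink.sink.CommittableMetadataRegistry;
import org.apache.iceberg.flink.sink.CommittableMetadataSerializer;

/** Hypothetical metadata carrying the low watermark observed by a writer for one checkpoint. */
public class WatermarkMetadata implements CommittableMetadata {
  private final long watermarkMillis;

  public WatermarkMetadata(long watermarkMillis) {
    this.watermarkMillis = watermarkMillis;
  }

  public long watermarkMillis() {
    return watermarkMillis;
  }

  // equals/hashCode matter because IcebergCommittable now includes metadata in its own equals()
  @Override
  public boolean equals(Object o) {
    return o instanceof WatermarkMetadata
        && ((WatermarkMetadata) o).watermarkMillis == watermarkMillis;
  }

  @Override
  public int hashCode() {
    return Objects.hash(watermarkMillis);
  }

  /** Writes/reads the single long field, following the write()/read() contract above. */
  public static class Serializer implements CommittableMetadataSerializer {
    @Override
    public void write(CommittableMetadata metadata, DataOutputView out) throws IOException {
      out.writeLong(((WatermarkMetadata) metadata).watermarkMillis());
    }

    @Override
    public CommittableMetadata read(DataInputView in) throws IOException {
      return new WatermarkMetadata(in.readLong());
    }
  }

  public static void main(String[] args) {
    // Register once, before any IcebergSink is created, so the writer, aggregator and committer
    // sides all resolve the same serializer from CommittableMetadataRegistry.get().
    CommittableMetadataRegistry.register(new Serializer());
    // ... build the Flink job and the IcebergSink afterwards ...
  }
}

On the wire, IcebergCommittableSerializer writes a hasMetadata boolean after the manifest bytes and only delegates to the registered serializer when metadata is present, so committables without metadata keep the existing layout plus one extra flag.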