diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java new file mode 100644 index 0000000000000..f42975afe3524 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * A {@link Sink} with a stateful {@link SinkWriter}. + * + *

This interface is retained as a deprecated bridge for connectors compiled against Flink 1.x + * that reference this type. It was originally removed in Flink 2.0 (FLINK-36245) but restored to + * ease migration, as the connector ecosystem had not fully migrated to the mixin pattern introduced + * in FLIP-372. + * + *

New connector implementations should directly implement {@link Sink} and {@link + * SupportsWriterState} instead. + * + * @param The type of the sink's input + * @param The type of the sink writer's state + * @deprecated Implement {@link Sink} and {@link SupportsWriterState} instead. This interface exists + * solely as a binary compatibility bridge for connectors compiled against Flink 1.x and will be + * removed in a future release. + */ +@PublicEvolving +@Deprecated +public interface StatefulSink + extends Sink, SupportsWriterState { + + /** + * A mix-in for {@link StatefulSink} that allows users to migrate from a sink with a compatible + * state to this sink. + * + * @deprecated Use {@link SupportsWriterState.WithCompatibleState} instead. + */ + @PublicEvolving + @Deprecated + interface WithCompatibleState extends SupportsWriterState.WithCompatibleState {} + + /** + * A {@link SinkWriter} whose state needs to be checkpointed. + * + * @param The type of the sink writer's input + * @param The type of the writer's state + * @deprecated Use {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter} directly. + */ + @PublicEvolving + @Deprecated + interface StatefulSinkWriter + extends org.apache.flink.api.connector.sink2.StatefulSinkWriter {} +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java new file mode 100644 index 0000000000000..bf3a4974d8a11 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.IOException; + +/** + * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink} + * consists of a {@link SinkWriter} that performs the precommits and a {@link Committer} that + * actually commits the data. To facilitate the separation the {@link SinkWriter} creates + * committables on checkpoint or end of input and then sends it to the {@link Committer}. + * + *

This interface is retained as a deprecated bridge for connectors compiled against Flink 1.x + * that reference this type. It was originally removed in Flink 2.0 (FLINK-36245) but restored to + * ease migration, as the connector ecosystem had not fully migrated to the mixin pattern introduced + * in FLIP-372. + * + *

New connector implementations should directly implement {@link Sink} and {@link + * SupportsCommitter} instead. + * + * @param The type of the sink's input + * @param The type of the committables. + * @deprecated Implement {@link Sink} and {@link SupportsCommitter} instead. This interface exists + * solely as a binary compatibility bridge for connectors compiled against Flink 1.x and will be + * removed in a future release. + */ +@PublicEvolving +@Deprecated +public interface TwoPhaseCommittingSink + extends Sink, SupportsCommitter { + + /** + * Creates a {@link Committer} that permanently makes the previously written data visible + * through {@link Committer#commit(java.util.Collection)}. + * + * @return A committer for the two-phase commit protocol. + * @throws IOException for any failure during creation. + * @deprecated Please use {@link #createCommitter(CommitterInitContext)}. + */ + @Deprecated + default Committer createCommitter() throws IOException { + throw new UnsupportedOperationException( + "Override either createCommitter() or createCommitter(CommitterInitContext)"); + } + + /** + * Creates a {@link Committer}. Delegates to the no-arg {@link #createCommitter()} by default to + * support connectors compiled against Flink 1.x that only overrode the no-arg variant. + */ + @Override + default Committer createCommitter(CommitterInitContext context) throws IOException { + return createCommitter(); + } + + /** + * A {@link SinkWriter} that performs the first part of a two-phase commit protocol. + * + * @deprecated Use {@link CommittingSinkWriter} directly. + */ + @PublicEvolving + @Deprecated + interface PrecommittingSinkWriter extends CommittingSinkWriter {} +} diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/sink2/StatefulSinkBridgeTest.java b/flink-core/src/test/java/org/apache/flink/api/connector/sink2/StatefulSinkBridgeTest.java new file mode 100644 index 0000000000000..4adaf82373971 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/connector/sink2/StatefulSinkBridgeTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +class StatefulSinkBridgeTest { + + @Test + void statefulSinkIsRecognizedAsSink() { + TestStatefulSink sink = new TestStatefulSink(); + assertThat(sink).isInstanceOf(Sink.class); + } + + @Test + void statefulSinkIsRecognizedAsSupportsWriterState() { + TestStatefulSink sink = new TestStatefulSink(); + assertThat(sink).isInstanceOf(SupportsWriterState.class); + } + + @Test + void statefulSinkWriterExtendsCoreSinkWriter() { + assertThat(StatefulSinkWriter.class) + .isAssignableFrom(StatefulSink.StatefulSinkWriter.class); + } + + @Test + void withCompatibleStateExtendsCoreWithCompatibleState() { + assertThat(SupportsWriterState.WithCompatibleState.class) + .isAssignableFrom(StatefulSink.WithCompatibleState.class); + } + + private static class TestStatefulSink implements StatefulSink { + + @Override + public SinkWriter createWriter(WriterInitContext context) throws IOException { + return new TestStatefulSinkWriter(); + } + + @Override + public StatefulSinkWriter restoreWriter( + WriterInitContext context, Collection recoveredState) throws IOException { + return new TestStatefulSinkWriter(); + } + + @Override + public SimpleVersionedSerializer getWriterStateSerializer() { + return new SimpleVersionedSerializer<>() { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(Integer obj) { + return new byte[0]; + } + + @Override + public Integer deserialize(int version, byte[] serialized) { + return 0; + } + }; + } + } + + private static class TestStatefulSinkWriter + implements StatefulSink.StatefulSinkWriter { + + @Override + public List snapshotState(long checkpointId) { + return Collections.emptyList(); + } + + @Override + public void write(String element, Context context) {} + + @Override + public void flush(boolean endOfInput) {} + + @Override + public void close() {} + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSinkBridgeTest.java b/flink-core/src/test/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSinkBridgeTest.java new file mode 100644 index 0000000000000..41f342faf8003 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSinkBridgeTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +class TwoPhaseCommittingSinkBridgeTest { + + @Test + void twoPhaseCommittingSinkIsRecognizedAsSink() { + TestTwoPhaseCommittingSink sink = new TestTwoPhaseCommittingSink(); + assertThat(sink).isInstanceOf(Sink.class); + } + + @Test + void twoPhaseCommittingSinkIsRecognizedAsSupportsCommitter() { + TestTwoPhaseCommittingSink sink = new TestTwoPhaseCommittingSink(); + assertThat(sink).isInstanceOf(SupportsCommitter.class); + } + + @Test + void precommittingSinkWriterExtendsCommittingSinkWriter() { + assertThat(CommittingSinkWriter.class) + .isAssignableFrom(TwoPhaseCommittingSink.PrecommittingSinkWriter.class); + } + + @Test + void noArgCreateCommitterDelegation() throws IOException { + LegacyTwoPhaseCommittingSink sink = new LegacyTwoPhaseCommittingSink(); + Committer committer = sink.createCommitter(null); + assertThat(committer).isNotNull(); + } + + private static class TestTwoPhaseCommittingSink + implements TwoPhaseCommittingSink { + + @Override + public SinkWriter createWriter(WriterInitContext context) throws IOException { + return new TestPrecommittingWriter(); + } + + @Override + public Committer createCommitter(CommitterInitContext context) throws IOException { + return new TestCommitter(); + } + + @Override + public SimpleVersionedSerializer getCommittableSerializer() { + return new SimpleVersionedSerializer<>() { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(String obj) { + return new byte[0]; + } + + @Override + public String deserialize(int version, byte[] serialized) { + return ""; + } + }; + } + } + + /** Simulates a 1.x connector that only overrides the no-arg createCommitter(). */ + private static class LegacyTwoPhaseCommittingSink + implements TwoPhaseCommittingSink { + + @Override + public SinkWriter createWriter(WriterInitContext context) throws IOException { + return new TestPrecommittingWriter(); + } + + @Override + public Committer createCommitter() throws IOException { + return new TestCommitter(); + } + + @Override + public SimpleVersionedSerializer getCommittableSerializer() { + return new SimpleVersionedSerializer<>() { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(String obj) { + return new byte[0]; + } + + @Override + public String deserialize(int version, byte[] serialized) { + return ""; + } + }; + } + } + + private static class TestPrecommittingWriter + implements TwoPhaseCommittingSink.PrecommittingSinkWriter { + + @Override + public Collection prepareCommit() { + return Collections.emptyList(); + } + + @Override + public void write(String element, Context context) {} + + @Override + public void flush(boolean endOfInput) {} + + @Override + public void close() {} + } + + private static class TestCommitter implements Committer { + + @Override + public void commit(Collection> committables) {} + + @Override + public void close() {} + } +}