From 16295b8c6d0e8e02cb5e566a5ad1beeb8432a7c3 Mon Sep 17 00:00:00 2001 From: Yanjun Qiu <153984347+qiuyanjun888@users.noreply.github.com> Date: Tue, 23 Jun 2026 00:09:34 +0800 Subject: [PATCH 1/2] [FLINK-39954][tests] Relax SinkV2 scaling commit assertion --- .../test/streaming/runtime/SinkV2ITCase.java | 29 ++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java index 56c136f7bfa339..4782c51d90aa63 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java @@ -63,6 +63,7 @@ import java.io.File; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -186,10 +187,30 @@ void writerAndCommitterExecuteInStreamingModeWithScaling( runStreamingWithScalingTest( config, initialParallelism, trackingCommitter, false, miniCluster, clusterClient); - assertThat(committed.get()) - .extracting(Committer.CommitRequest::getCommittable) - .containsExactlyInAnyOrderElementsOf( - duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE)); + assertRecordsContainAtLeastExpected( + committed.get().stream() + .map(Committer.CommitRequest::getCommittable) + .collect(Collectors.toList()), + duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE)); + } + + @Test + void committedRecordAssertionAllowsAdditionalRetriedCommittables() { + final List> expected = duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE); + final List> actual = new ArrayList<>(expected); + actual.add(expected.get(0)); + + assertRecordsContainAtLeastExpected(actual, expected); + } + + private static void assertRecordsContainAtLeastExpected( + Collection> actual, List> expected) { + final List> remainingActual = new ArrayList<>(actual); + for (Record expectedRecord : expected) { + assertThat(remainingActual).contains(expectedRecord); + remainingActual.remove(expectedRecord); + } + assertThat(remainingActual).allSatisfy(record -> assertThat(expected).contains(record)); } private static List> duplicate(List> values) { From d4dc536e51d3e32e5817ba769d6956029b6547a3 Mon Sep 17 00:00:00 2001 From: Yanjun Qiu <153984347+qiuyanjun888@users.noreply.github.com> Date: Tue, 23 Jun 2026 02:14:01 +0800 Subject: [PATCH 2/2] [FLINK-39954][tests] Apply review suggestion for SinkV2 assertion --- .../org/apache/flink/test/streaming/runtime/SinkV2ITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java index 4782c51d90aa63..e71aa65a746967 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java @@ -210,7 +210,7 @@ private static void assertRecordsContainAtLeastExpected( assertThat(remainingActual).contains(expectedRecord); remainingActual.remove(expectedRecord); } - assertThat(remainingActual).allSatisfy(record -> assertThat(expected).contains(record)); + assertThat(remainingActual).isSubsetOf(expected); } private static List> duplicate(List> values) {