diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java index 56c136f7bfa339..e71aa65a746967 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java @@ -63,6 +63,7 @@ import java.io.File; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -186,10 +187,30 @@ void writerAndCommitterExecuteInStreamingModeWithScaling( runStreamingWithScalingTest( config, initialParallelism, trackingCommitter, false, miniCluster, clusterClient); - assertThat(committed.get()) - .extracting(Committer.CommitRequest::getCommittable) - .containsExactlyInAnyOrderElementsOf( - duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE)); + assertRecordsContainAtLeastExpected( + committed.get().stream() + .map(Committer.CommitRequest::getCommittable) + .collect(Collectors.toList()), + duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE)); + } + + @Test + void committedRecordAssertionAllowsAdditionalRetriedCommittables() { + final List> expected = duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE); + final List> actual = new ArrayList<>(expected); + actual.add(expected.get(0)); + + assertRecordsContainAtLeastExpected(actual, expected); + } + + private static void assertRecordsContainAtLeastExpected( + Collection> actual, List> expected) { + final List> remainingActual = new ArrayList<>(actual); + for (Record expectedRecord : expected) { + assertThat(remainingActual).contains(expectedRecord); + remainingActual.remove(expectedRecord); + } + assertThat(remainingActual).isSubsetOf(expected); } private static List> duplicate(List> values) {