diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java index fb290ef957b1d..3470b5bdf2552 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java @@ -38,6 +38,12 @@ final class CombinedWatermarkStatus { private boolean idle = false; + /** + * Whether at least one output was ever registered. Used to distinguish the initial empty state + * (before any output is registered) from an empty state caused by unregistering all outputs. + */ + private boolean hadOutputs = false; + public long getCombinedWatermark() { return combinedWatermark; } @@ -51,6 +57,7 @@ public boolean remove(PartialWatermark o) { } public void add(PartialWatermark element) { + hadOutputs = true; partialWatermarks.add(element); } @@ -63,8 +70,16 @@ public void add(PartialWatermark element) { * @return true, if the combined watermark changed */ public boolean updateCombinedWatermark() { - // if we don't have any outputs, we should not emit + // if we don't have any outputs, we should not emit. if (partialWatermarks.isEmpty()) { + // If all previously registered outputs have been unregistered, an empty set is + // semantically equivalent to all outputs being idle. We must mark the combined status + // as idle to avoid carrying over a stale idle=false state set by a watermark that was + // emitted while the outputs were still active. The initial empty state (before any + // output is registered) is left untouched so we don't prematurely propagate idleness. + if (hadOutputs) { + this.idle = true; + } return false; } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java index 4ff83f9c24a5e..3ec9306b90411 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java @@ -427,21 +427,27 @@ void testRemoveNotRegisteredReturnValue() { } @Test - void testNotEmittingIdleAfterAllSplitsRemoved() { + void testEmittingIdleAfterAllSplitsRemoved() { final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + // An active output emits a watermark, which marks the combined status as non-idle. Watermark emittedWatermark = new Watermark(1); final String id = UUID.randomUUID().toString(); multiplexer.registerNewOutput(id); WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id); immediateOutput.emitWatermark(emittedWatermark); - multiplexer.unregisterOutput(id); + assertThat(underlyingWatermarkOutput.isIdle()).isFalse(); + // Once all outputs are unregistered there is nothing left to track, so the combined status + // must fall back to idle instead of staying stuck at the previous non-idle state. The last + // watermark must not regress when there are no outputs left. + multiplexer.unregisterOutput(id); multiplexer.onPeriodicEmit(); + assertThat(underlyingWatermarkOutput.lastWatermark()).isEqualTo(emittedWatermark); - assertThat(underlyingWatermarkOutput.isIdle()).isFalse(); + assertThat(underlyingWatermarkOutput.isIdle()).isTrue(); } /**