Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ final class CombinedWatermarkStatus {

private boolean idle = false;

/**
* Whether at least one output was ever registered. Used to distinguish the initial empty state
* (before any output is registered) from an empty state caused by unregistering all outputs.
*/
private boolean hadOutputs = false;

public long getCombinedWatermark() {
return combinedWatermark;
}
Expand All @@ -51,6 +57,7 @@ public boolean remove(PartialWatermark o) {
}

public void add(PartialWatermark element) {
hadOutputs = true;
partialWatermarks.add(element);
}

Expand All @@ -63,8 +70,16 @@ public void add(PartialWatermark element) {
* @return true, if the combined watermark changed
*/
public boolean updateCombinedWatermark() {
// if we don't have any outputs, we should not emit
// if we don't have any outputs, we should not emit.
if (partialWatermarks.isEmpty()) {
// If all previously registered outputs have been unregistered, an empty set is
// semantically equivalent to all outputs being idle. We must mark the combined status
// as idle to avoid carrying over a stale idle=false state set by a watermark that was
// emitted while the outputs were still active. The initial empty state (before any
// output is registered) is left untouched so we don't prematurely propagate idleness.
if (hadOutputs) {
this.idle = true;
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,21 +427,27 @@ void testRemoveNotRegisteredReturnValue() {
}

@Test
void testNotEmittingIdleAfterAllSplitsRemoved() {
void testEmittingIdleAfterAllSplitsRemoved() {
final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
final WatermarkOutputMultiplexer multiplexer =
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);

// An active output emits a watermark, which marks the combined status as non-idle.
Watermark emittedWatermark = new Watermark(1);
final String id = UUID.randomUUID().toString();
multiplexer.registerNewOutput(id);
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
immediateOutput.emitWatermark(emittedWatermark);
multiplexer.unregisterOutput(id);
assertThat(underlyingWatermarkOutput.isIdle()).isFalse();

// Once all outputs are unregistered there is nothing left to track, so the combined status
// must fall back to idle instead of staying stuck at the previous non-idle state. The last
// watermark must not regress when there are no outputs left.
multiplexer.unregisterOutput(id);
multiplexer.onPeriodicEmit();

assertThat(underlyingWatermarkOutput.lastWatermark()).isEqualTo(emittedWatermark);
assertThat(underlyingWatermarkOutput.isIdle()).isFalse();
assertThat(underlyingWatermarkOutput.isIdle()).isTrue();
}

/**
Expand Down