[FLINK-39586][core] Mark the combined watermark status idle when all partial watermarks are unregistered #28486
wilmerdooley wants to merge 1 commit into
spuru9
left a comment
- You would have to run `mvn spotless:apply` for the build to go through.
- This change existed before and was removed by FLINK-23767 (#16817). Could you first look into why it was removed before reverting it? That would be helpful.
…to mark idle when Signed-off-by: wilmerdooley <wilmerdooley1@gmail.com>
I have run spotless:apply and pushed it, so the format check should be sorted.

On FLINK-23767 (#16817, FLIP-180): it removed this line on purpose. FLIP-180 makes idleness user-controlled and drops the code that auto-marks a source idle, to avoid late records, so re-adding it reintroduces exactly the automatic idleness it set out to remove. I agree it should not go back as a blind revert.

That said, the empty/all-unregistered case still looks like a real gap (FLINK-38454 deliberately left this branch alone). FLIP-180 and FLINK-38454 both handle "inputs gone" by flushing the maximum watermark rather than signalling idle, so emitting the combined max watermark when the outputs drain to empty would fit the current model better than toggling the idle flag. Would you prefer that direction, or to handle it closer to StatusWatermarkValve / the idleness path? Happy to follow your call.
What is the purpose of the change
`CombinedWatermarkStatus.updateCombinedWatermark()` returns early when `partialWatermarks` is empty without updating the idle flag. If outputs were registered and advanced the watermark and then all of them were unregistered (for example, a HybridSource finishing its bounded phase before the unbounded phase begins), the idle flag stays stuck at false (it was set to false by the watermarks emitted while the outputs were active). That prevents downstream watermark-idleness propagation, so subtasks that should go idle never do.

This change makes the empty-outputs branch reflect that an empty set of outputs is equivalent to all outputs being idle, while preserving the initial-phase behavior.
Brief change log
- `CombinedWatermarkStatus.updateCombinedWatermark()`: when `partialWatermarks` is empty, set `idle = true` only if the combined watermark has already advanced past `Long.MIN_VALUE` (some output was active before). During the initial phase, when no output has ever advanced the watermark, the status stays active, keeping the logic in sync with `StatusWatermarkValve`.

Verifying this change
This change added unit tests:
- `CombinedWatermarkStatusTest`: after an output emits a watermark (idle becomes false) and all outputs are then unregistered, `updateCombinedWatermark()` marks the status idle; and the initial empty phase does not become idle.
- `WatermarkOutputMultiplexerTest`: the downstream output is marked idle once all partial outputs are released after having been active.

These tests fail on the pre-fix code (idle stuck at false) and pass after the fix.
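For readers who want to trace the empty-outputs handling and the two test scenarios described above, the following is a minimal, self-contained sketch. It is not the Flink source: `CombinedStatusSketch`, `Partial`, `register()` and `unregister()` are hypothetical names for a simplified model, and only the guard on `Long.MIN_VALUE` mirrors what the change log describes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the combined status; not the actual Flink class. */
public class CombinedStatusSketch {

    /** One registered output: its last watermark and whether it is idle. */
    public static final class Partial {
        public long watermark = Long.MIN_VALUE;
        public boolean idle = false;
    }

    private final List<Partial> partials = new ArrayList<>();
    private long combinedWatermark = Long.MIN_VALUE;
    private boolean idle = false;

    public Partial register() {
        Partial p = new Partial();
        partials.add(p);
        return p;
    }

    public void unregister(Partial p) {
        partials.remove(p);
    }

    public boolean isIdle() {
        return idle;
    }

    public long getCombinedWatermark() {
        return combinedWatermark;
    }

    /** Returns true if the combined watermark advanced. */
    public boolean updateCombinedWatermark() {
        if (partials.isEmpty()) {
            // Assumed shape of the fix: go idle only if some output was
            // active before, i.e. the combined watermark has left MIN_VALUE.
            if (combinedWatermark > Long.MIN_VALUE) {
                idle = true;
            }
            return false;
        }
        long min = Long.MAX_VALUE;
        boolean allIdle = true;
        for (Partial p : partials) {
            if (!p.idle) {
                min = Math.min(min, p.watermark);
                allIdle = false;
            }
        }
        idle = allIdle;
        if (!allIdle && min > combinedWatermark) {
            combinedWatermark = min;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Scenario 1: initial empty phase, nothing was ever active.
        CombinedStatusSketch fresh = new CombinedStatusSketch();
        fresh.updateCombinedWatermark();
        System.out.println("initial empty phase idle: " + fresh.isIdle());

        // Scenario 2: an output advances the watermark, then is unregistered.
        CombinedStatusSketch drained = new CombinedStatusSketch();
        Partial p = drained.register();
        p.watermark = 42L;
        drained.updateCombinedWatermark();
        drained.unregister(p);
        drained.updateCombinedWatermark();
        System.out.println("after draining idle: " + drained.isIdle());
    }
}
```

In this model the first scenario stays active and the second becomes idle. Dropping the inner `if` in the empty branch reproduces the stuck-at-false behavior the tests are said to catch.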
Does this pull request potentially affect one of the following parts
- `@Public(Evolving)`: no (`CombinedWatermarkStatus` is `@Internal`, package-private)

Documentation
This PR was written with the assistance of generative AI tooling.
Generated-by: Claude Code
JIRA: https://issues.apache.org/jira/browse/FLINK-39586