Skip to content

[FLINK-39586][core] Mark the combined watermark status idle when all partial watermarks are unregistered#28486

Open
wilmerdooley wants to merge 1 commit into
apache:masterfrom
wilmerdooley:oss/flink-39586
Open

[FLINK-39586][core] Mark the combined watermark status idle when all partial watermarks are unregistered#28486
wilmerdooley wants to merge 1 commit into
apache:masterfrom
wilmerdooley:oss/flink-39586

Conversation

@wilmerdooley

@wilmerdooley wilmerdooley commented Jun 19, 2026

Copy link
Copy Markdown

What is the purpose of the change

CombinedWatermarkStatus.updateCombinedWatermark() returns early when partialWatermarks is empty without updating the idle flag. If outputs were registered and advanced the watermark and then all of them were unregistered (for example a HybridSource finishing its bounded phase before the unbounded phase begins), the idle flag stays stuck at false (it was set to false by the watermarks emitted while the outputs were active). That prevents downstream watermark-idleness propagation, so subtasks that should go idle never do.

This change makes the empty-outputs branch reflect that an empty set of outputs is equivalent to all outputs being idle, while preserving the initial-phase behavior.

Brief change log

  • CombinedWatermarkStatus.updateCombinedWatermark(): when partialWatermarks is empty, set idle = true only if the combined watermark has already advanced past Long.MIN_VALUE (some output was active before). During the initial phase, when no output has ever advanced the watermark, the status stays active, keeping the logic in sync with StatusWatermarkValve.

Verifying this change

This change added unit tests:

  • CombinedWatermarkStatusTest: after an output emits a watermark (idle becomes false) and all outputs are then unregistered, updateCombinedWatermark() marks the status idle; and the initial empty phase does not become idle.
  • WatermarkOutputMultiplexerTest: the downstream output is marked idle once all partial outputs are released after having been active.

These tests fail on the pre-fix code (idle stuck at false) and pass after the fix.

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • Public API, i.e., is any changed class annotated with @Public(Evolving): no (CombinedWatermarkStatus is @Internal, package-private)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no (a single comparison on the empty-outputs branch of the watermark-combine path, which runs on watermark/idleness updates, not per record)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes (it corrects watermark-idleness propagation in a post-recovery / source-switch scenario, for example a HybridSource restored from a checkpoint then finishing its bounded phase; the change is confined to flink-core watermark-status bookkeeping and does not touch snapshot format, JobManager, Kubernetes, Yarn, or ZooKeeper)
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

This PR was written with the assistance of generative AI tooling.

  • Was generative AI tooling used to co-author this PR?

Generated-by: Claude Code

JIRA: https://issues.apache.org/jira/browse/FLINK-39586

@wilmerdooley wilmerdooley changed the title FLINK-39586: Mark the combined watermark status idle when all partial watermarks are unregistered [FLINK-39586][core] Mark the combined watermark status idle when all partial watermarks are unregistered Jun 19, 2026
@flinkbot

flinkbot commented Jun 19, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@spuru9 spuru9 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. You would have to run mvn spotless:apply for the BUILD to go through
  2. This change was a existing change which was removed by FLINK-23767 (#16817). Can you look into that first to figure out why it was removed earlier, before reverting it back - would be helpful.

@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Jun 19, 2026
…to mark idle when

Signed-off-by: wilmerdooley <wilmerdooley1@gmail.com>
@wilmerdooley

Copy link
Copy Markdown
Author

I have run spotless:apply and pushed it, so the format check should be sorted.

On FLINK-23767 (#16817, FLIP-180): it removed this line on purpose. FLIP-180 makes idleness user-controlled and drops the code that auto-marks a source idle, to avoid late records, so re-adding it reintroduces exactly the automatic idleness it set out to remove. I agree it should not go back as a blind revert.

That said, the empty/all-unregistered case still looks like a real gap (FLINK-38454 deliberately left this branch alone). FLIP-180 and FLINK-38454 both handle "inputs gone" by flushing the maximum watermark rather than signalling idle, so emitting the combined max watermark when the outputs drain to empty would fit the current model better than toggling the idle flag.

Would you prefer that direction, or to handle it closer to StatusWatermarkValve / the idleness path? Happy to follow your call.

@spuru9

spuru9 commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Looping in @AHeise @fapaul for comments on the approach.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants