Skip to content

[FLINK-14621][flink-table-planner] Do not generate watermark assigner operator if no time attribute operations on rowtime#28091

Open
Dennis-Mircea wants to merge 4 commits intoapache:masterfrom
Dennis-Mircea:FLINK-14621
Open

[FLINK-14621][flink-table-planner] Do not generate watermark assigner operator if no time attribute operations on rowtime#28091
Dennis-Mircea wants to merge 4 commits intoapache:masterfrom
Dennis-Mircea:FLINK-14621

Conversation

@Dennis-Mircea
Copy link
Copy Markdown

@Dennis-Mircea Dennis-Mircea commented May 2, 2026

What is the purpose of the change

This pull request resolves FLINK-14621 by stopping the planner from emitting a WatermarkAssigner operator when no downstream operator depends on watermarks at runtime.

Until now, every WATERMARK FOR … declaration in DDL produced a runtime WatermarkAssigner regardless of whether anything downstream actually consumed the watermark (window aggregates, event-time interval / temporal joins, event-time temporal sort, CURRENT_WATERMARK()). The assigner is functionally harmless but wasteful: it is one more per-record operator on the hot path and clutters every plan and EXPLAIN. After this change the assigner is dropped from the physical plan whenever it is provably unused, while remaining present on every plan that does need it.

The remove pass is implemented as a HEP rule that runs in the PHYSICAL_REWRITE phase. The rule fires once per optimization, anchored on the HEP planner's root rel, covering both sink-rooted plans (INSERT INTO …, every branch of a statement set) and sink-less roots (EXPLAIN SELECT …, verifyExecPlan, Table API toDataStream / toChangelogStream). It walks the subtree from the root towards the sources, drops every redundant StreamPhysicalWatermarkAssigner, and rewrites the operators on the path between the former assigner and the root so that the time-indicator (*ROWTIME*) marker on the watermark column is demoted back to a plain TIMESTAMP where appropriate.

Brief change log

  • New rule RedundantWatermarkAssignerRemoveRule (flink-table-planner, package plan.rules.physical.stream).
    • Anchoring. The operand matches any StreamPhysicalRel; matches(RelOptRuleCall) checks that the matched rel is the current HepPlanner root via getRoot() identity. This guarantees a single firing per optimization and avoids both over-removal (firing on a sub-tree without ancestry context) and HEP thrashing.
    • Watermark consumers. A node is treated as a watermark consumer if any of these holds:
      • it returns requireWatermark() == true (window aggregates, event-time interval / temporal joins, event-time match-recognize, PTFs, …);
      • it hosts a CURRENT_WATERMARK(…) SQL call in its expressions, scanned defensively via RelNode#accept(RexShuttle);
      • it is a StreamPhysicalTemporalSort whose primary sort key is a rowtime time-indicator (its runtime operator RowTimeSortOperator fires on watermarks even though the rel does not formally implement requireWatermark).
    • Per-branch root-rowtime guard. If the root's row type carries an event-time *ROWTIME* indicator on a field, the corresponding upstream rowtime column is "protected" and the assigner that produces it is kept. The protected set is propagated downward precisely:
      • pass-through rels (Filter, Sort, Union, Exchange, Sink, ChangelogNormalize, …): preserved, bounded by child input width to handle rels like the window TVF that append columns;
      • Calc / Project: an output index propagates iff the projection at that index is a bare RexInputRef;
      • Join: indexes split between the left/right halves; semi/anti joins only project the left side;
      • Aggregate / window aggregate: clears the protected set.
    • Type-marker demotion. When an assigner is dropped, parent StreamPhysicalCalc programs are rebuilt against the new input row type; only RexInputRefs whose column type changed are retyped (a minimal RexShuttle). The full RexTimeIndicatorMaterializer is deliberately not re-run here, because it would re-wrap already-materialized PROCTIME_MATERIALIZE(...) calls on unrelated proctime columns. Pass-through rels (Exchange, Union, ChangelogNormalize, Sink, …) are recreated via RelNode#copy(traits, [newInputs]).
    • Adjacent-Calc folding. Dropping a WatermarkAssigner that sat between two Calcs naturally leaves them adjacent. The rule folds them via FlinkRelUtil.merge(top, bottom) so the rewrite produces the same compact plan that a subsequent FlinkCalcMergeRule pass would.
  • Wired into the optimizer. New HEP program remove redundant watermarks after PHYSICAL_REWRITE in FlinkStreamProgram, plus a REMOVE_REDUNDANT_WATERMARK_RULES rule set in FlinkStreamRuleSets.
  • MiniBatchIntervalInferRule ported to Java and simplified: removed the unreachable MiniBatchMode.ProcTime branch from the StreamPhysicalWatermarkAssigner case (any surviving assigner is rowtime-driven by construction); reformatted the class Javadoc; dropped the last Scala source under this package.
  • Tests.
    • New RedundantWatermarkAssignerRemoveRuleTest (13 cases): simple SELECT, Calc chain, Tumble window kept, rowtime interval join kept, CURRENT_WATERMARK in projection / filter kept, mixed Union, statement set with mixed branches, event-time temporal join kept, proctime lookup join removed, watermark-pushdown source without event-time consumer removed, watermark-pushdown source with window kept, sink rowtime forwarding kept.
    • Goldens regenerated where the redundant assigner was previously printed: MiniBatchIntervalInferTest, WindowTableFunctionTest, AggregateTest, TableAggregateTest, GroupWindowTableAggregateTest, TableScanTest, TableSourceTest, MultiJoinTest, WindowAggregateTest, TemporalJoinTest, WindowJoinTest, UnionTest, DuplicateChangesInferRuleTest, StateTtlHintTest, JavaCatalogTableTest, StreamOperatorNameTest, DeltaJoinTest, NonDeterministicDagTest, and the seven Python*GroupWindowAggregate / Python*OverAggregate JSON-plan tests. Diffs are uniformly "redundant WatermarkAssigner removed; the affected *ROWTIME* column demoted to TIMESTAMP; adjacent Calcs folded".

Verifying this change

This change added tests and can be verified as follows:

This change adds tests and can be verified as follows:

  • RedundantWatermarkAssignerRemoveRuleTest covers the rule's positive cases (assigner removed under simple Calc, Calc chain, mixed Union branch, statement set, proctime lookup join, watermark-pushdown source without consumer) and negative cases (assigner kept for window aggregations, rowtime interval join, event-time temporal join, watermark-pushdown source feeding a window, sinks that consume a rowtime column, CURRENT_WATERMARK in a projection or filter, sink rowtime forwarding).
  • All updated goldens were regenerated and reviewed; they all show one of: assigner dropped, rowtime column demoted to plain TIMESTAMP, two Calcs merged into one.
  • Full flink-table-planner test suite runs locally with no failures introduced by this change. Targeted runs over *MiniBatch*, *Watermark*, *Window*, *TemporalJoin* and CalcITCase#testCurrentWatermark* pass.
  • WindowAggregateITCase.testDistinctAggWithMergeOnEventTimeSessionWindow (which exercises a non-struct row type at an intermediate HEP root) passes; this verifies the rule's defensive handling of non-struct HEP-root row types.
  • ./mvnw spotless:apply and ./mvnw checkstyle:check -T1C are clean for the touched modules.

Known limitations (documented in the rule's Javadoc)

  • Computed-column rowtime forwarding to sinks. If a sink projects a column derived from a rowtime via an expression (e.g. sink_rt AS rt + INTERVAL '1' SECOND) instead of forwarding the rowtime column directly, the result type is plain TIMESTAMP and not a TimeIndicatorRelDataType. The protected-index propagation cannot trace the rowtime through the composite expression and the assigner may be dropped. The semantics of "watermark forwarding through a derived expression" are not well-defined in Flink today; users that need watermark propagation should project the rowtime column directly.
  • Per-branch protection across unions. When the sink's protected rowtime is fed by a Union, every branch's corresponding rowtime input is protected. This is required for correctness: at runtime the downstream operator advances watermarks as the minimum across union inputs, so dropping the assigner on one branch would block (not skip) progress on that input.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes - the change removes a per-record operator (WatermarkAssigner) from plans that don't need it; existing plans that still need watermarks are unaffected.
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no - it is a planner optimization with no user-visible API or configuration change. No documentation updates required.
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented May 2, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

… operator if no time attribute operations on rowtime
Comment thread flink-table/flink-sql-client/src/test/resources/sql/table.q Outdated
@Dennis-Mircea Dennis-Mircea requested a review from davidradl May 6, 2026 08:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants