[FLINK-14621][flink-table-planner] Do not generate watermark assigner operator if no time attribute operations on rowtime#28091
Open
Dennis-Mircea wants to merge 4 commits intoapache:masterfrom
Open
Conversation
Collaborator
… operator if no time attribute operations on rowtime
davidradl
reviewed
May 5, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
This pull request resolves FLINK-14621 by stopping the planner from emitting a
WatermarkAssigneroperator when no downstream operator depends on watermarks at runtime.Until now, every
WATERMARK FOR …declaration in DDL produced a runtimeWatermarkAssignerregardless of whether anything downstream actually consumed the watermark (window aggregates, event-time interval / temporal joins, event-time temporal sort,CURRENT_WATERMARK()). The assigner is functionally harmless but wasteful: it is one more per-record operator on the hot path and clutters every plan andEXPLAIN. After this change the assigner is dropped from the physical plan whenever it is provably unused, while remaining present on every plan that does need it.The remove pass is implemented as a HEP rule that runs in the
PHYSICAL_REWRITEphase. The rule fires once per optimization, anchored on the HEP planner's root rel, covering both sink-rooted plans (INSERT INTO …, every branch of a statement set) and sink-less roots (EXPLAIN SELECT …,verifyExecPlan, Table APItoDataStream/toChangelogStream). It walks the subtree from the root towards the sources, drops every redundantStreamPhysicalWatermarkAssigner, and rewrites the operators on the path between the former assigner and the root so that the time-indicator (*ROWTIME*) marker on the watermark column is demoted back to a plainTIMESTAMPwhere appropriate.Brief change log
RedundantWatermarkAssignerRemoveRule(flink-table-planner, packageplan.rules.physical.stream).StreamPhysicalRel;matches(RelOptRuleCall)checks that the matched rel is the currentHepPlannerroot viagetRoot()identity. This guarantees a single firing per optimization and avoids both over-removal (firing on a sub-tree without ancestry context) and HEP thrashing.requireWatermark() == true(window aggregates, event-time interval / temporal joins, event-time match-recognize, PTFs, …);CURRENT_WATERMARK(…)SQL call in its expressions, scanned defensively viaRelNode#accept(RexShuttle);StreamPhysicalTemporalSortwhose primary sort key is a rowtime time-indicator (its runtime operatorRowTimeSortOperatorfires on watermarks even though the rel does not formally implementrequireWatermark).*ROWTIME*indicator on a field, the corresponding upstream rowtime column is "protected" and the assigner that produces it is kept. The protected set is propagated downward precisely:Calc/Project: an output index propagates iff the projection at that index is a bareRexInputRef;Join: indexes split between the left/right halves; semi/anti joins only project the left side;Aggregate/ window aggregate: clears the protected set.StreamPhysicalCalcprograms are rebuilt against the new input row type; onlyRexInputRefs whose column type changed are retyped (a minimalRexShuttle). The fullRexTimeIndicatorMaterializeris deliberately not re-run here, because it would re-wrap already-materializedPROCTIME_MATERIALIZE(...)calls on unrelated proctime columns. Pass-through rels (Exchange, Union, ChangelogNormalize, Sink, …) are recreated viaRelNode#copy(traits, [newInputs]).WatermarkAssignerthat sat between two Calcs naturally leaves them adjacent. The rule folds them viaFlinkRelUtil.merge(top, bottom)so the rewrite produces the same compact plan that a subsequentFlinkCalcMergeRulepass would.remove redundant watermarksafterPHYSICAL_REWRITEinFlinkStreamProgram, plus aREMOVE_REDUNDANT_WATERMARK_RULESrule set inFlinkStreamRuleSets.MiniBatchIntervalInferRuleported to Java and simplified: removed the unreachableMiniBatchMode.ProcTimebranch from theStreamPhysicalWatermarkAssignercase (any surviving assigner is rowtime-driven by construction); reformatted the class Javadoc; dropped the last Scala source under this package.RedundantWatermarkAssignerRemoveRuleTest(13 cases): simple SELECT, Calc chain, Tumble window kept, rowtime interval join kept,CURRENT_WATERMARKin projection / filter kept, mixed Union, statement set with mixed branches, event-time temporal join kept, proctime lookup join removed, watermark-pushdown source without event-time consumer removed, watermark-pushdown source with window kept, sink rowtime forwarding kept.MiniBatchIntervalInferTest,WindowTableFunctionTest,AggregateTest,TableAggregateTest,GroupWindowTableAggregateTest,TableScanTest,TableSourceTest,MultiJoinTest,WindowAggregateTest,TemporalJoinTest,WindowJoinTest,UnionTest,DuplicateChangesInferRuleTest,StateTtlHintTest,JavaCatalogTableTest,StreamOperatorNameTest,DeltaJoinTest,NonDeterministicDagTest, and the sevenPython*GroupWindowAggregate/Python*OverAggregateJSON-plan tests. Diffs are uniformly "redundantWatermarkAssignerremoved; the affected*ROWTIME*column demoted toTIMESTAMP; adjacent Calcs folded".Verifying this change
This change added tests and can be verified as follows:
This change adds tests and can be verified as follows:
RedundantWatermarkAssignerRemoveRuleTestcovers the rule's positive cases (assigner removed under simple Calc, Calc chain, mixed Union branch, statement set, proctime lookup join, watermark-pushdown source without consumer) and negative cases (assigner kept for window aggregations, rowtime interval join, event-time temporal join, watermark-pushdown source feeding a window, sinks that consume a rowtime column,CURRENT_WATERMARKin a projection or filter, sink rowtime forwarding).TIMESTAMP, two Calcs merged into one.flink-table-plannertest suite runs locally with no failures introduced by this change. Targeted runs over*MiniBatch*,*Watermark*,*Window*,*TemporalJoin*andCalcITCase#testCurrentWatermark*pass.WindowAggregateITCase.testDistinctAggWithMergeOnEventTimeSessionWindow(which exercises a non-struct row type at an intermediate HEP root) passes; this verifies the rule's defensive handling of non-struct HEP-root row types../mvnw spotless:applyand./mvnw checkstyle:check -T1Care clean for the touched modules.Known limitations (documented in the rule's Javadoc)
sink_rt AS rt + INTERVAL '1' SECOND) instead of forwarding the rowtime column directly, the result type is plainTIMESTAMPand not aTimeIndicatorRelDataType. The protected-index propagation cannot trace the rowtime through the composite expression and the assigner may be dropped. The semantics of "watermark forwarding through a derived expression" are not well-defined in Flink today; users that need watermark propagation should project the rowtime column directly.Union, every branch's corresponding rowtime input is protected. This is required for correctness: at runtime the downstream operator advances watermarks as the minimum across union inputs, so dropping the assigner on one branch would block (not skip) progress on that input.Does this pull request potentially affect one of the following parts:
@Public(Evolving): noWatermarkAssigner) from plans that don't need it; existing plans that still need watermarks are unaffected.Documentation
Was generative AI tooling used to co-author this PR?