From 61106384f7ccc3188407c8636674fb87a515ab9b Mon Sep 17 00:00:00 2001 From: lincoln lee Date: Mon, 22 Jun 2026 20:08:47 +0800 Subject: [PATCH] [FLINK-39966][table-planner] FlinkRelMdModifiedMonotonicity wrongly reports a non-time-attribute Top-1 Rank as insert-only FLINK-34702 removed the dedicated StreamPhysicalDeduplicate metadata handler and re-routed deduplication monotonicity through StreamPhysicalRank, but the new dispatch guard only checked RankUtil.isDeduplication (Top-1 ROW_NUMBER without rank number). It dropped the sortOnTimeAttributeOnly invariant the old node type implicitly carried. Tighten the guard to isDeduplication && sortOnTimeAttributeOnly restoring the original logic. --- .../FlinkRelMdModifiedMonotonicity.scala | 17 ++- .../table/planner/plan/utils/RankUtil.scala | 4 +- .../planner/plan/stream/sql/RankTest.xml | 105 ++++++++++++++++++ .../metadata/FlinkRelMdHandlerTestBase.scala | 72 ++++++++++++ .../FlinkRelMdModifiedMonotonicityTest.scala | 18 +++ .../planner/plan/stream/sql/RankTest.scala | 103 ++++++++++++++++- 6 files changed, 313 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala index 0f8b5e83ad8fe..8b638bef45c5b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala @@ -188,7 +188,19 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon def getRelModifiedMonotonicity(rel: Rank, mq: RelMetadataQuery): RelModifiedMonotonicity = { rel match { - case physicalRank: StreamPhysicalRank if RankUtil.isDeduplication(rel) => + // Only ranks that can be converted to a Deduplicate operator are handled here, mirroring the + // pre-FLINK-34702 behavior when a dedicated StreamPhysicalDeduplicate node carried these + // invariants. This is the same condition as RankUtil.canConvertToDeduplicate(FlinkLogicalRank): + // a Top-1 ROW_NUMBER without rank number output, sorted on a single time attribute. We inline + // it rather than calling canConvertToDeduplicate(StreamPhysicalRank) because the latter reads + // the ModifyKindSetTrait, which is still undefined at the time this metadata is computed. + // Any other rank (multi-column or non-time-attribute ORDER BY, i.e. a real Top-1 Rank that + // retracts and re-emits the kept row) falls through to the generic logic below. + case physicalRank: StreamPhysicalRank + if RankUtil.isDeduplication(rel) && + RankUtil.sortOnTimeAttributeOnly( + physicalRank.orderKey, + physicalRank.getInput.getRowType) => getPhysicalRankModifiedMonotonicity(physicalRank, mq) case _ => @@ -249,8 +261,9 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon private def getPhysicalRankModifiedMonotonicity( rank: StreamPhysicalRank, mq: RelMetadataQuery): RelModifiedMonotonicity = { - // Can't use RankUtil.canConvertToDeduplicate directly because modifyKindSetTrait is undefined. if (allAppend(mq, rank.getInput)) { + // A LastRow (ORDER BY time DESC) or rowtime deduplication retracts and re-emits the kept row + // when a new winner arrives, hence generates updates; only FirstRow on proctime is append-only. if (RankUtil.keepLastDeduplicateRow(rank.orderKey) || rank.sortOnRowTime) { val mono = new RelModifiedMonotonicity( Array.fill(rank.getRowType.getFieldCount)(NOT_MONOTONIC)) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala index 2314eb5a88fe9..adac0e1d18872 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala @@ -357,9 +357,7 @@ object RankUtil { !rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute && isRowNumberType } - private def sortOnTimeAttributeOnly( - sortCollation: RelCollation, - inputRowType: RelDataType): Boolean = { + def sortOnTimeAttributeOnly(sortCollation: RelCollation, inputRowType: RelDataType): Boolean = { if (sortCollation.getFieldCollations.size() != 1) { return false } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml index ef7fa02162e0b..f27964e04e95d 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml @@ -115,6 +115,111 @@ Sink(table=[default_catalog.default_database.sink], fields=[name, eat, cnt]) +- GroupAggregate(groupBy=[name, eat], select=[name, eat, SUM(age) AS cnt]) +- Exchange(distribution=[hash[name, eat]]) +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[name, eat, age]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index 2662d3c3139ad..d210e06b43ff2 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -934,6 +934,78 @@ class FlinkRelMdHandlerTestBase { (calcOfFirstRow, calcOfLastRow) } + // A Top-1 ROW_NUMBER whose ORDER BY is NOT a single time attribute. It is logically a + // deduplication (RankUtil.isDeduplication), but it cannot be converted to a Deduplicate operator + // (RankUtil.canConvertToDeduplicate is false), so it stays a regular Top-1 Rank that retracts and + // re-emits the kept row. These guard the dispatch boundary in FlinkRelMdModifiedMonotonicity that + // FLINK-34702 introduced. + // + // equivalent SQL is + // select a, b, c from ( + // select a, b, c, ..., + // ROW_NUMBER() over (partition by b order by a) rn from TemporalTable3 + // ) t where rn <= 1 + protected lazy val streamTop1RankOnNonTimeAttribute: RelNode = { + buildTop1RankNotConvertibleToDeduplicate(RelCollations.of(0)) + } + + // equivalent SQL is + // select a, b, c from ( + // select a, b, c, ..., + // ROW_NUMBER() over (partition by b order by a desc, c desc) rn from TemporalTable3 + // ) t where rn <= 1 + protected lazy val streamTop1RankOnMultipleColumns: RelNode = { + buildTop1RankNotConvertibleToDeduplicate( + RelCollations.of( + new RelFieldCollation( + 0, + RelFieldCollation.Direction.DESCENDING, + RelFieldCollation.NullDirection.LAST), + new RelFieldCollation( + 2, + RelFieldCollation.Direction.DESCENDING, + RelFieldCollation.NullDirection.LAST) + )) + } + + def buildTop1RankNotConvertibleToDeduplicate(orderKey: RelCollation): RelNode = { + val scan: StreamPhysicalDataStreamScan = + createDataStreamScan(ImmutableList.of("TemporalTable3"), streamPhysicalTraits) + val hash1 = FlinkRelDistribution.hash(Array(1), requireStrict = true) + val streamExchange = + new StreamPhysicalExchange(cluster, scan.getTraitSet.replace(hash1), scan, hash1) + val rank = new StreamPhysicalRank( + cluster, + streamPhysicalTraits, + streamExchange, + ImmutableBitSet.of(1), + orderKey, + RankType.ROW_NUMBER, + new ConstantRankRange(1, 1), + new RelDataTypeFieldImpl("rn", 7, longType), + outputRankNumber = false, + RankProcessStrategy.UNDEFINED_STRATEGY, + sortOnRowTime = false + ) + + val builder = typeFactory.builder() + rank.getRowType.getFieldList.asScala.dropRight(2).foreach(builder.add) + val projectProgram = RexProgram.create( + rank.getRowType, + Array(0, 1, 2).map(i => RexInputRef.of(i, rank.getRowType)).toList.asJava, + null, + builder.build(), + rexBuilder + ) + new StreamPhysicalCalc( + cluster, + streamPhysicalTraits, + rank, + projectProgram, + projectProgram.getOutputRowType + ) + } + protected lazy val streamChangelogNormalize = { val key = Array(1, 0) val hash1 = FlinkRelDistribution.hash(key, requireStrict = true) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala index 3cc1813bc32e4..4d826e62b5206 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala @@ -487,6 +487,24 @@ class FlinkRelMdModifiedMonotonicityTest extends FlinkRelMdHandlerTestBase { mq.getRelModifiedMonotonicity(streamRowTimeDeduplicateLastRow)) } + @Test + def testGetRelMonotonicityOnRankNotConvertibleToDeduplicate(): Unit = { + // A Top-1 ROW_NUMBER whose ORDER BY is not a single time attribute is logically a + // deduplication but cannot be converted to a Deduplicate operator (see + // RankUtil.canConvertToDeduplicate). It is a regular Top-1 Rank that retracts and re-emits the + // kept row when a new winner arrives, so it must NOT be reported as all-CONSTANT (insert-only). + // Instead it falls through to the generic Rank monotonicity logic, which derives the order-by + // field from the input monotonicity and the sort direction (CONSTANT input + ASC => DECREASING, + // CONSTANT input + DESC => INCREASING). Guards against the FLINK-34702 dispatch regression. + assertEquals( + new RelModifiedMonotonicity(Array(DECREASING, CONSTANT, NOT_MONOTONIC)), + mq.getRelModifiedMonotonicity(streamTop1RankOnNonTimeAttribute)) + + assertEquals( + new RelModifiedMonotonicity(Array(INCREASING, CONSTANT, NOT_MONOTONIC)), + mq.getRelModifiedMonotonicity(streamTop1RankOnMultipleColumns)) + } + @Test def testGetRelMonotonicityOnChangelogNormalize(): Unit = { assertEquals( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala index a63fbec3ae02c..14db205c4a5c1 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala @@ -18,12 +18,14 @@ package org.apache.flink.table.planner.plan.stream.sql import org.apache.flink.table.api._ -import org.apache.flink.table.api.config.OptimizerConfigOptions +import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions} import org.apache.flink.table.planner.utils.TableTestBase import org.assertj.core.api.Assertions.{assertThatExceptionOfType, assertThatThrownBy} import org.junit.jupiter.api.Test +import java.time.Duration + class RankTest extends TableTestBase { private val util = streamTestUtil() @@ -1014,5 +1016,104 @@ class RankTest extends TableTestBase { util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE) } + // A minibatch-enabled util with MyTable registered. Minibatch is required for these cases to + // exercise the FLINK-34702 fix end-to-end: a deduplication-style Top-1 Rank only forwards its + // updates downstream when minibatch is enabled (see RankUtil.outputInsertOnlyInDeduplicate); with + // minibatch disabled a FirstRow rank is forced insert-only and the modified-monotonicity guard is + // never reached. A fresh util is used (rather than mutating the shared one) to keep minibatch from + // leaking into the other test cases. + private def miniBatchUtil() = { + val mbUtil = streamTestUtil() + mbUtil.addDataStream[(Int, String, Long)]( + "MyTable", + 'a, + 'b, + 'c, + 'proctime.proctime, + 'rowtime.rowtime) + mbUtil.tableEnv.getConfig + .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true)) + .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)) + .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(1000L)) + mbUtil + } + + /** + * A Top-1 ROW_NUMBER (i.e. RankUtil.isDeduplication) whose ORDER BY is a single time attribute in + * ascending order can be converted to a FirstRow Deduplicate, which is insert-only. Its modified + * monotonicity is CONSTANT, so the downstream MIN aggregate uses the non-retract Min. This is the + * insert-only control for the two cases below. + */ + @Test + def testDeduplicateOnTimeAttributeIsInsertOnly(): Unit = { + val util = miniBatchUtil() + val sql = + """ + |SELECT b, MIN(c) AS min_c + |FROM ( + | SELECT a, b, c, + | ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime) AS rn + | FROM MyTable + |) WHERE rn = 1 + |GROUP BY b + """.stripMargin + + util.verifyExecPlan(sql) + } + + /** + * A Top-1 ROW_NUMBER whose ORDER BY is a single NON-time attribute is a deduplication + * (RankUtil.isDeduplication) but can NOT be converted to a Deduplicate operator + * (RankUtil.canConvertToDeduplicate is false because it is not sorted on a time attribute). It + * stays a Top-1 Rank operator that retracts and re-emits the kept row when a new winner arrives, + * hence generates updates. Its modified monotonicity must NOT be CONSTANT, so the downstream MIN + * aggregate must use Min_Retract. + * + * Before the fix, FlinkRelMdModifiedMonotonicity wrongly treated this ascending single-column + * case as an insert-only FirstRow and marked it CONSTANT, planning a non-retract Min. + */ + @Test + def testDeduplicateOnNonTimeAttributeGeneratesUpdates(): Unit = { + val util = miniBatchUtil() + val sql = + """ + |SELECT b, MIN(c) AS min_c + |FROM ( + | SELECT a, b, c, + | ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) AS rn + | FROM MyTable + |) WHERE rn = 1 + |GROUP BY b + """.stripMargin + + util.verifyExecPlan(sql) + } + + /** + * Same as above but with a multi-column ORDER BY. A multi-column order key can never be a single + * time attribute, so the rank stays a Top-1 Rank operator and generates updates; the downstream + * MIN aggregate must use Min_Retract. + * + * This is the shape that originally exposed the bug: RankUtil.keepLastDeduplicateRow returns + * false for a multi-column order key (size != 1), so before the fix it fell through to the + * insert-only CONSTANT branch. + */ + @Test + def testDeduplicateOnMultipleColumnsGeneratesUpdates(): Unit = { + val util = miniBatchUtil() + val sql = + """ + |SELECT b, MIN(c) AS min_c + |FROM ( + | SELECT a, b, c, + | ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC, c DESC) AS rn + | FROM MyTable + |) WHERE rn = 1 + |GROUP BY b + """.stripMargin + + util.verifyExecPlan(sql) + } + // TODO add tests about multi-sinks and udf }