diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
index 0f8b5e83ad8fe..8b638bef45c5b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
@@ -188,7 +188,19 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
def getRelModifiedMonotonicity(rel: Rank, mq: RelMetadataQuery): RelModifiedMonotonicity = {
rel match {
- case physicalRank: StreamPhysicalRank if RankUtil.isDeduplication(rel) =>
+ // Only ranks that can be converted to a Deduplicate operator are handled here, mirroring the
+ // pre-FLINK-34702 behavior when a dedicated StreamPhysicalDeduplicate node carried these
+ // invariants. This is the same condition as RankUtil.canConvertToDeduplicate(FlinkLogicalRank):
+ // a Top-1 ROW_NUMBER without rank number output, sorted on a single time attribute. We inline
+ // it rather than calling canConvertToDeduplicate(StreamPhysicalRank) because the latter reads
+ // the ModifyKindSetTrait, which is still undefined at the time this metadata is computed.
+ // Any other rank (multi-column or non-time-attribute ORDER BY, i.e. a real Top-1 Rank that
+ // retracts and re-emits the kept row) falls through to the generic logic below.
+ case physicalRank: StreamPhysicalRank
+ if RankUtil.isDeduplication(rel) &&
+ RankUtil.sortOnTimeAttributeOnly(
+ physicalRank.orderKey,
+ physicalRank.getInput.getRowType) =>
getPhysicalRankModifiedMonotonicity(physicalRank, mq)
case _ =>
@@ -249,8 +261,9 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
private def getPhysicalRankModifiedMonotonicity(
rank: StreamPhysicalRank,
mq: RelMetadataQuery): RelModifiedMonotonicity = {
- // Can't use RankUtil.canConvertToDeduplicate directly because modifyKindSetTrait is undefined.
if (allAppend(mq, rank.getInput)) {
+ // A LastRow (ORDER BY time DESC) or rowtime deduplication retracts and re-emits the kept row
+ // when a new winner arrives, hence generates updates; only FirstRow on proctime is append-only.
if (RankUtil.keepLastDeduplicateRow(rank.orderKey) || rank.sortOnRowTime) {
val mono = new RelModifiedMonotonicity(
Array.fill(rank.getRowType.getFieldCount)(NOT_MONOTONIC))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
index 2314eb5a88fe9..adac0e1d18872 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
@@ -357,9 +357,7 @@ object RankUtil {
!rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute && isRowNumberType
}
- private def sortOnTimeAttributeOnly(
- sortCollation: RelCollation,
- inputRowType: RelDataType): Boolean = {
+ def sortOnTimeAttributeOnly(sortCollation: RelCollation, inputRowType: RelDataType): Boolean = {
if (sortCollation.getFieldCollations.size() != 1) {
return false
}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index ef7fa02162e0b..f27964e04e95d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -115,6 +115,111 @@ Sink(table=[default_catalog.default_database.sink], fields=[name, eat, cnt])
+- GroupAggregate(groupBy=[name, eat], select=[name, eat, SUM(age) AS cnt])
+- Exchange(distribution=[hash[name, eat]])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[name, eat, age])
+]]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 2662d3c3139ad..d210e06b43ff2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -934,6 +934,78 @@ class FlinkRelMdHandlerTestBase {
(calcOfFirstRow, calcOfLastRow)
}
+ // A Top-1 ROW_NUMBER whose ORDER BY is NOT a single time attribute. It is logically a
+ // deduplication (RankUtil.isDeduplication), but it cannot be converted to a Deduplicate operator
+ // (RankUtil.canConvertToDeduplicate is false), so it stays a regular Top-1 Rank that retracts and
+ // re-emits the kept row. These guard the dispatch boundary in FlinkRelMdModifiedMonotonicity that
+ // FLINK-34702 introduced.
+ //
+ // Single order key on field 0 (default, i.e. ascending, direction). Equivalent SQL:
+ // select a, b, c from (
+ // select a, b, c, ...,
+ // ROW_NUMBER() over (partition by b order by a) rn from TemporalTable3
+ // ) t where rn <= 1
+ protected lazy val streamTop1RankOnNonTimeAttribute: RelNode = {
+ buildTop1RankNotConvertibleToDeduplicate(orderKey = RelCollations.of(0))
+ }
+
+ // Two order keys (fields 0 and 2), both descending with nulls last. Equivalent SQL:
+ // select a, b, c from (
+ // select a, b, c, ...,
+ // ROW_NUMBER() over (partition by b order by a desc, c desc) rn from TemporalTable3
+ // ) t where rn <= 1
+ protected lazy val streamTop1RankOnMultipleColumns: RelNode = {
+ // Local factory: descending, nulls-last collation on a single field index.
+ def descNullsLast(fieldIndex: Int): RelFieldCollation = {
+ new RelFieldCollation(
+ fieldIndex,
+ RelFieldCollation.Direction.DESCENDING,
+ RelFieldCollation.NullDirection.LAST)
+ }
+
+ val orderKey: RelCollation =
+ RelCollations.of(descNullsLast(0), descNullsLast(2))
+ buildTop1RankNotConvertibleToDeduplicate(orderKey)
+ }
+
+ def buildTop1RankNotConvertibleToDeduplicate(orderKey: RelCollation): RelNode = {
+ val source: StreamPhysicalDataStreamScan =
+ createDataStreamScan(ImmutableList.of("TemporalTable3"), streamPhysicalTraits)
+ val hashDist = FlinkRelDistribution.hash(Array(1), requireStrict = true)
+ val exchangeTraits = source.getTraitSet.replace(hashDist)
+ val exchange = new StreamPhysicalExchange(cluster, exchangeTraits, source, hashDist)
+ val top1Rank = new StreamPhysicalRank(
+ cluster,
+ streamPhysicalTraits,
+ exchange,
+ ImmutableBitSet.of(1),
+ orderKey,
+ RankType.ROW_NUMBER,
+ new ConstantRankRange(1, 1),
+ new RelDataTypeFieldImpl("rn", 7, longType),
+ outputRankNumber = false,
+ RankProcessStrategy.UNDEFINED_STRATEGY,
+ sortOnRowTime = false
+ )
+ // Calc output: the rank's fields minus the last two, fed by input refs 0, 1 and 2.
+ val outputType = typeFactory.builder()
+ top1Rank.getRowType.getFieldList.asScala.dropRight(2).foreach(outputType.add)
+ val program = RexProgram.create(
+ top1Rank.getRowType,
+ List(0, 1, 2).map(idx => RexInputRef.of(idx, top1Rank.getRowType)).asJava,
+ null,
+ outputType.build(),
+ rexBuilder
+ )
+ new StreamPhysicalCalc(
+ cluster,
+ streamPhysicalTraits,
+ top1Rank,
+ program,
+ program.getOutputRowType
+ )
+ }
+
protected lazy val streamChangelogNormalize = {
val key = Array(1, 0)
val hash1 = FlinkRelDistribution.hash(key, requireStrict = true)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
index 3cc1813bc32e4..4d826e62b5206 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
@@ -487,6 +487,24 @@ class FlinkRelMdModifiedMonotonicityTest extends FlinkRelMdHandlerTestBase {
mq.getRelModifiedMonotonicity(streamRowTimeDeduplicateLastRow))
}
+ @Test
+ def testGetRelMonotonicityOnRankNotConvertibleToDeduplicate(): Unit = {
+ // Both fixtures are Top-1 ROW_NUMBER ranks whose ORDER BY is not a single time attribute.
+ // They are logically deduplications but cannot be converted to a Deduplicate operator (see
+ // RankUtil.canConvertToDeduplicate), so they stay regular Top-1 Ranks that retract and
+ // re-emit the kept row when a new winner arrives and must NOT be reported as all-CONSTANT
+ // (insert-only). Instead they fall through to the generic Rank monotonicity logic, which
+ // derives the order-by field from the input monotonicity and the sort direction (CONSTANT
+ // input + ASC => DECREASING, CONSTANT input + DESC => INCREASING). Field 1 is expected
+ // CONSTANT and field 2 NOT_MONOTONIC in both cases, even where field 2 is the second order
+ // key. Guards against the FLINK-34702 dispatch regression.
+ val expectedAsc = new RelModifiedMonotonicity(Array(DECREASING, CONSTANT, NOT_MONOTONIC))
+ assertEquals(expectedAsc, mq.getRelModifiedMonotonicity(streamTop1RankOnNonTimeAttribute))
+
+ val expectedDesc = new RelModifiedMonotonicity(Array(INCREASING, CONSTANT, NOT_MONOTONIC))
+ assertEquals(expectedDesc, mq.getRelModifiedMonotonicity(streamTop1RankOnMultipleColumns))
+ }
+
@Test
def testGetRelMonotonicityOnChangelogNormalize(): Unit = {
assertEquals(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index a63fbec3ae02c..14db205c4a5c1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -18,12 +18,14 @@
package org.apache.flink.table.planner.plan.stream.sql
import org.apache.flink.table.api._
-import org.apache.flink.table.api.config.OptimizerConfigOptions
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
import org.apache.flink.table.planner.utils.TableTestBase
import org.assertj.core.api.Assertions.{assertThatExceptionOfType, assertThatThrownBy}
import org.junit.jupiter.api.Test
+import java.time.Duration
+
class RankTest extends TableTestBase {
private val util = streamTestUtil()
@@ -1014,5 +1016,104 @@ class RankTest extends TableTestBase {
util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE)
}
+ // A minibatch-enabled util with MyTable registered. Minibatch is required for these cases to
+ // exercise the FLINK-34702 fix end-to-end: a deduplication-style Top-1 Rank only forwards its
+ // updates downstream when minibatch is enabled (see RankUtil.outputInsertOnlyInDeduplicate); with
+ // minibatch disabled a FirstRow rank is forced insert-only and the modified-monotonicity guard is
+ // never reached. A fresh util is used (rather than mutating the shared one) to keep minibatch from
+ // leaking into the other test cases.
+ private def miniBatchUtil() = {
+ val freshUtil = streamTestUtil()
+ freshUtil.addDataStream[(Int, String, Long)](
+ "MyTable",
+ 'a,
+ 'b,
+ 'c,
+ 'proctime.proctime,
+ 'rowtime.rowtime)
+ val config = freshUtil.tableEnv.getConfig
+ config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, java.lang.Boolean.TRUE)
+ config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
+ config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, java.lang.Long.valueOf(1000L))
+ freshUtil
+ }
+
+ /**
+ * Insert-only control for the two tests below. A Top-1 ROW_NUMBER (RankUtil.isDeduplication)
+ * ordered by one ascending time attribute (proctime here) can be converted to a FirstRow
+ * Deduplicate, which is insert-only. Its modified monotonicity is CONSTANT, so the plan is
+ * expected to use the non-retract Min for the downstream MIN aggregate.
+ */
+ @Test
+ def testDeduplicateOnTimeAttributeIsInsertOnly(): Unit = {
+ val miniBatch = miniBatchUtil()
+ val query =
+ """
+ |SELECT b, MIN(c) AS min_c
+ |FROM (
+ | SELECT a, b, c,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime) AS rn
+ | FROM MyTable
+ |) WHERE rn = 1
+ |GROUP BY b
+ """.stripMargin
+
+ miniBatch.verifyExecPlan(query)
+ }
+
+ /**
+ * Update-generating case: the ORDER BY is a single column that is NOT a time attribute. The
+ * query still satisfies RankUtil.isDeduplication, but RankUtil.canConvertToDeduplicate is false
+ * because the sort key is not a time attribute, so the node remains a Top-1 Rank operator.
+ * Such a rank retracts and re-emits the kept row whenever a new winner arrives, i.e. it
+ * produces updates. Its modified monotonicity must therefore NOT be CONSTANT, and the
+ * downstream MIN aggregate must be planned as Min_Retract.
+ *
+ * Regression: before the fix, FlinkRelMdModifiedMonotonicity wrongly treated this ascending
+ * single-column case as an insert-only FirstRow (CONSTANT) and planned a non-retract Min.
+ */
+ @Test
+ def testDeduplicateOnNonTimeAttributeGeneratesUpdates(): Unit = {
+ val miniBatch = miniBatchUtil()
+ val query =
+ """
+ |SELECT b, MIN(c) AS min_c
+ |FROM (
+ | SELECT a, b, c,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) AS rn
+ | FROM MyTable
+ |) WHERE rn = 1
+ |GROUP BY b
+ """.stripMargin
+
+ miniBatch.verifyExecPlan(query)
+ }
+
+ /**
+ * Update-generating case with a multi-column ORDER BY. Such an order key can never be a single
+ * time attribute, so the node remains a Top-1 Rank operator that produces updates, and the
+ * downstream MIN aggregate must be planned as Min_Retract.
+ *
+ * This shape originally exposed the bug: RankUtil.keepLastDeduplicateRow returns false for a
+ * multi-column order key (size != 1), so before the fix the rank fell through to the
+ * insert-only CONSTANT branch.
+ */
+ @Test
+ def testDeduplicateOnMultipleColumnsGeneratesUpdates(): Unit = {
+ val miniBatch = miniBatchUtil()
+ val query =
+ """
+ |SELECT b, MIN(c) AS min_c
+ |FROM (
+ | SELECT a, b, c,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC, c DESC) AS rn
+ | FROM MyTable
+ |) WHERE rn = 1
+ |GROUP BY b
+ """.stripMargin
+
+ miniBatch.verifyExecPlan(query)
+ }
+
// TODO add tests about multi-sinks and udf
}