Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,19 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon

def getRelModifiedMonotonicity(rel: Rank, mq: RelMetadataQuery): RelModifiedMonotonicity = {
rel match {
case physicalRank: StreamPhysicalRank if RankUtil.isDeduplication(rel) =>
// Only ranks that can be converted to a Deduplicate operator are handled here, mirroring the
// pre-FLINK-34702 behavior when a dedicated StreamPhysicalDeduplicate node carried these
// invariants. This is the same condition as RankUtil.canConvertToDeduplicate(FlinkLogicalRank):
// a Top-1 ROW_NUMBER without rank number output, sorted on a single time attribute. We inline
// it rather than calling canConvertToDeduplicate(StreamPhysicalRank) because the latter reads
// the ModifyKindSetTrait, which is still undefined at the time this metadata is computed.
// Any other rank (multi-column or non-time-attribute ORDER BY, i.e. a real Top-1 Rank that
// retracts and re-emits the kept row) falls through to the generic logic below.
case physicalRank: StreamPhysicalRank
if RankUtil.isDeduplication(rel) &&
RankUtil.sortOnTimeAttributeOnly(
physicalRank.orderKey,
physicalRank.getInput.getRowType) =>
getPhysicalRankModifiedMonotonicity(physicalRank, mq)

case _ =>
Expand Down Expand Up @@ -249,8 +261,9 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
private def getPhysicalRankModifiedMonotonicity(
rank: StreamPhysicalRank,
mq: RelMetadataQuery): RelModifiedMonotonicity = {
// Can't use RankUtil.canConvertToDeduplicate directly because modifyKindSetTrait is undefined.
if (allAppend(mq, rank.getInput)) {
// A LastRow (ORDER BY time DESC) or rowtime deduplication retracts and re-emits the kept row
// when a new winner arrives, hence generates updates; only FirstRow on proctime is append-only.
if (RankUtil.keepLastDeduplicateRow(rank.orderKey) || rank.sortOnRowTime) {
val mono = new RelModifiedMonotonicity(
Array.fill(rank.getRowType.getFieldCount)(NOT_MONOTONIC))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,7 @@ object RankUtil {
!rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute && isRowNumberType
}

private def sortOnTimeAttributeOnly(
sortCollation: RelCollation,
inputRowType: RelDataType): Boolean = {
def sortOnTimeAttributeOnly(sortCollation: RelCollation, inputRowType: RelDataType): Boolean = {
if (sortCollation.getFieldCollations.size() != 1) {
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,111 @@ Sink(table=[default_catalog.default_database.sink], fields=[name, eat, cnt])
+- GroupAggregate(groupBy=[name, eat], select=[name, eat, SUM(age) AS cnt])
+- Exchange(distribution=[hash[name, eat]])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[name, eat, age])
]]>
</Resource>
</TestCase>
<TestCase name="testDeduplicateOnMultipleColumnsGeneratesUpdates">
<Resource name="sql">
<![CDATA[
SELECT b, MIN(c) AS min_c
FROM (
SELECT a, b, c,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC, c DESC) AS rn
FROM MyTable
) WHERE rn = 1
GROUP BY b
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalAggregate(group=[{0}], min_c=[MIN($1)])
+- LogicalProject(b=[$1], c=[$2])
+- LogicalFilter(condition=[=($3, 1)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $1 DESC NULLS LAST, $2 DESC NULLS LAST)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
GlobalGroupAggregate(groupBy=[b], select=[b, MIN_RETRACT(min$0) AS min_c])
+- Exchange(distribution=[hash[b]])
+- LocalGroupAggregate(groupBy=[b], select=[b, MIN_RETRACT(c) AS min$0, COUNT_RETRACT(*) AS count1$1])
+- Calc(select=[b, c])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[b DESC, c DESC], select=[a, b, c])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b, c])
+- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testDeduplicateOnNonTimeAttributeGeneratesUpdates">
<Resource name="sql">
<![CDATA[
SELECT b, MIN(c) AS min_c
FROM (
SELECT a, b, c,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) AS rn
FROM MyTable
) WHERE rn = 1
GROUP BY b
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalAggregate(group=[{0}], min_c=[MIN($1)])
+- LogicalProject(b=[$1], c=[$2])
+- LogicalFilter(condition=[=($3, 1)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $1 NULLS FIRST)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
GlobalGroupAggregate(groupBy=[b], select=[b, MIN_RETRACT(min$0) AS min_c])
+- Exchange(distribution=[hash[b]])
+- LocalGroupAggregate(groupBy=[b], select=[b, MIN_RETRACT(c) AS min$0, COUNT_RETRACT(*) AS count1$1])
+- Calc(select=[b, c])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[b ASC], select=[a, b, c])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b, c])
+- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testDeduplicateOnTimeAttributeIsInsertOnly">
<Resource name="sql">
<![CDATA[
SELECT b, MIN(c) AS min_c
FROM (
SELECT a, b, c,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime) AS rn
FROM MyTable
) WHERE rn = 1
GROUP BY b
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalAggregate(group=[{0}], min_c=[MIN($1)])
+- LogicalProject(b=[$1], c=[$2])
+- LogicalFilter(condition=[=($3, 1)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
GlobalGroupAggregate(groupBy=[b], select=[b, MIN(min$0) AS min_c])
+- Exchange(distribution=[hash[b]])
+- LocalGroupAggregate(groupBy=[b], select=[b, MIN(c) AS min$0, COUNT_RETRACT(*) AS count1$1])
+- Calc(select=[b, c])
+- Deduplicate(keep=[FirstRow], key=[a], order=[PROCTIME], outputInsertOnly=[false])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b, c, proctime])
+- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,78 @@ class FlinkRelMdHandlerTestBase {
(calcOfFirstRow, calcOfLastRow)
}

// A Top-1 ROW_NUMBER whose ORDER BY is NOT a single time attribute. It is logically a
// deduplication (RankUtil.isDeduplication), but it cannot be converted to a Deduplicate operator
// (RankUtil.canConvertToDeduplicate is false), so it stays a regular Top-1 Rank that retracts and
// re-emits the kept row. These guard the dispatch boundary in FlinkRelMdModifiedMonotonicity that
// FLINK-34702 introduced.
//
// equivalent SQL is
// select a, b, c from (
// select a, b, c, ...,
// ROW_NUMBER() over (partition by b order by a) rn from TemporalTable3
// ) t where rn <= 1
// Top-1 rank fixture whose only order key is field 0 (built via RelCollations.of(0)).
protected lazy val streamTop1RankOnNonTimeAttribute: RelNode =
  buildTop1RankNotConvertibleToDeduplicate(RelCollations.of(0))

// equivalent SQL is
// select a, b, c from (
// select a, b, c, ...,
// ROW_NUMBER() over (partition by b order by a desc, c desc) rn from TemporalTable3
// ) t where rn <= 1
// Top-1 rank fixture ordered by two keys (fields 0 and 2), both DESCENDING with NULLS LAST.
protected lazy val streamTop1RankOnMultipleColumns: RelNode = {
  // Both order keys share direction and null ordering; only the field index differs.
  def descendingNullsLast(fieldIndex: Int): RelFieldCollation =
    new RelFieldCollation(
      fieldIndex,
      RelFieldCollation.Direction.DESCENDING,
      RelFieldCollation.NullDirection.LAST)

  buildTop1RankNotConvertibleToDeduplicate(
    RelCollations.of(descendingNullsLast(0), descendingNullsLast(2)))
}

/**
 * Builds a stream plan `scan(TemporalTable3) -> hash exchange on field 1 -> Top-1 ROW_NUMBER rank
 * -> calc`, where the rank is partitioned by field 1 and ordered by the given collation.
 *
 * @param orderKey
 *   sort collation handed to the rank node
 * @return
 *   the Calc on top of the rank, projecting rank output fields 0, 1 and 2
 */
def buildTop1RankNotConvertibleToDeduplicate(orderKey: RelCollation): RelNode = {
  // Source scan, hash-distributed on the same field the rank partitions by.
  val tableScan: StreamPhysicalDataStreamScan =
    createDataStreamScan(ImmutableList.of("TemporalTable3"), streamPhysicalTraits)
  val partitionHash = FlinkRelDistribution.hash(Array(1), requireStrict = true)
  val exchangeTraits = tableScan.getTraitSet.replace(partitionHash)
  val partitionedInput =
    new StreamPhysicalExchange(cluster, exchangeTraits, tableScan, partitionHash)

  // ROW_NUMBER restricted to the range [1, 1], without emitting the rank number column.
  val top1Rank = new StreamPhysicalRank(
    cluster,
    streamPhysicalTraits,
    partitionedInput,
    ImmutableBitSet.of(1),
    orderKey,
    RankType.ROW_NUMBER,
    new ConstantRankRange(1, 1),
    new RelDataTypeFieldImpl("rn", 7, longType),
    outputRankNumber = false,
    RankProcessStrategy.UNDEFINED_STRATEGY,
    sortOnRowTime = false
  )
  val rankRowType = top1Rank.getRowType

  // Output type is the rank row type minus its last two fields; the projection reads fields 0..2.
  val outputTypeBuilder = typeFactory.builder()
  rankRowType.getFieldList.asScala.dropRight(2).foreach(outputTypeBuilder.add)
  val projections = Array(0, 1, 2).map(i => RexInputRef.of(i, rankRowType)).toList.asJava
  val calcProgram =
    RexProgram.create(rankRowType, projections, null, outputTypeBuilder.build(), rexBuilder)

  new StreamPhysicalCalc(
    cluster,
    streamPhysicalTraits,
    top1Rank,
    calcProgram,
    calcProgram.getOutputRowType)
}

protected lazy val streamChangelogNormalize = {
val key = Array(1, 0)
val hash1 = FlinkRelDistribution.hash(key, requireStrict = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,24 @@ class FlinkRelMdModifiedMonotonicityTest extends FlinkRelMdHandlerTestBase {
mq.getRelModifiedMonotonicity(streamRowTimeDeduplicateLastRow))
}

@Test
def testGetRelMonotonicityOnRankNotConvertibleToDeduplicate(): Unit = {
  // Both fixtures are Top-1 ROW_NUMBER ranks whose ORDER BY is not a single time attribute, so
  // they cannot be converted to a Deduplicate operator (see RankUtil.canConvertToDeduplicate).
  // Such a rank retracts and re-emits the kept row when a new winner arrives, hence it must NOT
  // be reported as all-CONSTANT (insert-only). It is expected to go through the generic Rank
  // monotonicity logic, where the order-by field follows the sort direction:
  // CONSTANT input + ASC => DECREASING, CONSTANT input + DESC => INCREASING.
  // Guards against the FLINK-34702 dispatch regression.

  // Single ascending order key.
  val expectedForAscendingKey =
    new RelModifiedMonotonicity(Array(DECREASING, CONSTANT, NOT_MONOTONIC))
  val actualForAscendingKey = mq.getRelModifiedMonotonicity(streamTop1RankOnNonTimeAttribute)
  assertEquals(expectedForAscendingKey, actualForAscendingKey)

  // Two descending order keys.
  val expectedForDescendingKeys =
    new RelModifiedMonotonicity(Array(INCREASING, CONSTANT, NOT_MONOTONIC))
  val actualForDescendingKeys = mq.getRelModifiedMonotonicity(streamTop1RankOnMultipleColumns)
  assertEquals(expectedForDescendingKeys, actualForDescendingKeys)
}

@Test
def testGetRelMonotonicityOnChangelogNormalize(): Unit = {
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package org.apache.flink.table.planner.plan.stream.sql

import org.apache.flink.table.api._
import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
import org.apache.flink.table.planner.utils.TableTestBase

import org.assertj.core.api.Assertions.{assertThatExceptionOfType, assertThatThrownBy}
import org.junit.jupiter.api.Test

import java.time.Duration

class RankTest extends TableTestBase {

private val util = streamTestUtil()
Expand Down Expand Up @@ -1014,5 +1016,104 @@ class RankTest extends TableTestBase {
util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE)
}

// A minibatch-enabled util with MyTable registered. Minibatch is required for these cases to
// exercise the FLINK-34702 fix end-to-end: a deduplication-style Top-1 Rank only forwards its
// updates downstream when minibatch is enabled (see RankUtil.outputInsertOnlyInDeduplicate); with
// minibatch disabled a FirstRow rank is forced insert-only and the modified-monotonicity guard is
// never reached. A fresh util is used (rather than mutating the shared one) to keep minibatch from
// leaking into the other test cases.
private def miniBatchUtil() = {
  // Start from a fresh test util so the settings below never touch the class-level `util`.
  val miniBatchTestUtil = streamTestUtil()
  miniBatchTestUtil.addDataStream[(Int, String, Long)](
    "MyTable",
    'a,
    'b,
    'c,
    'proctime.proctime,
    'rowtime.rowtime)

  // Enable minibatch with a 1 second allowed latency and a batch size of 1000.
  val tableConfig = miniBatchTestUtil.tableEnv.getConfig
  tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
  tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(1000L))

  miniBatchTestUtil
}

/**
* A Top-1 ROW_NUMBER (i.e. RankUtil.isDeduplication) whose ORDER BY is a single time attribute in
* ascending order can be converted to a FirstRow Deduplicate, which is insert-only. Its modified
* monotonicity is CONSTANT, so the downstream MIN aggregate uses the non-retract Min. This is the
* insert-only control for the two cases below.
*/
@Test
def testDeduplicateOnTimeAttributeIsInsertOnly(): Unit = {
// Intentionally shadows the class-level `util` with a fresh minibatch-enabled instance.
val util = miniBatchUtil()
// Top-1 ROW_NUMBER per `a`, ordered by `proctime` ascending, feeding a MIN(c) grouped by `b`.
val sql =
"""
|SELECT b, MIN(c) AS min_c
|FROM (
| SELECT a, b, c,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime) AS rn
| FROM MyTable
|) WHERE rn = 1
|GROUP BY b
""".stripMargin

// NOTE(review): the plan recorded for this test case prints
// Deduplicate(keep=[FirstRow], ..., outputInsertOnly=[false]) while the aggregate uses the
// non-retract MIN - confirm that flag value is the intended expectation for this control case.
util.verifyExecPlan(sql)
}

/**
* A Top-1 ROW_NUMBER whose ORDER BY is a single NON-time attribute is a deduplication
* (RankUtil.isDeduplication) but can NOT be converted to a Deduplicate operator
* (RankUtil.canConvertToDeduplicate is false because it is not sorted on a time attribute). It
* stays a Top-1 Rank operator that retracts and re-emits the kept row when a new winner arrives,
* hence generates updates. Its modified monotonicity must NOT be CONSTANT, so the downstream MIN
* aggregate must use Min_Retract.
*
* Before the fix, FlinkRelMdModifiedMonotonicity wrongly treated this ascending single-column
* case as an insert-only FirstRow and marked it CONSTANT, planning a non-retract Min.
*/
@Test
def testDeduplicateOnNonTimeAttributeGeneratesUpdates(): Unit = {
// Intentionally shadows the class-level `util` with a fresh minibatch-enabled instance.
val util = miniBatchUtil()
// Top-1 ROW_NUMBER per `a`, ordered by the regular (non-time) column `b`, feeding a MIN(c)
// grouped by `b`.
val sql =
"""
|SELECT b, MIN(c) AS min_c
|FROM (
| SELECT a, b, c,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) AS rn
| FROM MyTable
|) WHERE rn = 1
|GROUP BY b
""".stripMargin

// The plan recorded for this test case keeps a Rank node and aggregates with MIN_RETRACT.
util.verifyExecPlan(sql)
}

/**
* Same as above but with a multi-column ORDER BY. A multi-column order key can never be a single
* time attribute, so the rank stays a Top-1 Rank operator and generates updates; the downstream
* MIN aggregate must use Min_Retract.
*
* This is the shape that originally exposed the bug: RankUtil.keepLastDeduplicateRow returns
* false for a multi-column order key (size != 1), so before the fix it fell through to the
* insert-only CONSTANT branch.
*/
@Test
def testDeduplicateOnMultipleColumnsGeneratesUpdates(): Unit = {
// Intentionally shadows the class-level `util` with a fresh minibatch-enabled instance.
val util = miniBatchUtil()
// Top-1 ROW_NUMBER per `a`, ordered by two columns (`b DESC, c DESC`), feeding a MIN(c)
// grouped by `b`.
val sql =
"""
|SELECT b, MIN(c) AS min_c
|FROM (
| SELECT a, b, c,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC, c DESC) AS rn
| FROM MyTable
|) WHERE rn = 1
|GROUP BY b
""".stripMargin

// The plan recorded for this test case keeps a Rank node and aggregates with MIN_RETRACT.
util.verifyExecPlan(sql)
}

// TODO add tests about multi-sinks and udf
}