[FLINK-38579] [table-planner] Fix incorrect UB drop for filter on non-upsert key#28090
[FLINK-38579] [table-planner] Fix incorrect UB drop for filter on non-upsert key#28090fresh-borzoni wants to merge 3 commits intoapache:masterfrom
Conversation
|
@flinkbot run azure |
|
@lincoln-lil @xuyangzhong Appreciate if you can take a look 🙏 |
xuyangzhong
left a comment
There was a problem hiding this comment.
Thanks for driving this! I have left some comments. PTAL.
| } | ||
| } | ||
|
|
||
| private def referencesNonUpsertKeyColumns(node: RelNode, rexNodes: Seq[RexNode]): Boolean = { |
There was a problem hiding this comment.
Maybe we would better extract a common method from referencesNonUpsertKeyColumns, referencesNonUpsert and isNonUpsertKeyCondition, WDYT?
| @@ -1303,27 +1319,35 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti | |||
| createNewNode(process, Some(children), providedDeleteTrait) | |||
|
|
|||
| case join: StreamPhysicalJoin => | |||
There was a problem hiding this comment.
Nit: should we also handle StreamPhysicalChangelogNormalize, StreamPhysicalTemporalJoin, StreamPhysicalLookupJoin, StreamPhysicalCorrelateBase, StreamPhysicalIntervalJoin, StreamPhysicalWindowJoin and StreamPhysicalMultiJoin? They contain filters too.
| } | ||
| } | ||
| if (children.exists(_.isEmpty)) { | ||
| if (onlyAfterByParent && hasNonUpsertKeyNonEquiCondition(join)) { |
There was a problem hiding this comment.
After this fix, we can safely delete the logic in DeltaJoinUtil#isFilterOnOneSetOfUpsertKeys, right?
| if (sourceMode == CHANGELOG_SOURCE_WITH_EVENTS_DUPLICATE && miniBatch == MiniBatchOn) { | ||
| assertThatThrownBy(() => tEnv.executeSql("insert into s select * from t where c < 2")) | ||
| .isInstanceOf(classOf[org.apache.flink.table.api.TableException]) | ||
| .hasMessageContaining("Can't generate a valid execution plan") |
There was a problem hiding this comment.
Perhaps we should fix this issue introduced by this PR together, or at least ensure it's resolved in the same release.
| } | ||
|
|
||
| @TestTemplate | ||
| def testJoinWithNonEquivConditionOnNonUpsertKey(): Unit = { |
There was a problem hiding this comment.
nit testJoinWithNonEquivConditionOnLeftNonUpsertKey
| } | ||
|
|
||
| @TestTemplate | ||
| def testJoinWithNonEquivConditionOnNonUpsertKey(): Unit = { |
There was a problem hiding this comment.
nit extract some same logic between testJoinWithNonEquivConditionOnNonUpsertKey and testJoinWithNonEquivConditionOnRightNonUpsertKey
Reopens #27371 (auto-closed by stale-bot)
What is the purpose
Linked issue: close https://issues.apache.org/jira/browse/FLINK-38579
This pull request fixes incorrect changelog mode inference when filters or non-equi join conditions are pushed down on non-upsert key columns. Without this fix, Flink incorrectly drops UPDATE_BEFORE events in these scenarios, leading to phantom rows in the output.
Problem: When a filter like
c < 2(wherecis a non-upsert key column) is pushed down to a changelog source, and a row withc=1is updated toc=2, the old rowc=1matches the filter but the new rowc=2doesn't. The planner was incorrectly allowingDropUpdateBeforeoptimization, which caused the UPDATE_BEFORE event to be lost. Without the UPDATE_BEFORE, downstream operators couldn't retract the oldc=1row, leaving it incorrectly in the result.Solution: The fix prevents
ONLY_UPDATE_AFTERandDELETE_BY_KEYchangelog mode when filters or non-equi join conditions reference non-upsert key columns, ensuring UPDATE_BEFORE events are preserved for correct retraction semantics.Brief change log
referencesNonUpsertKeyColumns()helper to check if RexNodes reference non-upsert key columnshasNonUpsertKeyFilterPushedDown()to detect filters on non-upsert keys in TableSourceScanhasNonUpsertKeyNonEquiCondition()to detect non-equi join conditions on non-upsert keys, with precise left/right input analysisSatisfyUpdateKindTraitVisitorto rejectONLY_UPDATE_AFTERfor TableSourceScan when filter references non-upsert keysSatisfyUpdateKindTraitVisitorto rejectONLY_UPDATE_AFTERfor StreamPhysicalJoin when non-equi condition references non-upsert keysVerifying this change
This change added tests and can be verified as follows:
ChangelogSourceITCase.testFilterPushedDownOnNonUpsertKey()- Tests filter pushed down on non-upsert key column with UPDATE_BEFORE preservation (7 parameterized configurations, 6 pass, 1 skipped for incompatible CDC duplicate + MiniBatch)ChangelogSourceITCase.testJoinWithNonEquivConditionOnNonUpsertKey()- Tests non-equi join condition on left side non-upsert key columnChangelogSourceITCase.testJoinWithNonEquivConditionOnRightNonUpsertKey()- Tests non-equi join condition on right side non-upsert key column to validate left/right split logicNote:
The CDC duplicate + MiniBatch configuration correctly fails to generate an execution plan for filter pushdown with this fix. In this case, the filter is evaluated at the source, where ChangelogNormalize enforces ONLY_UPDATE_AFTER to deduplicate updates, while filtering on a non-upsert key requires UPDATE_BEFORE for correctness. These requirements conflict at the source level, so the planner correctly rejects this configuration.
Join non-equi conditions are evaluated downstream in the join operator, so they do not conflict with ChangelogNormalize.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation