[FLINK-38579] [table-planner] Fix incorrect UB drop for filter on non-upsert key #27371
fresh-borzoni wants to merge 2 commits into apache:master
@xuyangzhong I came across this while reading the DeltaJoin code and the related discussions. I implemented a fix covering the two gaps you identified: filters pushed down to the source and non-equi join conditions.
@flinkbot run azure
This PR is being marked as stale since it has not had any activity in the last 90 days. If you are having difficulty finding a reviewer, please reach out to the If this PR is no longer valid or desired, please feel free to close it.
This PR has been closed since it has not had any activity in 120 days.
Reopened as #28090
What is the purpose
Linked issue: close https://issues.apache.org/jira/browse/FLINK-38579
This pull request fixes incorrect changelog mode inference when filters or non-equi join conditions are pushed down on non-upsert key columns. Without this fix, Flink incorrectly drops UPDATE_BEFORE events in these scenarios, leading to phantom rows in the output.
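To make the phantom-row effect concrete, here is a minimal, self-contained sketch. It is a toy simulation and does not use Flink planner or runtime code: the class `PhantomRowDemo`, its `Event` type, and the `materialize` method are hypothetical names introduced only for illustration. It assumes a source filter `c < 2` and a downstream result that is maintained by adding and retracting rows.

```java
import java.util.ArrayList;
import java.util.List;

public class PhantomRowDemo {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER }

    // Hypothetical changelog event: a row (key, c) tagged with its change kind.
    static final class Event {
        final Kind kind;
        final int key;
        final int c;

        Event(Kind kind, int key, int c) {
            this.kind = kind;
            this.key = key;
            this.c = c;
        }
    }

    // Applies the filter c < 2 at the "source", optionally dropping UPDATE_BEFORE
    // events first, and maintains the downstream result as a list of "key:c" rows.
    static List<String> materialize(List<Event> changelog, boolean dropUpdateBefore) {
        List<String> result = new ArrayList<>();
        for (Event e : changelog) {
            if (dropUpdateBefore && e.kind == Kind.UPDATE_BEFORE) {
                continue; // models the UPDATE_BEFORE drop
            }
            if (!(e.c < 2)) {
                continue; // the event does not pass the pushed-down filter
            }
            String row = e.key + ":" + e.c;
            if (e.kind == Kind.UPDATE_BEFORE) {
                result.remove(row); // retract the old version of the row
            } else {
                result.add(row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Row with key 1 is inserted with c=1 and then updated to c=2.
        List<Event> changelog = List.of(
                new Event(Kind.INSERT, 1, 1),
                new Event(Kind.UPDATE_BEFORE, 1, 1),
                new Event(Kind.UPDATE_AFTER, 1, 2));

        System.out.println(materialize(changelog, false)); // prints []
        System.out.println(materialize(changelog, true));  // prints [1:1]
    }
}
```

With UPDATE_BEFORE preserved, the old row is retracted and the result is empty. With UPDATE_BEFORE dropped, the UPDATE_AFTER is filtered out and nothing ever removes `1:1`, which is the phantom row.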
Problem: When a filter like `c < 2` (where `c` is a non-upsert key column) is pushed down to a changelog source, and a row with `c=1` is updated to `c=2`, the old row `c=1` matches the filter but the new row `c=2` doesn't. The planner was incorrectly allowing the `DropUpdateBefore` optimization, which caused the UPDATE_BEFORE event to be lost. Without the UPDATE_BEFORE, downstream operators couldn't retract the old `c=1` row, leaving it incorrectly in the result.
Solution: The fix prevents the `ONLY_UPDATE_AFTER` and `DELETE_BY_KEY` changelog modes when filters or non-equi join conditions reference non-upsert key columns, ensuring UPDATE_BEFORE events are preserved for correct retraction semantics.
Brief change log
- `referencesNonUpsertKeyColumns()` helper to check if RexNodes reference non-upsert key columns
- `hasNonUpsertKeyFilterPushedDown()` to detect filters on non-upsert keys in TableSourceScan
- `hasNonUpsertKeyNonEquiCondition()` to detect non-equi join conditions on non-upsert keys, with precise left/right input analysis
- `SatisfyUpdateKindTraitVisitor` to reject `ONLY_UPDATE_AFTER` for TableSourceScan when filter references non-upsert keys
- `SatisfyUpdateKindTraitVisitor` to reject `ONLY_UPDATE_AFTER` for StreamPhysicalJoin when non-equi condition references non-upsert keys
Verifying this change
This change added tests and can be verified as follows:
- `ChangelogSourceITCase.testFilterPushedDownOnNonUpsertKey()` - Tests filter pushed down on non-upsert key column with UPDATE_BEFORE preservation (7 parameterized configurations: 6 pass, 1 skipped for the incompatible CDC duplicate + MiniBatch combination)
- `ChangelogSourceITCase.testJoinWithNonEquivConditionOnNonUpsertKey()` - Tests non-equi join condition on left side non-upsert key column
- `ChangelogSourceITCase.testJoinWithNonEquivConditionOnRightNonUpsertKey()` - Tests non-equi join condition on right side non-upsert key column to validate left/right split logic
Note:
The CDC duplicate + MiniBatch configuration correctly fails to generate an execution plan for filter pushdown with this fix. In this case, the filter is evaluated at the source, where ChangelogNormalize enforces ONLY_UPDATE_AFTER to deduplicate updates, while filtering on a non-upsert key requires UPDATE_BEFORE for correctness. These requirements conflict at the source level, so the planner correctly rejects this configuration.
Join non-equi conditions are evaluated downstream in the join operator, so they do not conflict with ChangelogNormalize.
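The column check and the left/right split named in the change log can be sketched roughly as follows. This is a simplified illustration and not the code in this PR: it models column references as plain index sets instead of RexNodes, and the class name, method signatures, and the `nonEquiConditionTouchesNonUpsertKey` helper are assumptions made for the example.

```java
import java.util.HashSet;
import java.util.Set;

public class NonUpsertKeyCheck {

    // True if at least one referenced column index is not part of the upsert key.
    // (Assumed signature; the real helper operates on RexNodes.)
    static boolean referencesNonUpsertKeyColumns(Set<Integer> referencedColumns, Set<Integer> upsertKey) {
        return !upsertKey.containsAll(referencedColumns);
    }

    // Join case: the condition references indices into the concatenated (left ++ right) row.
    // References are split per input, right-side indices are shifted back to the right
    // input's own numbering, and each side is checked against its own upsert key.
    static boolean nonEquiConditionTouchesNonUpsertKey(
            Set<Integer> conditionRefs, int leftFieldCount, Set<Integer> leftKey, Set<Integer> rightKey) {
        Set<Integer> leftRefs = new HashSet<>();
        Set<Integer> rightRefs = new HashSet<>();
        for (int idx : conditionRefs) {
            if (idx < leftFieldCount) {
                leftRefs.add(idx);
            } else {
                rightRefs.add(idx - leftFieldCount);
            }
        }
        return referencesNonUpsertKeyColumns(leftRefs, leftKey)
                || referencesNonUpsertKeyColumns(rightRefs, rightKey);
    }

    public static void main(String[] args) {
        // Left input: (a, b, c) with upsert key {a}; right input: (x, y) with upsert key {x}.
        Set<Integer> leftKey = Set.of(0);
        Set<Integer> rightKey = Set.of(0);

        // Condition on left column c (index 2): references a non-upsert key column.
        System.out.println(nonEquiConditionTouchesNonUpsertKey(Set.of(2), 3, leftKey, rightKey)); // true
        // Condition on right column y (joined index 4): also a non-upsert key column.
        System.out.println(nonEquiConditionTouchesNonUpsertKey(Set.of(4), 3, leftKey, rightKey)); // true
        // Condition only on a and x (joined indices 0 and 3): key columns only.
        System.out.println(nonEquiConditionTouchesNonUpsertKey(Set.of(0, 3), 3, leftKey, rightKey)); // false
    }
}
```

In this model, a `true` result corresponds to the case where the planner should keep UPDATE_BEFORE rather than accept `ONLY_UPDATE_AFTER`.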
Does this pull request potentially affect one of the following parts:
- `@Public(Evolving)`: no
Documentation