Skip to content

[FLINK-38579] [table-planner] Fix incorrect UB drop for filter on non-upsert key#27371

Closed
fresh-borzoni wants to merge 2 commits intoapache:masterfrom
fresh-borzoni:non-equiv-conditions-upstream-changelog
Closed

[FLINK-38579] [table-planner] Fix incorrect UB drop for filter on non-upsert key#27371
fresh-borzoni wants to merge 2 commits intoapache:masterfrom
fresh-borzoni:non-equiv-conditions-upstream-changelog

Conversation

@fresh-borzoni
Copy link
Copy Markdown
Member

@fresh-borzoni fresh-borzoni commented Dec 29, 2025

What is the purpose

Linked issue: close https://issues.apache.org/jira/browse/FLINK-38579

This pull request fixes incorrect changelog mode inference when filters or non-equi join conditions are pushed down on non-upsert key columns. Without this fix, Flink incorrectly drops UPDATE_BEFORE events in these scenarios, leading to phantom rows in the output.

Problem: When a filter like c < 2 (where c is a non-upsert key column) is pushed down to a changelog source, and a row with c=1 is updated to c=2, the old row c=1 matches the filter but the new row c=2 doesn't. The planner was incorrectly allowing DropUpdateBefore optimization, which caused the UPDATE_BEFORE event to be lost. Without the UPDATE_BEFORE, downstream operators couldn't retract the old c=1 row, leaving it incorrectly in the result.

Solution: The fix prevents ONLY_UPDATE_AFTER and DELETE_BY_KEY changelog mode when filters or non-equi join conditions reference non-upsert key columns, ensuring UPDATE_BEFORE events are preserved for correct retraction semantics.

Brief change log

  • Added referencesNonUpsertKeyColumns() helper to check if RexNodes reference non-upsert key columns
  • Added hasNonUpsertKeyFilterPushedDown() to detect filters on non-upsert keys in TableSourceScan
  • Added hasNonUpsertKeyNonEquiCondition() to detect non-equi join conditions on non-upsert keys, with precise left/right input analysis
  • Modified SatisfyUpdateKindTraitVisitor to reject ONLY_UPDATE_AFTER for TableSourceScan when filter references non-upsert keys
  • Modified SatisfyUpdateKindTraitVisitor to reject ONLY_UPDATE_AFTER for StreamPhysicalJoin when non-equi condition references non-upsert keys
  • Added comprehensive IT tests covering filter pushdown and join scenarios

Verifying this change

This change added tests and can be verified as follows:

  • Added ChangelogSourceITCase.testFilterPushedDownOnNonUpsertKey() - Tests filter pushed down on non-upsert key column with UPDATE_BEFORE preservation (7 parameterized configurations, 6 pass, 1 skipped for incompatible CDC duplicate + MiniBatch)
  • Added ChangelogSourceITCase.testJoinWithNonEquivConditionOnNonUpsertKey() - Tests non-equi join condition on left side non-upsert key column
  • Added ChangelogSourceITCase.testJoinWithNonEquivConditionOnRightNonUpsertKey() - Tests non-equi join condition on right side non-upsert key column to validate left/right split logic
  • All tests verify that without the fix, wrong results occur (phantom rows), and with the fix, correct empty results are produced
  • Tests use changelog data with INSERT, UPDATE_BEFORE, UPDATE_AFTER events to simulate real CDC scenarios

Note:

The CDC duplicate + MiniBatch configuration correctly fails to generate an execution plan for filter pushdown with this fix. In this case, the filter is evaluated at the source, where ChangelogNormalize enforces ONLY_UPDATE_AFTER to deduplicate updates, while filtering on a non-upsert key requires UPDATE_BEFORE for correctness. These requirements conflict at the source level, so the planner correctly rejects this configuration.

Join non-equi conditions are evaluated downstream in the join operator, so they do not conflict with ChangelogNormalize.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no (only affects plan optimization phase)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no (bug fix)
  • If yes, how is the feature documented? not applicable

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Dec 29, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@fresh-borzoni
Copy link
Copy Markdown
Member Author

@xuyangzhong I came across this while reading code related to DeltaJoin and discussions.

Implemented a fix covering the two gaps you identified: pushed-down filters on source and non-equi join conditions.
Would value your review, thank you.

@fresh-borzoni fresh-borzoni force-pushed the non-equiv-conditions-upstream-changelog branch from 26edc66 to 4adbe12 Compare December 29, 2025 23:47
@fresh-borzoni fresh-borzoni force-pushed the non-equiv-conditions-upstream-changelog branch from 4adbe12 to 74f06e1 Compare December 30, 2025 00:06
@fresh-borzoni
Copy link
Copy Markdown
Member Author

@flinkbot run azure

@fresh-borzoni fresh-borzoni changed the title [FLINK-38579] Fix incorrect UB drop for filter on non-upsert key [FLINK-38579] [table-planner] Fix incorrect UB drop for filter on non-upsert key Jan 1, 2026
@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 2, 2026

This PR is being marked as stale since it has not had any activity in the last 90 days.
If you would like to keep this PR alive, please leave a comment asking for a review.
If the PR has merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out to the
community, contact details can be found here: https://flink.apache.org/what-is-flink/community/

If this PR is no longer valid or desired, please feel free to close it.
If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions Bot added the stale label Apr 2, 2026
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 2, 2026

This PR has been closed since it has not had any activity in 120 days.
If you feel like this was a mistake, or you would like to continue working on it,
please feel free to re-open the PR and ask for a review.

@fresh-borzoni
Copy link
Copy Markdown
Member Author

fresh-borzoni commented May 2, 2026

Reopened as #28090

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants