Skip to content

[FLINK-38579] [table-planner] Fix incorrect UB drop for filter on non-upsert key#28090

Open
fresh-borzoni wants to merge 3 commits intoapache:masterfrom
fresh-borzoni:non-equiv-conditions-upstream-changelog
Open

[FLINK-38579] [table-planner] Fix incorrect UB drop for filter on non-upsert key#28090
fresh-borzoni wants to merge 3 commits intoapache:masterfrom
fresh-borzoni:non-equiv-conditions-upstream-changelog

Conversation

@fresh-borzoni
Copy link
Copy Markdown
Member

@fresh-borzoni fresh-borzoni commented May 2, 2026

Reopens #27371 (auto-closed by stale-bot)


What is the purpose

Linked issue: close https://issues.apache.org/jira/browse/FLINK-38579

This pull request fixes incorrect changelog mode inference when filters or non-equi join conditions are pushed down on non-upsert key columns. Without this fix, Flink incorrectly drops UPDATE_BEFORE events in these scenarios, leading to phantom rows in the output.

Problem: When a filter like c < 2 (where c is a non-upsert key column) is pushed down to a changelog source, and a row with c=1 is updated to c=2, the old row c=1 matches the filter but the new row c=2 doesn't. The planner was incorrectly allowing DropUpdateBefore optimization, which caused the UPDATE_BEFORE event to be lost. Without the UPDATE_BEFORE, downstream operators couldn't retract the old c=1 row, leaving it incorrectly in the result.

Solution: The fix prevents ONLY_UPDATE_AFTER and DELETE_BY_KEY changelog mode when filters or non-equi join conditions reference non-upsert key columns, ensuring UPDATE_BEFORE events are preserved for correct retraction semantics.

Brief change log

  • Added referencesNonUpsertKeyColumns() helper to check if RexNodes reference non-upsert key columns
  • Added hasNonUpsertKeyFilterPushedDown() to detect filters on non-upsert keys in TableSourceScan
  • Added hasNonUpsertKeyNonEquiCondition() to detect non-equi join conditions on non-upsert keys, with precise left/right input analysis
  • Modified SatisfyUpdateKindTraitVisitor to reject ONLY_UPDATE_AFTER for TableSourceScan when filter references non-upsert keys
  • Modified SatisfyUpdateKindTraitVisitor to reject ONLY_UPDATE_AFTER for StreamPhysicalJoin when non-equi condition references non-upsert keys
  • Added comprehensive IT tests covering filter pushdown and join scenarios

Verifying this change

This change added tests and can be verified as follows:

  • Added ChangelogSourceITCase.testFilterPushedDownOnNonUpsertKey() - Tests filter pushed down on non-upsert key column with UPDATE_BEFORE preservation (7 parameterized configurations, 6 pass, 1 skipped for incompatible CDC duplicate + MiniBatch)
  • Added ChangelogSourceITCase.testJoinWithNonEquivConditionOnNonUpsertKey() - Tests non-equi join condition on left side non-upsert key column
  • Added ChangelogSourceITCase.testJoinWithNonEquivConditionOnRightNonUpsertKey() - Tests non-equi join condition on right side non-upsert key column to validate left/right split logic
  • All tests verify that without the fix, wrong results occur (phantom rows), and with the fix, correct empty results are produced
  • Tests use changelog data with INSERT, UPDATE_BEFORE, UPDATE_AFTER events to simulate real CDC scenarios

Note:

The CDC duplicate + MiniBatch configuration correctly fails to generate an execution plan for filter pushdown with this fix. In this case, the filter is evaluated at the source, where ChangelogNormalize enforces ONLY_UPDATE_AFTER to deduplicate updates, while filtering on a non-upsert key requires UPDATE_BEFORE for correctness. These requirements conflict at the source level, so the planner correctly rejects this configuration.

Join non-equi conditions are evaluated downstream in the join operator, so they do not conflict with ChangelogNormalize.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no (only affects plan optimization phase)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no (bug fix)
  • If yes, how is the feature documented? not applicable

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented May 2, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@fresh-borzoni
Copy link
Copy Markdown
Member Author

@flinkbot run azure

@fresh-borzoni
Copy link
Copy Markdown
Member Author

@lincoln-lil @xuyangzhong Appreciate if you can take a look 🙏

@wuchong wuchong requested a review from xuyangzhong May 4, 2026 09:22
Copy link
Copy Markdown
Contributor

@xuyangzhong xuyangzhong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for driving this! I have left some comments. PTAL.

}
}

private def referencesNonUpsertKeyColumns(node: RelNode, rexNodes: Seq[RexNode]): Boolean = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we would better extract a common method from referencesNonUpsertKeyColumns, referencesNonUpsert and isNonUpsertKeyCondition, WDYT?

@@ -1303,27 +1319,35 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode(process, Some(children), providedDeleteTrait)

case join: StreamPhysicalJoin =>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: should we also handle StreamPhysicalChangelogNormalize, StreamPhysicalTemporalJoin, StreamPhysicalLookupJoin, StreamPhysicalCorrelateBase, StreamPhysicalIntervalJoin, StreamPhysicalWindowJoin and StreamPhysicalMultiJoin? They contain filters too.

}
}
if (children.exists(_.isEmpty)) {
if (onlyAfterByParent && hasNonUpsertKeyNonEquiCondition(join)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this fix, we can safely delete the logic in DeltaJoinUtil#isFilterOnOneSetOfUpsertKeys, right?

if (sourceMode == CHANGELOG_SOURCE_WITH_EVENTS_DUPLICATE && miniBatch == MiniBatchOn) {
assertThatThrownBy(() => tEnv.executeSql("insert into s select * from t where c < 2"))
.isInstanceOf(classOf[org.apache.flink.table.api.TableException])
.hasMessageContaining("Can't generate a valid execution plan")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should fix this issue introduced by this PR together, or at least ensure it's resolved in the same release.

}

@TestTemplate
def testJoinWithNonEquivConditionOnNonUpsertKey(): Unit = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit testJoinWithNonEquivConditionOnLeftNonUpsertKey

}

@TestTemplate
def testJoinWithNonEquivConditionOnNonUpsertKey(): Unit = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit extract some same logic between testJoinWithNonEquivConditionOnNonUpsertKey and testJoinWithNonEquivConditionOnRightNonUpsertKey

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants