-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-38579] [table-planner] Fix incorrect UB drop for filter on non-upsert key #28090
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction | |
| import org.apache.flink.table.planner.plan.`trait`._ | ||
| import org.apache.flink.table.planner.plan.`trait`.DeleteKindTrait.{deleteOnKeyOrNone, fullDeleteOrNone, DELETE_BY_KEY} | ||
| import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone, onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER} | ||
| import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec | ||
| import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery | ||
| import org.apache.flink.table.planner.plan.nodes.physical.stream._ | ||
| import org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver | ||
|
|
@@ -46,9 +47,11 @@ import org.apache.flink.types.RowKind | |
| import org.apache.calcite.linq4j.Ord | ||
| import org.apache.calcite.rel.RelNode | ||
| import org.apache.calcite.rel.core.JoinRelType | ||
| import org.apache.calcite.rex.RexCall | ||
| import org.apache.calcite.rex.{RexCall, RexNode} | ||
| import org.apache.calcite.util.ImmutableBitSet | ||
|
|
||
| import java.util.Collections | ||
|
|
||
| import scala.collection.JavaConversions._ | ||
|
|
||
| /** An optimize program to infer ChangelogMode for every physical node. */ | ||
|
|
@@ -659,26 +662,31 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti | |
|
|
||
| case join: StreamPhysicalJoin => | ||
| val onlyAfterByParent = requiredUpdateTrait.updateKind == UpdateKind.ONLY_UPDATE_AFTER | ||
| val children = join.getInputs.zipWithIndex.map { | ||
| case (child, childOrdinal) => | ||
| val physicalChild = child.asInstanceOf[StreamPhysicalRel] | ||
| val supportOnlyAfter = join.inputUniqueKeyContainsJoinKey(childOrdinal) | ||
| val inputModifyKindSet = getModifyKindSet(physicalChild) | ||
| if (onlyAfterByParent) { | ||
| if (inputModifyKindSet.contains(ModifyKind.UPDATE) && !supportOnlyAfter) { | ||
| // the parent requires only-after, however, the join doesn't support this | ||
| None | ||
| } else { | ||
| this.visit(physicalChild, onlyAfterOrNone(inputModifyKindSet)) | ||
| } | ||
| } else { | ||
| this.visit(physicalChild, beforeAfterOrNone(inputModifyKindSet)) | ||
| } | ||
| } | ||
| if (children.exists(_.isEmpty)) { | ||
| if (onlyAfterByParent && hasNonUpsertKeyNonEquiCondition(join)) { | ||
| // FLINK-38579: non-equi condition on non-upsert key requires UPDATE_BEFORE | ||
| None | ||
| } else { | ||
| createNewNode(join, Some(children.flatten.toList), requiredUpdateTrait) | ||
| val children = join.getInputs.zipWithIndex.map { | ||
| case (child, childOrdinal) => | ||
| val physicalChild = child.asInstanceOf[StreamPhysicalRel] | ||
| val supportOnlyAfter = join.inputUniqueKeyContainsJoinKey(childOrdinal) | ||
| val inputModifyKindSet = getModifyKindSet(physicalChild) | ||
| if (onlyAfterByParent) { | ||
| if (inputModifyKindSet.contains(ModifyKind.UPDATE) && !supportOnlyAfter) { | ||
| // the parent requires only-after, however, the join doesn't support this | ||
| None | ||
| } else { | ||
| this.visit(physicalChild, onlyAfterOrNone(inputModifyKindSet)) | ||
| } | ||
| } else { | ||
| this.visit(physicalChild, beforeAfterOrNone(inputModifyKindSet)) | ||
| } | ||
| } | ||
| if (children.exists(_.isEmpty)) { | ||
| None | ||
| } else { | ||
| createNewNode(join, Some(children.flatten.toList), requiredUpdateTrait) | ||
| } | ||
| } | ||
|
|
||
| case temporalJoin: StreamPhysicalTemporalJoin => | ||
|
|
@@ -799,16 +807,24 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti | |
| case ts: StreamPhysicalTableSourceScan => | ||
| // currently only support BEFORE_AND_AFTER if source produces updates | ||
| val providedTrait = UpdateKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode) | ||
| val newSource = createNewNode(rel, Some(List()), providedTrait) | ||
| if ( | ||
| providedTrait.equals(UpdateKindTrait.BEFORE_AND_AFTER) && | ||
| requiredUpdateTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER) | ||
| requiredUpdateTrait == UpdateKindTrait.ONLY_UPDATE_AFTER && | ||
| hasNonUpsertKeyFilterPushedDown(ts) | ||
| ) { | ||
| // requiring only-after, but the source is CDC source, then drop update_before manually | ||
| val dropUB = new StreamPhysicalDropUpdateBefore(rel.getCluster, rel.getTraitSet, rel) | ||
| createNewNode(dropUB, newSource.map(s => List(s)), requiredUpdateTrait) | ||
| // FLINK-38579: filter on non-upsert key requires UPDATE_BEFORE | ||
| None | ||
| } else { | ||
| newSource | ||
| val newSource = createNewNode(rel, Some(List()), providedTrait) | ||
| if ( | ||
| providedTrait.equals(UpdateKindTrait.BEFORE_AND_AFTER) && | ||
| requiredUpdateTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER) | ||
| ) { | ||
| // requiring only-after, but the source is CDC source, then drop update_before manually | ||
| val dropUB = new StreamPhysicalDropUpdateBefore(rel.getCluster, rel.getTraitSet, rel) | ||
| createNewNode(dropUB, newSource.map(s => List(s)), requiredUpdateTrait) | ||
| } else { | ||
| newSource | ||
| } | ||
| } | ||
|
|
||
| case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan | | ||
|
|
@@ -1303,27 +1319,35 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti | |
| createNewNode(process, Some(children), providedDeleteTrait) | ||
|
|
||
| case join: StreamPhysicalJoin => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: should we also handle |
||
| val children = join.getInputs.zipWithIndex.map { | ||
| case (child, childOrdinal) => | ||
| val physicalChild = child.asInstanceOf[StreamPhysicalRel] | ||
| val supportsDeleteByKey = join.inputUniqueKeyContainsJoinKey(childOrdinal) | ||
| val inputModifyKindSet = getModifyKindSet(physicalChild) | ||
| if (supportsDeleteByKey && requiredTrait == DELETE_BY_KEY) { | ||
| this | ||
| .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet)) | ||
| .orElse(this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))) | ||
| } else { | ||
| this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet)) | ||
| } | ||
| } | ||
| if (children.exists(_.isEmpty)) { | ||
| if ( | ||
| requiredTrait == DeleteKindTrait.DELETE_BY_KEY && | ||
| hasNonUpsertKeyNonEquiCondition(join) | ||
| ) { | ||
| // FLINK-38579: non-equi condition on non-upsert key requires full DELETE | ||
| None | ||
| } else { | ||
| val childRels = children.flatten.toList | ||
| if (childRels.exists(r => getDeleteKind(r) == DeleteKind.DELETE_BY_KEY)) { | ||
| createNewNode(join, Some(childRels), deleteOnKeyOrNone(getModifyKindSet(rel))) | ||
| val children = join.getInputs.zipWithIndex.map { | ||
| case (child, childOrdinal) => | ||
| val physicalChild = child.asInstanceOf[StreamPhysicalRel] | ||
| val supportsDeleteByKey = join.inputUniqueKeyContainsJoinKey(childOrdinal) | ||
| val inputModifyKindSet = getModifyKindSet(physicalChild) | ||
| if (supportsDeleteByKey && requiredTrait == DELETE_BY_KEY) { | ||
| this | ||
| .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet)) | ||
| .orElse(this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))) | ||
| } else { | ||
| this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet)) | ||
| } | ||
| } | ||
| if (children.exists(_.isEmpty)) { | ||
| None | ||
| } else { | ||
| createNewNode(join, Some(childRels), fullDeleteOrNone(getModifyKindSet(rel))) | ||
| val childRels = children.flatten.toList | ||
| if (childRels.exists(r => getDeleteKind(r) == DeleteKind.DELETE_BY_KEY)) { | ||
| createNewNode(join, Some(childRels), deleteOnKeyOrNone(getModifyKindSet(rel))) | ||
| } else { | ||
| createNewNode(join, Some(childRels), fullDeleteOrNone(getModifyKindSet(rel))) | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1432,7 +1456,15 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti | |
| case ts: StreamPhysicalTableSourceScan => | ||
| // currently only support BEFORE_AND_AFTER if source produces updates | ||
| val providedTrait = DeleteKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode) | ||
| createNewNode(rel, Some(List()), providedTrait) | ||
| if ( | ||
| requiredTrait == DeleteKindTrait.DELETE_BY_KEY && | ||
| hasNonUpsertKeyFilterPushedDown(ts) | ||
| ) { | ||
| // FLINK-38579: filter on non-upsert key requires full DELETE | ||
| None | ||
| } else { | ||
| createNewNode(rel, Some(List()), providedTrait) | ||
| } | ||
|
|
||
| case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan | | ||
| _: StreamPhysicalValues => | ||
|
|
@@ -1599,6 +1631,90 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti | |
| } | ||
| } | ||
|
|
||
/**
 * Checks whether the given expressions reference input fields that are not all covered by a
 * single upsert key of `node`.
 *
 * @param node
 *   relational node whose upsert keys are looked up through the metadata query
 * @param rexNodes
 *   expressions whose referenced input field indices are inspected
 * @return
 *   `false` if `rexNodes` is empty; `true` if `node` exposes no upsert keys (null or empty);
 *   otherwise `true` only when no upsert key contains every referenced field index
 */
| private def referencesNonUpsertKeyColumns(node: RelNode, rexNodes: Seq[RexNode]): Boolean = { | ||

Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we would better extract a common method from |
||
// No expressions means no field references at all.
| if (rexNodes.isEmpty) { | ||
| return false | ||
| } | ||
|
|
||
| val fmq = FlinkRelMetadataQuery.reuseOrCreate(node.getCluster.getMetadataQuery) | ||
| val upsertKeys = fmq.getUpsertKeys(node) | ||
|
|
||
// Without any upsert key, every referenced field is treated as a non-upsert-key column.
| if (upsertKeys == null || upsertKeys.isEmpty) { | ||
| return true | ||
| } | ||
|
|
||
// Collect the indices of all input fields referenced by the expressions into one bit set.
| val fieldRefIndices = ImmutableBitSet.of( | ||
| RexNodeExtractor.extractRefInputFields(JavaScalaConversionUtil.toJava(rexNodes)): _*) | ||
|
|
||
// The references are fine as soon as one upsert key contains all of them.
| !upsertKeys.exists(upsertKey => upsertKey.contains(fieldRefIndices)) | ||
| } | ||
|
|
||
/**
 * Tells whether the scan carries a pushed-down filter whose predicates reference columns that
 * are not covered by a single upsert key of the scan.
 *
 * Yields `false` when the scan's table cannot be unwrapped to a `TableSourceTable` or when no
 * `FilterPushDownSpec` is present among its ability specs; otherwise the decision is delegated
 * to `referencesNonUpsertKeyColumns` for the first such spec.
 */
| private def hasNonUpsertKeyFilterPushedDown(ts: StreamPhysicalTableSourceScan): Boolean = { | ||
// Option(...) turns a failed unwrap (null) into None, which falls through to `false`.
| Option(ts.getTable.unwrap(classOf[TableSourceTable])) | ||
| .flatMap(table => table.abilitySpecs.collectFirst { case pushDown: FilterPushDownSpec => pushDown }) | ||
| .exists { | ||
| pushDown => | ||
| val pushedPredicates = JavaScalaConversionUtil.toScala(pushDown.getPredicates) | ||
| referencesNonUpsertKeyColumns(ts, pushedPredicates) | ||
| } | ||
| } | ||
|
|
||
/**
 * Tells whether the join has a non-equi condition that references, on either input, fields not
 * covered by a single upsert key of that input.
 *
 * Yields `false` when there is no non-equi condition, when neither input's modify-kind set
 * contains UPDATE, or when the condition references no input field.
 */
| private def hasNonUpsertKeyNonEquiCondition(join: StreamPhysicalJoin): Boolean = { | ||
| val maybeNonEqui = join.joinSpec.getNonEquiCondition | ||
|
|
||
// Both modify-kind sets are read before they are combined.
| def anyInputProducesUpdates: Boolean = { | ||
| val leftKinds = getModifyKindSet(join.getLeft) | ||
| val rightKinds = getModifyKindSet(join.getRight) | ||
| leftKinds.contains(ModifyKind.UPDATE) || rightKinds.contains(ModifyKind.UPDATE) | ||
| } | ||
|
|
||
| def escapesUpsertKeys(condition: RexNode): Boolean = { | ||
| val refIndices = | ||
| RexNodeExtractor.extractRefInputFields(Collections.singletonList(condition)) | ||
| refIndices.nonEmpty && { | ||
| val fmq = FlinkRelMetadataQuery.reuseOrCreate(join.getCluster.getMetadataQuery) | ||
| val leftFieldCount = join.getLeft.getRowType.getFieldCount | ||
|
|
||
// Indices below the left field count address the left input; the remaining ones are
// shifted so that they address the right input's own fields.
| val (leftIdx, rightIdx) = refIndices.partition(_ < leftFieldCount) | ||
| val leftRefs = ImmutableBitSet.of(leftIdx: _*) | ||
| val rightRefs = ImmutableBitSet.of(rightIdx.map(_ - leftFieldCount): _*) | ||
|
|
||
// An input is affected only if it is referenced at all and none of its upsert keys
// (if any) contains every referenced field.
| def outsideEveryUpsertKey(input: RelNode, refs: ImmutableBitSet): Boolean = | ||
| !refs.isEmpty && { | ||
| val keys = fmq.getUpsertKeys(input) | ||
| keys == null || keys.isEmpty || keys.forall(key => !key.contains(refs)) | ||
| } | ||
|
|
||
| outsideEveryUpsertKey(join.getLeft, leftRefs) || | ||
| outsideEveryUpsertKey(join.getRight, rightRefs) | ||
| } | ||
| } | ||
|
|
||
// Guards run in order: condition present, then update-producing inputs, then key coverage.
| maybeNonEqui.isPresent && anyInputProducesUpdates && escapesUpsertKeys(maybeNonEqui.get()) | ||
| } | ||
|
|
||
| private def getModifyKindSet(node: RelNode): ModifyKindSet = { | ||
| val modifyKindSetTrait = node.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE) | ||
| modifyKindSetTrait.modifyKindSet | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this fix, we can safely delete the logic in
DeltaJoinUtil#isFilterOnOneSetOfUpsertKeys, right?