
Commit 410bd28

Keep
1 parent 529e740 commit 410bd28

4 files changed: +102 / -10 lines changed

paimon-spark/paimon-spark-4.1/pom.xml

Lines changed: 0 additions & 1 deletion

@@ -153,4 +153,3 @@ under the License.
 </plugins>
 </build>
 </project>
-

paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala

Lines changed: 0 additions & 1 deletion

@@ -90,4 +90,3 @@ object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase {
     ScalarSubquery(plan, exprId = exprId)
   }
 }
-
KeepHelper.scala (new file)

Lines changed: 91 additions & 0 deletions

@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.commands
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction, Keep}
+
+import scala.util.Try
+
+/**
+ * Helper object for creating MergeRows.Keep instructions compatible with different Spark versions.
+ *
+ * Different Spark versions have different Keep signatures:
+ *   - Spark 4.0: Keep(condition, output)
+ *   - Spark 4.1+: Keep(context, condition, output)
+ *
+ * This object uses reflection to determine which signature to use.
+ */
+object KeepHelper {
+
+  // Check if Keep has a 3-parameter constructor (Spark 4.1+)
+  private lazy val keepHas3Params: Boolean = {
+    Try {
+      classOf[Keep].getConstructors.exists(_.getParameterCount == 3)
+    }.getOrElse(false)
+  }
+
+  // Context objects for Spark 4.1+
+  private lazy val updateContext: Option[AnyRef] = getContextObject("Update")
+  private lazy val copyContext: Option[AnyRef] = getContextObject("Copy")
+  private lazy val insertContext: Option[AnyRef] = getContextObject("Insert")
+
+  private def getContextObject(name: String): Option[AnyRef] = {
+    Try {
+      val contextClass =
+        Class.forName(s"org.apache.spark.sql.catalyst.plans.logical.MergeRows$$$name$$")
+      contextClass.getField("MODULE$").get(null).asInstanceOf[AnyRef]
+    }.toOption
+  }
+
+  private def createKeepWithContext(
+      context: AnyRef,
+      condition: Expression,
+      output: Seq[Expression]): Instruction = {
+    val constructor = classOf[Keep].getConstructors.find(_.getParameterCount == 3).get
+    constructor.newInstance(context, condition, output).asInstanceOf[Instruction]
+  }
+
+  /** Creates a Keep instruction for the Update context (matched rows being updated). */
+  def createKeepForUpdate(condition: Expression, output: Seq[Expression]): Instruction = {
+    if (keepHas3Params && updateContext.isDefined) {
+      createKeepWithContext(updateContext.get, condition, output)
+    } else {
+      Keep(condition, output)
+    }
+  }
+
+  /** Creates a Keep instruction for the Copy context (matched rows being copied as-is). */
+  def createKeepForCopy(condition: Expression, output: Seq[Expression]): Instruction = {
+    if (keepHas3Params && copyContext.isDefined) {
+      createKeepWithContext(copyContext.get, condition, output)
+    } else {
+      Keep(condition, output)
+    }
+  }
+
+  /** Creates a Keep instruction for the Insert context (not-matched rows being inserted). */
+  def createKeepForInsert(condition: Expression, output: Seq[Expression]): Instruction = {
+    if (keepHas3Params && insertContext.isDefined) {
+      createKeepWithContext(insertContext.get, condition, output)
+    } else {
+      Keep(condition, output)
+    }
+  }
+}
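
The only non-obvious part of the new helper is how it obtains the Spark 4.1 context objects: getContextObject loads the object's class under its mangled name (a trailing "$") and reads the static MODULE$ field, which is the standard way to reach a Scala object through Java reflection; wrapping the lookup in Try makes a missing class (a Spark 4.0 classpath) fall back cleanly to None. The following standalone sketch of that pattern is not part of this commit and uses scala.None as a stand-in target, so it compiles and runs on any Scala classpath without Spark:

import scala.util.Try

object ModuleLookupSketch {

  // Resolve a top-level Scala object from its fully qualified name, mirroring
  // KeepHelper.getContextObject: load "<name>$" and read the static MODULE$ field.
  def lookupObject(fqcn: String): Option[AnyRef] =
    Try {
      val moduleClass = Class.forName(fqcn + "$")
      moduleClass.getField("MODULE$").get(null).asInstanceOf[AnyRef]
    }.toOption

  def main(args: Array[String]): Unit = {
    // scala.None is itself an object, so this prints Some(None).
    println(lookupObject("scala.None"))
    // A class that is not on the classpath simply yields None instead of throwing,
    // which is the behaviour the helper relies on when the 4.1 contexts are absent.
    println(lookupObject("scala.DoesNotExist"))
  }
}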

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala

Lines changed: 11 additions & 8 deletions

@@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Equ
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.{col, udf}
 import org.apache.spark.sql.types.StructType
@@ -322,10 +321,12 @@ case class MergeIntoPaimonDataEvolutionTable(
       matchedInstructions = rewrittenUpdateActions
         .map(
           action => {
-            Keep(action.condition.getOrElse(TrueLiteral), action.assignments.map(a => a.value))
-          }) ++ Seq(Keep(TrueLiteral, output)),
+            KeepHelper.createKeepForUpdate(
+              action.condition.getOrElse(TrueLiteral),
+              action.assignments.map(a => a.value))
+          }) ++ Seq(KeepHelper.createKeepForCopy(TrueLiteral, output)),
       notMatchedInstructions = Nil,
-      notMatchedBySourceInstructions = Seq(Keep(TrueLiteral, output)),
+      notMatchedBySourceInstructions = Seq(KeepHelper.createKeepForCopy(TrueLiteral, output)),
       checkCardinality = false,
       output = output,
       child = readPlan
@@ -361,10 +362,12 @@ case class MergeIntoPaimonDataEvolutionTable(
       matchedInstructions = realUpdateActions
         .map(
           action => {
-            Keep(action.condition.getOrElse(TrueLiteral), action.assignments.map(a => a.value))
-          }) ++ Seq(Keep(TrueLiteral, output)),
+            KeepHelper.createKeepForUpdate(
+              action.condition.getOrElse(TrueLiteral),
+              action.assignments.map(a => a.value))
+          }) ++ Seq(KeepHelper.createKeepForCopy(TrueLiteral, output)),
       notMatchedInstructions = Nil,
-      notMatchedBySourceInstructions = Seq(Keep(TrueLiteral, output)).toSeq,
+      notMatchedBySourceInstructions = Seq(KeepHelper.createKeepForCopy(TrueLiteral, output)),
       checkCardinality = false,
       output = output,
       child = joinPlan
@@ -399,7 +402,7 @@ case class MergeIntoPaimonDataEvolutionTable(
       matchedInstructions = Nil,
       notMatchedInstructions = notMatchedActions.map {
         case insertAction: InsertAction =>
-          Keep(
+          KeepHelper.createKeepForInsert(
            insertAction.condition.getOrElse(TrueLiteral),
            insertAction.assignments.map(
              a =>
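
For completeness, a minimal sketch of how the helper is meant to be called from planning code like the above. It is not taken from this commit: the single-attribute output and the TrueLiteral condition are placeholders, and it assumes the Paimon Spark module plus a Spark 4.x catalyst jar on the classpath. On Spark 4.0 the call resolves to the 2-argument Keep; on 4.1+ it resolves reflectively to the 3-argument form with the Copy context.

import org.apache.paimon.spark.commands.KeepHelper

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction
import org.apache.spark.sql.types.IntegerType

object KeepHelperUsageSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder output list: a single integer column, purely for illustration.
    val output: Seq[Expression] = Seq(AttributeReference("id", IntegerType)())

    // "Copy the row through unchanged whenever the (always-true) condition holds" --
    // the same copy-all instruction the MERGE planner appends after its update actions.
    val copyAll: Instruction = KeepHelper.createKeepForCopy(TrueLiteral, output)

    println(copyAll)
  }
}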
