diff --git a/.github/workflows/utitcase-spark-4.x.yml b/.github/workflows/utitcase-spark-4.x.yml
index 5b877803068b..024dcc67ebb0 100644
--- a/.github/workflows/utitcase-spark-4.x.yml
+++ b/.github/workflows/utitcase-spark-4.x.yml
@@ -61,10 +61,10 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
test_modules=""
- for suffix in ut 4.0; do
+ for suffix in ut 4.1 4.0; do
test_modules+="org.apache.paimon:paimon-spark-${suffix}_2.13,"
done
test_modules="${test_modules%,}"
mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1
env:
- MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
+ MAVEN_OPTS: -Xmx4096m
diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala
new file mode 100644
index 000000000000..d8ba2847ab88
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.data
+
+import org.apache.paimon.types.DataType
+
+import org.apache.spark.unsafe.types.VariantVal
+
+class Spark4ArrayData(override val elementType: DataType) extends AbstractSparkArrayData {
+
+ override def getVariant(ordinal: Int): VariantVal = {
+ val v = paimonArray.getVariant(ordinal)
+ new VariantVal(v.value(), v.metadata())
+ }
+}
diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala
new file mode 100644
index 000000000000..9ac2766346f9
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.data
+
+import org.apache.paimon.spark.AbstractSparkInternalRow
+import org.apache.paimon.types.RowType
+
+import org.apache.spark.unsafe.types.VariantVal
+
+class Spark4InternalRow(rowType: RowType) extends AbstractSparkInternalRow(rowType) {
+
+ override def getVariant(i: Int): VariantVal = {
+ val v = row.getVariant(i)
+ new VariantVal(v.value(), v.metadata())
+ }
+}
diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
new file mode 100644
index 000000000000..c52207e43197
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.data
+
+import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.InternalRowUtils.copyInternalRow
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+class Spark4InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int, blobAsDescriptor: Boolean)
+ extends Spark4InternalRow(rowType) {
+
+ override def getBinary(ordinal: Int): Array[Byte] = {
+ if (ordinal == blobFieldIndex) {
+ if (blobAsDescriptor) {
+ row.getBlob(ordinal).toDescriptor.serialize()
+ } else {
+ row.getBlob(ordinal).toData
+ }
+ } else {
+ super.getBinary(ordinal)
+ }
+ }
+
+ override def copy: InternalRow =
+ SparkInternalRow.create(rowType, blobAsDescriptor).replace(copyInternalRow(row, rowType))
+}
diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/MinorVersionShim.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/MinorVersionShim.scala
new file mode 100644
index 000000000000..2c3ad0c659ec
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/MinorVersionShim.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.paimon.shims
+
+import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.types.{DataType, RowType}
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+
+object MinorVersionShim {
+
+ def createKeep(context: String, condition: Expression, output: Seq[Expression]): Instruction = {
+ MergeRows.Keep(condition, output)
+ }
+
+ def createSparkInternalRow(rowType: RowType): SparkInternalRow = {
+ new Spark4InternalRow(rowType)
+ }
+
+ def createSparkInternalRowWithBlob(
+ rowType: RowType,
+ blobFieldIndex: Int,
+ blobAsDescriptor: Boolean): SparkInternalRow = {
+ new Spark4InternalRowWithBlob(rowType, blobFieldIndex, blobAsDescriptor)
+ }
+
+ def createSparkArrayData(elementType: DataType): SparkArrayData = {
+ new Spark4ArrayData(elementType)
+ }
+
+ def createFileIndex(
+ options: CaseInsensitiveStringMap,
+ sparkSession: SparkSession,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ partitionSchema: StructType): PartitioningAwareFileIndex = {
+
+ class PartitionedMetadataLogFileIndex(
+ sparkSession: SparkSession,
+ path: Path,
+ parameters: Map[String, String],
+ userSpecifiedSchema: Option[StructType],
+ override val partitionSchema: StructType)
+ extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema)
+
+ class PartitionedInMemoryFileIndex(
+ sparkSession: SparkSession,
+ rootPathsSpecified: Seq[Path],
+ parameters: Map[String, String],
+ userSpecifiedSchema: Option[StructType],
+ fileStatusCache: FileStatusCache = NoopCache,
+ userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
+ metadataOpsTimeNs: Option[Long] = None,
+ override val partitionSchema: StructType)
+ extends InMemoryFileIndex(
+ sparkSession,
+ rootPathsSpecified,
+ parameters,
+ userSpecifiedSchema,
+ fileStatusCache,
+ userSpecifiedPartitionSpec,
+ metadataOpsTimeNs)
+
+ def globPaths: Boolean = {
+ val entry = options.get(DataSource.GLOB_PATHS_KEY)
+ Option(entry).forall(_ == "true")
+ }
+
+ val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+ if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
+ new PartitionedMetadataLogFileIndex(
+ sparkSession,
+ new Path(paths.head),
+ options.asScala.toMap,
+ userSpecifiedSchema,
+ partitionSchema = partitionSchema)
+ } else {
+ val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(
+ paths,
+ hadoopConf,
+ checkEmptyGlobPath = true,
+ checkFilesExist = true,
+ enableGlobbing = globPaths)
+ val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+
+ new PartitionedInMemoryFileIndex(
+ sparkSession,
+ rootPathsSpecified,
+ caseSensitiveMap,
+ userSpecifiedSchema,
+ fileStatusCache,
+ partitionSchema = partitionSchema)
+ }
+ }
+
+}
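
The shim above mirrors the existing Spark 4.0 factories so version-specific classes are never referenced from shared code. A minimal sketch of a call site, assuming the loaded shim (SparkShimLoader.shim) exposes the same createSparkInternalRow factory defined here; GenericRow and BinaryString are only used to build a throwaway Paimon row:

import org.apache.paimon.data.{BinaryString, GenericRow}
import org.apache.paimon.types.{DataTypes, RowType}
import org.apache.spark.sql.paimon.shims.SparkShimLoader

object ShimRowSketch {
  def main(args: Array[String]): Unit = {
    val rowType = RowType.of(DataTypes.INT(), DataTypes.STRING())
    // Wrap a Paimon row so Spark-side code reads it through Spark's InternalRow API.
    val sparkRow = SparkShimLoader.shim
      .createSparkInternalRow(rowType)
      .replace(GenericRow.of(Integer.valueOf(1), BinaryString.fromString("a")))
    println(s"${sparkRow.getInt(0)} ${sparkRow.getUTF8String(1)}")
  }
}
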
diff --git a/paimon-spark/paimon-spark-4.1/pom.xml b/paimon-spark/paimon-spark-4.1/pom.xml
new file mode 100644
index 000000000000..21e7143463cd
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/pom.xml
@@ -0,0 +1,156 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.paimon</groupId>
+        <artifactId>paimon-spark</artifactId>
+        <version>1.4-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-spark-4.1_2.13</artifactId>
+    <name>Paimon : Spark : 4.1 : 2.13</name>
+
+    <properties>
+        <spark.version>4.1.0</spark.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-format</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-spark4-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-spark-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-spark-ut_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-paimon</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*</artifact>
+                                    <excludes>
+                                        <exclude>com/github/luben/zstd/**</exclude>
+                                        <exclude>**/*libzstd-jni-*.so</exclude>
+                                        <exclude>**/*libzstd-jni-*.dll</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.paimon:paimon-spark4-common_${scala.binary.version}</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala b/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
new file mode 100644
index 000000000000..e86195f1af0b
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.optimizer
+
+import org.apache.paimon.spark.PaimonScan
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, ExprId, ScalarSubquery, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+
+object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase {
+
+ override def tryMergeDataSourceV2ScanRelation(
+ newV2ScanRelation: DataSourceV2ScanRelation,
+ cachedV2ScanRelation: DataSourceV2ScanRelation)
+ : Option[(LogicalPlan, AttributeMap[Attribute])] = {
+ (newV2ScanRelation, cachedV2ScanRelation) match {
+ case (
+ DataSourceV2ScanRelation(
+ newRelation,
+ newScan: PaimonScan,
+ newOutput,
+ newPartitioning,
+ newOrdering),
+ DataSourceV2ScanRelation(
+ cachedRelation,
+ cachedScan: PaimonScan,
+ _,
+ cachedPartitioning,
+ cacheOrdering)) =>
+ checkIdenticalPlans(newRelation, cachedRelation).flatMap {
+ outputMap =>
+ if (
+ samePartitioning(newPartitioning, cachedPartitioning, outputMap) && sameOrdering(
+ newOrdering,
+ cacheOrdering,
+ outputMap)
+ ) {
+ mergePaimonScan(newScan, cachedScan).map {
+ mergedScan =>
+ val mergedAttributes = mergedScan
+ .readSchema()
+ .map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+ val cachedOutputNameMap = cachedRelation.output.map(a => a.name -> a).toMap
+ val mergedOutput =
+ mergedAttributes.map(a => cachedOutputNameMap.getOrElse(a.name, a))
+ val newV2ScanRelation =
+ cachedV2ScanRelation.copy(scan = mergedScan, output = mergedOutput)
+
+ val mergedOutputNameMap = mergedOutput.map(a => a.name -> a).toMap
+ val newOutputMap =
+ AttributeMap(newOutput.map(a => a -> mergedOutputNameMap(a.name).toAttribute))
+
+ newV2ScanRelation -> newOutputMap
+ }
+ } else {
+ None
+ }
+ }
+
+ case _ => None
+ }
+ }
+
+ private def sameOrdering(
+ newOrdering: Option[Seq[SortOrder]],
+ cachedOrdering: Option[Seq[SortOrder]],
+ outputAttrMap: AttributeMap[Attribute]): Boolean = {
+ val mappedNewOrdering = newOrdering.map(_.map(mapAttributes(_, outputAttrMap)))
+ mappedNewOrdering.map(_.map(_.canonicalized)) == cachedOrdering.map(_.map(_.canonicalized))
+ }
+
+ override protected def createScalarSubquery(plan: LogicalPlan, exprId: ExprId): ScalarSubquery = {
+ ScalarSubquery(plan, exprId = exprId)
+ }
+}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/resources/function/hive-test-udfs.jar b/paimon-spark/paimon-spark-4.1/src/test/resources/function/hive-test-udfs.jar
new file mode 100644
index 000000000000..a5bfa456f668
Binary files /dev/null and b/paimon-spark/paimon-spark-4.1/src/test/resources/function/hive-test-udfs.jar differ
diff --git a/paimon-spark/paimon-spark-4.1/src/test/resources/hive-site.xml b/paimon-spark/paimon-spark-4.1/src/test/resources/hive-site.xml
new file mode 100644
index 000000000000..bdf2bb090760
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/resources/hive-site.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+    <property>
+        <name>hive.metastore.integral.jdo.pushdown</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.schema.verification</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.client.capability.check</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateTables</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateAll</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>datanucleus.connectionPoolingType</name>
+        <value>DBCP</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.uris</name>
+        <value>thrift://localhost:9090</value>
+        <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/paimon-spark/paimon-spark-4.1/src/test/resources/log4j2-test.properties b/paimon-spark/paimon-spark-4.1/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000000..6f324f5863ac
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/resources/log4j2-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
+
+logger.kafka.name = kafka
+logger.kafka.level = OFF
+logger.kafka2.name = state.change
+logger.kafka2.level = OFF
+
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = OFF
+logger.I0Itec.name = org.I0Itec
+logger.I0Itec.level = OFF
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
new file mode 100644
index 000000000000..322d50a62127
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+class CompactProcedureTest extends CompactProcedureTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTest.scala
new file mode 100644
index 000000000000..d57846709877
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+class ProcedureTest extends ProcedureTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
new file mode 100644
index 000000000000..255906d04bf2
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class AnalyzeTableTest extends AnalyzeTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
new file mode 100644
index 000000000000..b729f57b33e7
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class DDLTest extends DDLTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
new file mode 100644
index 000000000000..cb139d2a57be
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class DDLWithHiveCatalogTest extends DDLWithHiveCatalogTestBase {}
+
+class DefaultDatabaseTest extends DefaultDatabaseTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
new file mode 100644
index 000000000000..6170e2fd6c5c
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class DataFrameWriteTest extends DataFrameWriteTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
new file mode 100644
index 000000000000..ab33a40e5966
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.spark.SparkConf
+
+class DeleteFromTableTest extends DeleteFromTableTestBase {}
+
+class V2DeleteFromTableTest extends DeleteFromTableTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
new file mode 100644
index 000000000000..c6aa77419241
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class DescribeTableTest extends DescribeTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
new file mode 100644
index 000000000000..ba49976ab6c0
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class FormatTableTest extends FormatTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTest.scala
new file mode 100644
index 000000000000..4f66584c303b
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class InsertOverwriteTableTest extends InsertOverwriteTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
new file mode 100644
index 000000000000..b9a85b147eea
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.{PaimonAppendBucketedTableTest, PaimonAppendNonBucketTableTest, PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest}
+
+class MergeIntoPrimaryKeyBucketedTableTest
+ extends MergeIntoTableTestBase
+ with MergeIntoPrimaryKeyTableTest
+ with MergeIntoNotMatchedBySourceTest
+ with PaimonPrimaryKeyBucketedTableTest {}
+
+class MergeIntoPrimaryKeyNonBucketTableTest
+ extends MergeIntoTableTestBase
+ with MergeIntoPrimaryKeyTableTest
+ with MergeIntoNotMatchedBySourceTest
+ with PaimonPrimaryKeyNonBucketTableTest {}
+
+class MergeIntoAppendBucketedTableTest
+ extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
+ with MergeIntoNotMatchedBySourceTest
+ with PaimonAppendBucketedTableTest {}
+
+class MergeIntoAppendNonBucketedTableTest
+ extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
+ with MergeIntoNotMatchedBySourceTest
+ with PaimonAppendNonBucketTableTest {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
new file mode 100644
index 000000000000..635185a9ed0e
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonCompositePartitionKeyTest extends PaimonCompositePartitionKeyTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
new file mode 100644
index 000000000000..ec140a89bbd3
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GetStructField, NamedExpression, ScalarSubquery}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+
+class PaimonOptimizationTest extends PaimonOptimizationTestBase {
+
+ override def extractorExpression(
+ cteIndex: Int,
+ output: Seq[Attribute],
+ fieldIndex: Int): NamedExpression = {
+ GetStructField(
+ ScalarSubquery(
+ SparkShimLoader.shim
+ .createCTERelationRef(cteIndex, resolved = true, output.toSeq, isStreaming = false)),
+ fieldIndex,
+ None)
+ .as("scalarsubquery()")
+ }
+}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
new file mode 100644
index 000000000000..26677d85c71a
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonPushDownTest extends PaimonPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
new file mode 100644
index 000000000000..f37fbad27033
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonV1FunctionTest extends PaimonV1FunctionTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
new file mode 100644
index 000000000000..6ab8a2671b51
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonViewTest extends PaimonViewTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RewriteUpsertTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RewriteUpsertTableTest.scala
new file mode 100644
index 000000000000..412aa3b30351
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RewriteUpsertTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RewriteUpsertTableTest extends RewriteUpsertTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
new file mode 100644
index 000000000000..9f96840a7788
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowTrackingTest extends RowTrackingTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/ShowColumnsTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/ShowColumnsTest.scala
new file mode 100644
index 000000000000..6601dc2fca37
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/ShowColumnsTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class ShowColumnsTest extends PaimonShowColumnsTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
new file mode 100644
index 000000000000..21c4c8a495ed
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala
new file mode 100644
index 000000000000..92309d54167b
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class TagDdlTest extends PaimonTagDdlTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
new file mode 100644
index 000000000000..194aab278c0e
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class UpdateTableTest extends UpdateTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
new file mode 100644
index 000000000000..aafd1dc4b967
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class VariantTest extends VariantTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
index 8a52273eeab2..246f6936537b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
@@ -106,8 +106,8 @@ trait PaimonMergeIntoBase
dataEvolutionEnabled: Boolean): MergeAction = {
action match {
case d @ DeleteAction(_) => d
- case u @ UpdateAction(_, assignments) =>
- u.copy(assignments = alignAssignments(targetOutput, assignments))
+ case u: UpdateAction =>
+ u.copy(assignments = alignAssignments(targetOutput, u.assignments))
case i @ InsertAction(_, assignments) =>
i.copy(assignments = alignAssignments(targetOutput, assignments))
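
The same pattern recurs in the resolver changes below: matching on the UpdateAction type and reading its fields by name, rather than destructuring with a fixed-arity extractor, is what keeps one source tree compiling when a Spark minor release adds a field to the case class. A self-contained illustration with a hypothetical stand-in class (not Spark's UpdateAction):

object ArityChangeSketch {
  // Stand-in: imagine the 4.0 class had two fields and 4.1 added a third.
  case class UpdateAction(condition: Option[String], assignments: Seq[String], fromStar: Boolean = false)

  def align(action: UpdateAction): Seq[String] = action match {
    // case UpdateAction(_, assignments) => ...   // breaks once the field count changes
    case u: UpdateAction => u.assignments         // still compiles against either shape
  }

  def main(args: Array[String]): Unit =
    println(align(UpdateAction(None, Seq("a", "b"))))
}
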
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala
index 78ee8ec2171c..04c996136cf1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala
@@ -33,7 +33,9 @@ object PaimonMergeIntoResolver extends PaimonMergeIntoResolverBase {
// The condition must be from the target table
val resolvedCond = condition.map(resolveCondition(resolve, _, merge, TARGET_ONLY))
DeleteAction(resolvedCond)
- case UpdateAction(condition, assignments) =>
+ case u: UpdateAction =>
+ val condition = u.condition
+ val assignments = u.assignments
// The condition and value must be from the target table
val resolvedCond = condition.map(resolveCondition(resolve, _, merge, TARGET_ONLY))
val resolvedAssignments = resolveAssignments(resolve, assignments, merge, TARGET_ONLY)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
index 218fc9c0f3ef..aff4ba191f60 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
@@ -58,7 +58,9 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
// The condition can be from both target and source tables
val resolvedCond = condition.map(resolveCondition(resolve, _, merge, ALL))
DeleteAction(resolvedCond)
- case UpdateAction(condition, assignments) =>
+ case u: UpdateAction =>
+ val condition = u.condition
+ val assignments = u.assignments
// The condition and value can be from both target and source tables
val resolvedCond = condition.map(resolveCondition(resolve, _, merge, ALL))
val resolvedAssignments = resolveAssignments(resolve, assignments, merge, ALL)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonRelation.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonRelation.scala
index c362ca67c792..0ba17e2006cb 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonRelation.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonRelation.scala
@@ -32,8 +32,10 @@ object PaimonRelation extends Logging {
def unapply(plan: LogicalPlan): Option[SparkTable] =
EliminateSubqueryAliases(plan) match {
- case Project(_, DataSourceV2Relation(table: SparkTable, _, _, _, _)) => Some(table)
- case DataSourceV2Relation(table: SparkTable, _, _, _, _) => Some(table)
+ case Project(_, d: DataSourceV2Relation) if d.table.isInstanceOf[SparkTable] =>
+ Some(d.table.asInstanceOf[SparkTable])
+ case d: DataSourceV2Relation if d.table.isInstanceOf[SparkTable] =>
+ Some(d.table.asInstanceOf[SparkTable])
case ResolvedTable(_, _, table: SparkTable, _) => Some(table)
case _ => None
}
@@ -50,8 +52,8 @@ object PaimonRelation extends Logging {
def getPaimonRelation(plan: LogicalPlan): DataSourceV2Relation = {
EliminateSubqueryAliases(plan) match {
- case Project(_, d @ DataSourceV2Relation(_: SparkTable, _, _, _, _)) => d
- case d @ DataSourceV2Relation(_: SparkTable, _, _, _, _) => d
+ case Project(_, d: DataSourceV2Relation) if d.table.isInstanceOf[SparkTable] => d
+ case d: DataSourceV2Relation if d.table.isInstanceOf[SparkTable] => d
case _ => throw new RuntimeException(s"It's not a paimon table, $plan")
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index e2eaed8fe54f..6f6d7c0cee16 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -36,9 +36,9 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Equ
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter}
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
@@ -316,16 +316,20 @@ case class MergeIntoPaimonDataEvolutionTable(
ua.copy(condition = newCond, assignments = newAssignments)
}
+ val shim = SparkShimLoader.shim
val mergeRows = MergeRows(
isSourceRowPresent = TrueLiteral,
isTargetRowPresent = TrueLiteral,
matchedInstructions = rewrittenUpdateActions
.map(
action => {
- Keep(action.condition.getOrElse(TrueLiteral), action.assignments.map(a => a.value))
- }) ++ Seq(Keep(TrueLiteral, output)),
+ shim.createKeep(
+ "COPY",
+ action.condition.getOrElse(TrueLiteral),
+ action.assignments.map(a => a.value))
+ }) ++ Seq(shim.createKeep("COPY", TrueLiteral, output)),
notMatchedInstructions = Nil,
- notMatchedBySourceInstructions = Seq(Keep(TrueLiteral, output)),
+ notMatchedBySourceInstructions = Seq(shim.createKeep("COPY", TrueLiteral, output)),
checkCardinality = false,
output = output,
child = readPlan
@@ -355,16 +359,20 @@ case class MergeIntoPaimonDataEvolutionTable(
Join(targetTableProj, sourceTableProj, LeftOuter, Some(matchedCondition), JoinHint.NONE)
val rowFromSourceAttr = attribute(ROW_FROM_SOURCE, joinPlan)
val rowFromTargetAttr = attribute(ROW_FROM_TARGET, joinPlan)
+ val shim = SparkShimLoader.shim
val mergeRows = MergeRows(
isSourceRowPresent = rowFromSourceAttr,
isTargetRowPresent = rowFromTargetAttr,
matchedInstructions = realUpdateActions
.map(
action => {
- Keep(action.condition.getOrElse(TrueLiteral), action.assignments.map(a => a.value))
- }) ++ Seq(Keep(TrueLiteral, output)),
+ shim.createKeep(
+ "COPY",
+ action.condition.getOrElse(TrueLiteral),
+ action.assignments.map(a => a.value))
+ }) ++ Seq(shim.createKeep("COPY", TrueLiteral, output)),
notMatchedInstructions = Nil,
- notMatchedBySourceInstructions = Seq(Keep(TrueLiteral, output)).toSeq,
+ notMatchedBySourceInstructions = Seq(shim.createKeep("COPY", TrueLiteral, output)).toSeq,
checkCardinality = false,
output = output,
child = joinPlan
@@ -393,13 +401,15 @@ case class MergeIntoPaimonDataEvolutionTable(
Join(sourceRelation, targetReadPlan, LeftAnti, Some(matchedCondition), JoinHint.NONE)
// merge rows as there are multiple not matched actions
+ val shim = SparkShimLoader.shim
val mergeRows = MergeRows(
isSourceRowPresent = TrueLiteral,
isTargetRowPresent = FalseLiteral,
matchedInstructions = Nil,
notMatchedInstructions = notMatchedActions.map {
case insertAction: InsertAction =>
- Keep(
+ shim.createKeep(
+ "COPY",
insertAction.condition.getOrElse(TrueLiteral),
insertAction.assignments.map(
a =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index d956a9472f11..f555c464e322 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -253,7 +253,8 @@ case class MergeIntoPaimonTable(
def processMergeActions(actions: Seq[MergeAction]): Seq[Seq[Expression]] = {
val columnExprs = actions.map {
- case UpdateAction(_, assignments) =>
+ case u: UpdateAction =>
+ val assignments = u.assignments
var exprs = assignments.map(_.value)
if (writeRowTracking) {
exprs ++= Seq(rowIdAttr, Literal(null))
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
index ddbd9df5ac1b..84e7dfc01c0c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
@@ -103,7 +103,7 @@ case class RewritePaimonFunctionCommands(spark: SparkSession)
plan.resolveOperatorsUp {
case u: UnresolvedWith =>
u.copy(cteRelations = u.cteRelations.map(
- t => (t._1, transformPaimonV1Function(t._2).asInstanceOf[SubqueryAlias])))
+ t => t.copy(_1 = t._1, _2 = transformPaimonV1Function(t._2).asInstanceOf[SubqueryAlias])))
case l: LogicalPlan =>
l.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
case u: UnresolvedFunction =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
index 2cb0101653af..94337124a13b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution
import org.apache.paimon.utils.StringUtils
-import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
@@ -32,7 +31,7 @@ import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.execution.datasources.v2.text.{TextScanBuilder, TextTable}
-import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -50,71 +49,13 @@ object SparkFormatTable {
paths: Seq[String],
userSpecifiedSchema: Option[StructType],
partitionSchema: StructType): PartitioningAwareFileIndex = {
-
- def globPaths: Boolean = {
- val entry = options.get(DataSource.GLOB_PATHS_KEY)
- Option(entry).forall(_ == "true")
- }
-
- val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
- // Hadoop Configurations are case-sensitive.
- val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
- if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
- // We are reading from the results of a streaming query. We will load files from
- // the metadata log instead of listing them using HDFS APIs.
- new PartitionedMetadataLogFileIndex(
- sparkSession,
- new Path(paths.head),
- options.asScala.toMap,
- userSpecifiedSchema,
- partitionSchema = partitionSchema)
- } else {
- // This is a non-streaming file based datasource.
- val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(
- paths,
- hadoopConf,
- checkEmptyGlobPath = true,
- checkFilesExist = true,
- enableGlobbing = globPaths)
- val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-
- new PartitionedInMemoryFileIndex(
- sparkSession,
- rootPathsSpecified,
- caseSensitiveMap,
- userSpecifiedSchema,
- fileStatusCache,
- partitionSchema = partitionSchema)
- }
- }
-
- // Extend from MetadataLogFileIndex to override partitionSchema
- private class PartitionedMetadataLogFileIndex(
- sparkSession: SparkSession,
- path: Path,
- parameters: Map[String, String],
- userSpecifiedSchema: Option[StructType],
- override val partitionSchema: StructType)
- extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema)
-
- // Extend from InMemoryFileIndex to override partitionSchema
- private class PartitionedInMemoryFileIndex(
- sparkSession: SparkSession,
- rootPathsSpecified: Seq[Path],
- parameters: Map[String, String],
- userSpecifiedSchema: Option[StructType],
- fileStatusCache: FileStatusCache = NoopCache,
- userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
- metadataOpsTimeNs: Option[Long] = None,
- override val partitionSchema: StructType)
- extends InMemoryFileIndex(
+ SparkShimLoader.shim.createFileIndex(
+ options,
sparkSession,
- rootPathsSpecified,
- parameters,
+ paths,
userSpecifiedSchema,
- fileStatusCache,
- userSpecifiedPartitionSpec,
- metadataOpsTimeNs)
+ partitionSchema)
+ }
}
trait PartitionedFormatTable extends SupportsPartitionManagement {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index 98296a400672..0dd32a615a52 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -28,11 +28,14 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util.{Map => JMap}
@@ -88,6 +91,8 @@ trait SparkShim {
notMatchedBySourceActions: Seq[MergeAction],
withSchemaEvolution: Boolean): MergeIntoTable
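+ /** Creates a MergeRows `Keep` instruction; the `context` hint may be ignored by shims targeting older Spark versions. */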
+ def createKeep(context: String, condition: Expression, output: Seq[Expression]): Instruction
+
// for variant
def toPaimonVariant(o: Object): Variant
@@ -98,4 +103,11 @@ trait SparkShim {
def isSparkVariantType(dataType: org.apache.spark.sql.types.DataType): Boolean
def SparkVariantType(): org.apache.spark.sql.types.DataType
+
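+ /**
+ * Builds the PartitioningAwareFileIndex used by SparkFormatTable; implemented per Spark version
+ * because the streaming sink and metadata-log file index classes live in different packages
+ * across Spark versions.
+ */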
+ def createFileIndex(
+ options: CaseInsensitiveStringMap,
+ sparkSession: SparkSession,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ partitionSchema: StructType): PartitioningAwareFileIndex
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/MemoryStreamWrapper.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/MemoryStreamWrapper.scala
new file mode 100644
index 000000000000..9e2566d93dc3
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/MemoryStreamWrapper.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, SQLContext}
+import org.apache.spark.sql.execution.streaming.Offset
+
+import scala.util.Try
+
+/**
+ * A wrapper for MemoryStream to handle Spark version compatibility. In Spark 4.1+, MemoryStream was
+ * moved from `org.apache.spark.sql.execution.streaming` to
+ * `org.apache.spark.sql.execution.streaming.runtime`.
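+ *
+ * Typical test usage (matching the call sites in this module):
+ * {{{
+ *   val inputData = MemoryStreamWrapper[(Int, String)]
+ *   val stream = inputData.toDS().toDF("a", "b")
+ * }}}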
+ */
+class MemoryStreamWrapper[A] private (stream: AnyRef) {
+
+ private val streamClass = stream.getClass
+
+ def toDS(): Dataset[A] = {
+ streamClass.getMethod("toDS").invoke(stream).asInstanceOf[Dataset[A]]
+ }
+
+ def toDF(): DataFrame = {
+ streamClass.getMethod("toDF").invoke(stream).asInstanceOf[DataFrame]
+ }
+
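+ /** Invokes `addData` reflectively; the varargs `Seq` is passed as the single collection argument. */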
+ def addData(data: A*): Offset = {
+ val method = streamClass.getMethod("addData", classOf[TraversableOnce[_]])
+ method.invoke(stream, data).asInstanceOf[Offset]
+ }
+}
+
+object MemoryStreamWrapper {
+
+ /** Creates a MemoryStream wrapper that works across different Spark versions. */
+ def apply[A](implicit encoder: Encoder[A], sqlContext: SQLContext): MemoryStreamWrapper[A] = {
+ val stream = createMemoryStream[A]
+ new MemoryStreamWrapper[A](stream)
+ }
+
+ private def createMemoryStream[A](implicit
+ encoder: Encoder[A],
+ sqlContext: SQLContext): AnyRef = {
+ // Try Spark 4.1+ path first (runtime package)
+ val spark41Class = Try(
+ Class.forName("org.apache.spark.sql.execution.streaming.runtime.MemoryStream$"))
+ if (spark41Class.isSuccess) {
+ val companion = spark41Class.get.getField("MODULE$").get(null)
+ // Spark 4.1+ uses implicit SparkSession instead of SQLContext
+ val applyMethod = companion.getClass.getMethod(
+ "apply",
+ classOf[Encoder[_]],
+ classOf[org.apache.spark.sql.SparkSession]
+ )
+ return applyMethod.invoke(companion, encoder, sqlContext.sparkSession).asInstanceOf[AnyRef]
+ }
+
+ // Fall back to the Spark 3.x / 4.0 path
+ val oldClass =
+ Class.forName("org.apache.spark.sql.execution.streaming.MemoryStream$")
+ val companion = oldClass.getField("MODULE$").get(null)
+ val applyMethod = companion.getClass.getMethod(
+ "apply",
+ classOf[Encoder[_]],
+ classOf[SQLContext]
+ )
+ applyMethod.invoke(companion, encoder, sqlContext).asInstanceOf[AnyRef]
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
index e103429559ba..6300600a820b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
@@ -18,8 +18,9 @@
package org.apache.paimon.spark
+import org.apache.paimon.spark.MemoryStreamWrapper
+
import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest {
@@ -150,7 +151,7 @@ class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest {
val location = table.location().toString
// streaming write
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val writeStream = inputData
.toDS()
.toDF("a", "b")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index c43170d7ba1b..3c92b7eed9d3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -19,10 +19,10 @@
package org.apache.paimon.spark
import org.apache.paimon.Snapshot.CommitKind._
+import org.apache.paimon.spark.MemoryStreamWrapper
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, mean, window}
import org.apache.spark.sql.streaming.StreamTest
@@ -47,7 +47,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
@@ -91,7 +91,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
@@ -131,7 +131,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData.toDS
.toDF("uid", "city")
.groupBy("city")
@@ -175,7 +175,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
intercept[RuntimeException] {
inputData
.toDF()
@@ -199,7 +199,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Long, Int, Double)]
+ val inputData = MemoryStreamWrapper[(Long, Int, Double)]
val data = inputData.toDS
.toDF("time", "stockId", "price")
.selectExpr("CAST(time AS timestamp) AS timestamp", "stockId", "price")
@@ -256,7 +256,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
spark.sql("SELECT * FROM T ORDER BY a, b"),
Row(1, "2023-08-09") :: Row(2, "2023-08-09") :: Nil)
- val inputData = MemoryStream[(Long, Date, Int)]
+ val inputData = MemoryStreamWrapper[(Long, Date, Int)]
val stream = inputData
.toDS()
.toDF("a", "b", "c")
@@ -325,7 +325,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
val table = loadTable("T")
val location = table.location().toString
- val inputData = MemoryStream[(Int, Int)]
+ val inputData = MemoryStreamWrapper[(Int, Int)]
val stream = inputData
.toDS()
.toDF("a", "b")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala
index 316c36c40c56..59b5c8fd1cb5 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala
@@ -18,10 +18,10 @@
package org.apache.paimon.spark.procedure
+import org.apache.paimon.spark.MemoryStreamWrapper
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
class AlterBranchProcedureTest extends PaimonSparkTestBase with StreamTest {
@@ -37,7 +37,7 @@ class AlterBranchProcedureTest extends PaimonSparkTestBase with StreamTest {
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
index 67786a47fe3f..4b866875eceb 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
@@ -18,10 +18,10 @@
package org.apache.paimon.spark.procedure
+import org.apache.paimon.spark.MemoryStreamWrapper
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
class BranchProcedureTest extends PaimonSparkTestBase with StreamTest {
@@ -38,7 +38,7 @@ class BranchProcedureTest extends PaimonSparkTestBase with StreamTest {
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index e89eba2e8599..825f12b997cd 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.procedure
import org.apache.paimon.Snapshot.CommitKind
import org.apache.paimon.fs.Path
+import org.apache.paimon.spark.MemoryStreamWrapper
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.utils.SparkProcedureUtils
import org.apache.paimon.table.FileStoreTable
@@ -27,7 +28,6 @@ import org.apache.paimon.table.source.DataSplit
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}
import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
import org.assertj.core.api.Assertions
import org.scalatest.time.Span
@@ -102,7 +102,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, Int)]
+ val inputData = MemoryStreamWrapper[(Int, Int)]
val stream = inputData
.toDS()
.toDF("a", "b")
@@ -198,7 +198,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, Int, Int)]
+ val inputData = MemoryStreamWrapper[(Int, Int, Int)]
val stream = inputData
.toDS()
.toDF("p", "a", "b")
@@ -368,7 +368,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, Int)]
+ val inputData = MemoryStreamWrapper[(Int, Int)]
val stream = inputData
.toDS()
.toDF("a", "b")
@@ -822,7 +822,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b", "c")
@@ -970,7 +970,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, Int, String, Int)]
+ val inputData = MemoryStreamWrapper[(Int, Int, String, Int)]
val stream = inputData
.toDS()
.toDF("a", "b", "c", "pt")
@@ -1184,7 +1184,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b", "c")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
index 4a4c7ae215df..bcb53faf957b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
@@ -18,10 +18,10 @@
package org.apache.paimon.spark.procedure
+import org.apache.paimon.spark.MemoryStreamWrapper
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTest {
@@ -39,7 +39,7 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
@@ -146,7 +146,7 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
index e9b00298e492..2bc8fdbb3101 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
@@ -18,11 +18,11 @@
package org.apache.paimon.spark.procedure
+import org.apache.paimon.spark.MemoryStreamWrapper
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.utils.SnapshotNotExistException
import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
class CreateTagFromTimestampProcedureTest extends PaimonSparkTestBase with StreamTest {
@@ -39,7 +39,7 @@ class CreateTagFromTimestampProcedureTest extends PaimonSparkTestBase with Strea
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
@@ -116,7 +116,7 @@ class CreateTagFromTimestampProcedureTest extends PaimonSparkTestBase with Strea
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
index 586f2e6c2d72..1d2bd0981e72 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
@@ -18,10 +18,10 @@
package org.apache.paimon.spark.procedure
+import org.apache.paimon.spark.MemoryStreamWrapper
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
import org.assertj.core.api.Assertions.assertThatThrownBy
@@ -41,7 +41,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(String, String)]
+ val inputData = MemoryStreamWrapper[(String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt")
@@ -93,7 +93,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(String, String, String)]
+ val inputData = MemoryStreamWrapper[(String, String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt", "hm")
@@ -162,7 +162,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(String, String)]
+ val inputData = MemoryStreamWrapper[(String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt")
@@ -218,7 +218,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(String, String)]
+ val inputData = MemoryStreamWrapper[(String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt")
@@ -286,7 +286,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(String, String, String)]
+ val inputData = MemoryStreamWrapper[(String, String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt", "hm")
@@ -352,7 +352,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(String, String)]
+ val inputData = MemoryStreamWrapper[(String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt")
@@ -417,7 +417,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(String, String, String)]
+ val inputData = MemoryStreamWrapper[(String, String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt", "hm")
@@ -487,7 +487,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(String, String, String)]
+ val inputData = MemoryStreamWrapper[(String, String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt", "hm")
@@ -565,7 +565,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(String, String)]
+ val inputData = MemoryStreamWrapper[(String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt")
@@ -634,7 +634,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(String, String)]
+ val inputData = MemoryStreamWrapper[(String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt")
@@ -701,7 +701,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(String, String)]
+ val inputData = MemoryStreamWrapper[(String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
index aa65d8b9c38e..f1e3f2f14859 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
@@ -18,11 +18,11 @@
package org.apache.paimon.spark.procedure
+import org.apache.paimon.spark.MemoryStreamWrapper
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.utils.SnapshotManager
import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
import org.assertj.core.api.Assertions.{assertThat, assertThatIllegalArgumentException}
@@ -44,7 +44,7 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
@@ -100,7 +100,7 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
@@ -175,7 +175,7 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
@@ -230,7 +230,7 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
index 66f2d57e02bc..9fc0182b5dee 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
@@ -18,10 +18,10 @@
package org.apache.paimon.spark.procedure
+import org.apache.paimon.spark.MemoryStreamWrapper
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
@@ -40,7 +40,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
val table = loadTable("T")
val location = table.location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
@@ -169,7 +169,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
|""".stripMargin)
val location = loadTable("T").location().toString
- val inputData = MemoryStream[(Int, String)]
+ val inputData = MemoryStreamWrapper[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala
index 02a5b9a83015..2b147dbf93fa 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala
@@ -49,7 +49,7 @@ class V2WriteRequireDistributionTest extends PaimonSparkTestBase with AdaptiveSp
val node1 = nodes(0)
assert(
node1.isInstanceOf[AppendDataExec] &&
- node1.toString.contains("PaimonWrite(table=test.t1"),
+ node1.asInstanceOf[AppendDataExec].write.toString.contains("PaimonWrite(table=test.t1"),
s"Expected AppendDataExec with specific paimon write, but got: $node1"
)
@@ -92,7 +92,7 @@ class V2WriteRequireDistributionTest extends PaimonSparkTestBase with AdaptiveSp
val node1 = nodes(0)
assert(
node1.isInstanceOf[AppendDataExec] &&
- node1.toString.contains("PaimonWrite(table=test.t1"),
+ node1.asInstanceOf[AppendDataExec].write.toString.contains("PaimonWrite(table=test.t1"),
s"Expected AppendDataExec with specific paimon write, but got: $node1"
)
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index 70011e14c3c2..202974fd2e41 100644
--- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -24,20 +24,27 @@ import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensi
import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, Spark3InternalRowWithBlob, SparkArrayData, SparkInternalRow}
import org.apache.paimon.types.{DataType, RowType}
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util.{Map => JMap}
+import scala.collection.JavaConverters._
+
class Spark3Shim extends SparkShim {
override def classicApi: ClassicApi = new Classic3Api
@@ -108,6 +115,13 @@ class Spark3Shim extends SparkShim {
notMatchedBySourceActions)
}
+ override def createKeep(
+ context: String,
+ condition: Expression,
+ output: Seq[Expression]): Instruction = {
+ MergeRows.Keep(condition, output)
+ }
+
override def toPaimonVariant(o: Object): Variant = throw new UnsupportedOperationException()
override def isSparkVariantType(dataType: org.apache.spark.sql.types.DataType): Boolean = false
@@ -120,4 +134,70 @@ class Spark3Shim extends SparkShim {
override def toPaimonVariant(array: ArrayData, pos: Int): Variant =
throw new UnsupportedOperationException()
+
+ def createFileIndex(
+ options: CaseInsensitiveStringMap,
+ sparkSession: SparkSession,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ partitionSchema: StructType): PartitioningAwareFileIndex = {
+
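+ // File index variants moved here from SparkFormatTable; they only override partitionSchema.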
+ class PartitionedMetadataLogFileIndex(
+ sparkSession: SparkSession,
+ path: Path,
+ parameters: Map[String, String],
+ userSpecifiedSchema: Option[StructType],
+ override val partitionSchema: StructType)
+ extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema)
+
+ class PartitionedInMemoryFileIndex(
+ sparkSession: SparkSession,
+ rootPathsSpecified: Seq[Path],
+ parameters: Map[String, String],
+ userSpecifiedSchema: Option[StructType],
+ fileStatusCache: FileStatusCache = NoopCache,
+ userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
+ metadataOpsTimeNs: Option[Long] = None,
+ override val partitionSchema: StructType)
+ extends InMemoryFileIndex(
+ sparkSession,
+ rootPathsSpecified,
+ parameters,
+ userSpecifiedSchema,
+ fileStatusCache,
+ userSpecifiedPartitionSpec,
+ metadataOpsTimeNs)
+
+ def globPaths: Boolean = {
+ val entry = options.get(DataSource.GLOB_PATHS_KEY)
+ Option(entry).forall(_ == "true")
+ }
+
+ val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+ if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
+ new PartitionedMetadataLogFileIndex(
+ sparkSession,
+ new Path(paths.head),
+ options.asScala.toMap,
+ userSpecifiedSchema,
+ partitionSchema = partitionSchema)
+ } else {
+ val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(
+ paths,
+ hadoopConf,
+ checkEmptyGlobPath = true,
+ checkFilesExist = true,
+ enableGlobbing = globPaths)
+ val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+
+ new PartitionedInMemoryFileIndex(
+ sparkSession,
+ rootPathsSpecified,
+ caseSensitiveMap,
+ userSpecifiedSchema,
+ fileStatusCache,
+ partitionSchema = partitionSchema)
+ }
+ }
}
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala
index d8ba2847ab88..048a2c0c6e43 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.data
import org.apache.paimon.types.DataType
-import org.apache.spark.unsafe.types.VariantVal
+import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, VariantVal}
class Spark4ArrayData(override val elementType: DataType) extends AbstractSparkArrayData {
@@ -28,4 +28,12 @@ class Spark4ArrayData(override val elementType: DataType) extends AbstractSparkA
val v = paimonArray.getVariant(ordinal)
new VariantVal(v.value(), v.metadata())
}
+
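+ // Geospatial getters introduced by newer Spark APIs; Paimon has no geo types, so they always throw.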
+ def getGeography(ordinal: Int): GeographyVal = {
+ throw new UnsupportedOperationException("GeographyVal is not supported")
+ }
+
+ def getGeometry(ordinal: Int): GeometryVal = {
+ throw new UnsupportedOperationException("GeometryVal is not supported")
+ }
}
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala
index 9ac2766346f9..0447b26a3273 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.data
import org.apache.paimon.spark.AbstractSparkInternalRow
import org.apache.paimon.types.RowType
-import org.apache.spark.unsafe.types.VariantVal
+import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, VariantVal}
class Spark4InternalRow(rowType: RowType) extends AbstractSparkInternalRow(rowType) {
@@ -29,4 +29,12 @@ class Spark4InternalRow(rowType: RowType) extends AbstractSparkInternalRow(rowTy
val v = row.getVariant(i)
new VariantVal(v.value(), v.metadata())
}
+
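+ // Same geospatial stubs as Spark4ArrayData; Paimon does not support geo types.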
+ def getGeography(ordinal: Int): GeographyVal = {
+ throw new UnsupportedOperationException("GeographyVal is not supported")
+ }
+
+ def getGeometry(ordinal: Int): GeometryVal = {
+ throw new UnsupportedOperationException("GeometryVal is not supported")
+ }
}
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
index 0a208daea292..c52207e43197 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
@@ -18,12 +18,10 @@
package org.apache.paimon.spark.data
-import org.apache.paimon.spark.AbstractSparkInternalRow
import org.apache.paimon.types.RowType
import org.apache.paimon.utils.InternalRowUtils.copyInternalRow
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.unsafe.types.VariantVal
class Spark4InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int, blobAsDescriptor: Boolean)
extends Spark4InternalRow(rowType) {
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
new file mode 100644
index 000000000000..ba2d790c7a8a
--- /dev/null
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.paimon.spark.catalyst.analysis.{PaimonDeleteTable, PaimonMergeInto, PaimonUpdateTable}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast}
+import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, UpdateAction, UpdateTable}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
+
+/**
+ * A rule that resolves assignments in row-level commands.
+ *
+ * Note that this rule must be run before rewriting row-level commands into executable plans. This
+ * rule does not apply to tables that accept any schema. Such tables must inject their own rules to
+ * resolve assignments.
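+ *
+ * This Paimon variant of the rule additionally re-applies the Paimon update/delete/merge
+ * resolution rules on Spark 4.1+ after the assignments are aligned (see [[apply]]).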
+ */
+object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ val plan1 = applyInt(plan)
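+ // On Spark 4.1+ the Paimon row-level rules are re-applied after alignment; the lexicographic
+ // version check is sufficient for the 4.x versions this module is built against.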
+ if (org.apache.spark.SPARK_VERSION >= "4.1") {
+ val plan2 = PaimonUpdateTable.apply(plan1)
+ val plan3 = PaimonDeleteTable.apply(plan2)
+ PaimonMergeInto(null).apply(plan3)
+ } else {
+ plan1
+ }
+ }
+
+ private def applyInt(plan: LogicalPlan): LogicalPlan =
+ plan.resolveOperatorsWithPruning(_.containsPattern(COMMAND), ruleId) {
+ case u: UpdateTable if !u.skipSchemaResolution && u.resolved && u.rewritable && !u.aligned =>
+ validateStoreAssignmentPolicy()
+ val newTable = cleanAttrMetadata(u.table)
+ val newAssignments = AssignmentUtils.alignUpdateAssignments(
+ u.table.output,
+ u.assignments,
+ fromStar = false,
+ coerceNestedTypes = false)
+ u.copy(table = newTable, assignments = newAssignments)
+
+ case u: UpdateTable if !u.skipSchemaResolution && u.resolved && !u.aligned =>
+ resolveAssignments(u)
+
+ case m: MergeIntoTable
+ if !m.skipSchemaResolution && m.resolved && m.rewritable && !m.aligned &&
+ !m.needSchemaEvolution =>
+ validateStoreAssignmentPolicy()
+ val coerceNestedTypes = SQLConf.get.coerceMergeNestedTypes && m.withSchemaEvolution
+ m.copy(
+ targetTable = cleanAttrMetadata(m.targetTable),
+ matchedActions = alignActions(m.targetTable.output, m.matchedActions, coerceNestedTypes),
+ notMatchedActions =
+ alignActions(m.targetTable.output, m.notMatchedActions, coerceNestedTypes),
+ notMatchedBySourceActions =
+ alignActions(m.targetTable.output, m.notMatchedBySourceActions, coerceNestedTypes)
+ )
+
+ case m: MergeIntoTable
+ if !m.skipSchemaResolution && m.resolved && !m.aligned
+ && !m.needSchemaEvolution =>
+ resolveAssignments(m)
+ }
+
+ private def validateStoreAssignmentPolicy(): Unit = {
+ // SPARK-28730: LEGACY store assignment policy is disallowed in data source v2
+ if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
+ throw QueryCompilationErrors.legacyStoreAssignmentPolicyError()
+ }
+ }
+
+ private def cleanAttrMetadata(table: LogicalPlan): LogicalPlan = {
+ table.transform {
+ case r: DataSourceV2Relation =>
+ r.copy(output = r.output.map(CharVarcharUtils.cleanAttrMetadata))
+ }
+ }
+
+ private def resolveAssignments(p: LogicalPlan): LogicalPlan = {
+ p.transformExpressions {
+ case assignment: Assignment =>
+ val nullHandled = if (!assignment.key.nullable && assignment.value.nullable) {
+ AssertNotNull(assignment.value)
+ } else {
+ assignment.value
+ }
+ val casted = if (assignment.key.dataType != nullHandled.dataType) {
+ val cast = Cast(nullHandled, assignment.key.dataType, ansiEnabled = true)
+ cast.setTagValue(Cast.BY_TABLE_INSERTION, ())
+ cast
+ } else {
+ nullHandled
+ }
+ val rawKeyType = assignment.key.transform {
+ case a: AttributeReference =>
+ CharVarcharUtils.getRawType(a.metadata).map(a.withDataType).getOrElse(a)
+ }.dataType
+ val finalValue = if (CharVarcharUtils.hasCharVarchar(rawKeyType)) {
+ CharVarcharUtils.stringLengthCheck(casted, rawKeyType)
+ } else {
+ casted
+ }
+ val cleanedKey = assignment.key.transform {
+ case a: AttributeReference => CharVarcharUtils.cleanAttrMetadata(a)
+ }
+ Assignment(cleanedKey, finalValue)
+ }
+ }
+
+ private def alignActions(
+ attrs: Seq[Attribute],
+ actions: Seq[MergeAction],
+ coerceNestedTypes: Boolean): Seq[MergeAction] = {
+ actions.map {
+ case u @ UpdateAction(_, assignments, fromStar) =>
+ u.copy(assignments =
+ AssignmentUtils.alignUpdateAssignments(attrs, assignments, fromStar, coerceNestedTypes))
+ case d: DeleteAction =>
+ d
+ case i @ InsertAction(_, assignments) =>
+ i.copy(assignments =
+ AssignmentUtils.alignInsertAssignments(attrs, assignments, coerceNestedTypes))
+ case other =>
+ throw new AnalysisException(
+ errorClass = "_LEGACY_ERROR_TEMP_3052",
+ messageParameters = Map("other" -> other.toString))
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/MinorVersionShim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/MinorVersionShim.scala
new file mode 100644
index 000000000000..714abd0a6aa9
--- /dev/null
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/MinorVersionShim.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.paimon.shims
+
+import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.types.{DataType, RowType}
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex
+import org.apache.spark.sql.execution.streaming.sinks.FileStreamSink
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+
+object MinorVersionShim {
+
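+ /**
+ * Maps the shim-level context string to a Spark 4.1 `MergeRows` instruction context; any
+ * unrecognized value falls back to `Copy`.
+ */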
+ def createKeep(context: String, condition: Expression, output: Seq[Expression]): Instruction = {
+ val ctx = context match {
+ case "COPY" => MergeRows.Copy
+ case "DELETE" => MergeRows.Delete
+ case "INSERT" => MergeRows.Insert
+ case "UPDATE" => MergeRows.Update
+ case _ => MergeRows.Copy
+ }
+
+ MergeRows.Keep(ctx, condition, output)
+ }
+
+ def createSparkInternalRow(rowType: RowType): SparkInternalRow = {
+ new Spark4InternalRow(rowType)
+ }
+
+ def createSparkInternalRowWithBlob(
+ rowType: RowType,
+ blobFieldIndex: Int,
+ blobAsDescriptor: Boolean): SparkInternalRow = {
+ new Spark4InternalRowWithBlob(rowType, blobFieldIndex, blobAsDescriptor)
+ }
+
+ def createSparkArrayData(elementType: DataType): SparkArrayData = {
+ new Spark4ArrayData(elementType)
+ }
+
+ def createFileIndex(
+ options: CaseInsensitiveStringMap,
+ sparkSession: SparkSession,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ partitionSchema: StructType): PartitioningAwareFileIndex = {
+
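+ // Same file index variants as the Spark 3 shim, rebuilt against the streaming.runtime and
+ // streaming.sinks packages used by Spark 4.1.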
+ class PartitionedMetadataLogFileIndex(
+ sparkSession: SparkSession,
+ path: Path,
+ parameters: Map[String, String],
+ userSpecifiedSchema: Option[StructType],
+ override val partitionSchema: StructType)
+ extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema)
+
+ class PartitionedInMemoryFileIndex(
+ sparkSession: SparkSession,
+ rootPathsSpecified: Seq[Path],
+ parameters: Map[String, String],
+ userSpecifiedSchema: Option[StructType],
+ fileStatusCache: FileStatusCache = NoopCache,
+ userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
+ metadataOpsTimeNs: Option[Long] = None,
+ override val partitionSchema: StructType)
+ extends InMemoryFileIndex(
+ sparkSession,
+ rootPathsSpecified,
+ parameters,
+ userSpecifiedSchema,
+ fileStatusCache,
+ userSpecifiedPartitionSpec,
+ metadataOpsTimeNs)
+
+ def globPaths: Boolean = {
+ val entry = options.get(DataSource.GLOB_PATHS_KEY)
+ Option(entry).forall(_ == "true")
+ }
+
+ val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+ if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
+ new PartitionedMetadataLogFileIndex(
+ sparkSession,
+ new Path(paths.head),
+ options.asScala.toMap,
+ userSpecifiedSchema,
+ partitionSchema = partitionSchema)
+ } else {
+ val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(
+ paths,
+ hadoopConf,
+ checkEmptyGlobPath = true,
+ checkFilesExist = true,
+ enableGlobbing = globPaths)
+ val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+
+ new PartitionedInMemoryFileIndex(
+ sparkSession,
+ rootPathsSpecified,
+ caseSensitiveMap,
+ userSpecifiedSchema,
+ fileStatusCache,
+ partitionSchema = partitionSchema)
+ }
+ }
+
+}
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index ad36acfb26a9..f4d86bf759ed 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.paimon.shims
import org.apache.paimon.data.variant.{GenericVariant, Variant}
import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules
import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser
-import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.data.{SparkArrayData, SparkInternalRow}
import org.apache.paimon.types.{DataType, RowType}
import org.apache.spark.sql.SparkSession
@@ -30,11 +30,14 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.{DataTypes, StructType, VariantType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.VariantVal
import java.util.{Map => JMap}
@@ -52,18 +55,18 @@ class Spark4Shim extends SparkShim {
}
override def createSparkInternalRow(rowType: RowType): SparkInternalRow = {
- new Spark4InternalRow(rowType)
+ MinorVersionShim.createSparkInternalRow(rowType)
}
override def createSparkInternalRowWithBlob(
rowType: RowType,
blobFieldIndex: Int,
blobAsDescriptor: Boolean): SparkInternalRow = {
- new Spark4InternalRowWithBlob(rowType, blobFieldIndex, blobAsDescriptor)
+ MinorVersionShim.createSparkInternalRowWithBlob(rowType, blobFieldIndex, blobAsDescriptor)
}
override def createSparkArrayData(elementType: DataType): SparkArrayData = {
- new Spark4ArrayData(elementType)
+ MinorVersionShim.createSparkArrayData(elementType)
}
override def createTable(
@@ -113,6 +116,13 @@ class Spark4Shim extends SparkShim {
withSchemaEvolution)
}
+ override def createKeep(
+ context: String,
+ condition: Expression,
+ output: Seq[Expression]): Instruction = {
+ MinorVersionShim.createKeep(context, condition, output)
+ }
+
override def toPaimonVariant(o: Object): Variant = {
val v = o.asInstanceOf[VariantVal]
new GenericVariant(v.getValue, v.getMetadata)
@@ -132,4 +142,18 @@ class Spark4Shim extends SparkShim {
dataType.isInstanceOf[VariantType]
override def SparkVariantType(): org.apache.spark.sql.types.DataType = DataTypes.VariantType
+
+ override def createFileIndex(
+ options: CaseInsensitiveStringMap,
+ sparkSession: SparkSession,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ partitionSchema: StructType): PartitioningAwareFileIndex = {
+ MinorVersionShim.createFileIndex(
+ options,
+ sparkSession,
+ paths,
+ userSpecifiedSchema,
+ partitionSchema)
+ }
}
diff --git a/pom.xml b/pom.xml
index 600a65c3c08a..9270bc5a26bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,7 +90,7 @@ under the License.
1.20.1
2.12
2.12.18
- 2.13.16
+ 2.13.17
${scala212.version}
${scala212.version}
1.1.8.4
@@ -425,17 +425,18 @@ under the License.
paimon-spark/paimon-spark4-common
paimon-spark/paimon-spark-4.0
+ paimon-spark/paimon-spark-4.1
17
4.13.1
2.13
${scala213.version}
- 4.0.1
+ 4.1.0
paimon-spark4-common_2.13
18.1.0
- 4.0
- 4.0.1
+ 4.1
+ 4.1.0