diff --git a/.github/workflows/utitcase-spark-4.x.yml b/.github/workflows/utitcase-spark-4.x.yml index 5b877803068b..024dcc67ebb0 100644 --- a/.github/workflows/utitcase-spark-4.x.yml +++ b/.github/workflows/utitcase-spark-4.x.yml @@ -61,10 +61,10 @@ jobs: jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" test_modules="" - for suffix in ut 4.0; do + for suffix in ut 4.1 4.0; do test_modules+="org.apache.paimon:paimon-spark-${suffix}_2.13," done test_modules="${test_modules%,}" mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1 env: - MAVEN_OPTS: -Xmx4096m \ No newline at end of file + MAVEN_OPTS: -Xmx4096m diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala new file mode 100644 index 000000000000..d8ba2847ab88 --- /dev/null +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.data + +import org.apache.paimon.types.DataType + +import org.apache.spark.unsafe.types.VariantVal + +class Spark4ArrayData(override val elementType: DataType) extends AbstractSparkArrayData { + + override def getVariant(ordinal: Int): VariantVal = { + val v = paimonArray.getVariant(ordinal) + new VariantVal(v.value(), v.metadata()) + } +} diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala new file mode 100644 index 000000000000..9ac2766346f9 --- /dev/null +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.data + +import org.apache.paimon.spark.AbstractSparkInternalRow +import org.apache.paimon.types.RowType + +import org.apache.spark.unsafe.types.VariantVal + +class Spark4InternalRow(rowType: RowType) extends AbstractSparkInternalRow(rowType) { + + override def getVariant(i: Int): VariantVal = { + val v = row.getVariant(i) + new VariantVal(v.value(), v.metadata()) + } +} diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala new file mode 100644 index 000000000000..c52207e43197 --- /dev/null +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.data + +import org.apache.paimon.types.RowType +import org.apache.paimon.utils.InternalRowUtils.copyInternalRow + +import org.apache.spark.sql.catalyst.InternalRow + +class Spark4InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int, blobAsDescriptor: Boolean) + extends Spark4InternalRow(rowType) { + + override def getBinary(ordinal: Int): Array[Byte] = { + if (ordinal == blobFieldIndex) { + if (blobAsDescriptor) { + row.getBlob(ordinal).toDescriptor.serialize() + } else { + row.getBlob(ordinal).toData + } + } else { + super.getBinary(ordinal) + } + } + + override def copy: InternalRow = + SparkInternalRow.create(rowType, blobAsDescriptor).replace(copyInternalRow(row, rowType)) +} diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/MinorVersionShim.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/MinorVersionShim.scala new file mode 100644 index 000000000000..2c3ad0c659ec --- /dev/null +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/MinorVersionShim.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.paimon.shims + +import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow} +import org.apache.paimon.types.{DataType, RowType} + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.MergeRows +import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import scala.collection.JavaConverters._ + +object MinorVersionShim { + + def createKeep(context: String, condition: Expression, output: Seq[Expression]): Instruction = { + MergeRows.Keep(condition, output) + } + + def createSparkInternalRow(rowType: RowType): SparkInternalRow = { + new Spark4InternalRow(rowType) + } + + def createSparkInternalRowWithBlob( + rowType: RowType, + blobFieldIndex: Int, + blobAsDescriptor: Boolean): SparkInternalRow = { + new Spark4InternalRowWithBlob(rowType, blobFieldIndex, blobAsDescriptor) + } + + def createSparkArrayData(elementType: DataType): SparkArrayData = { + new Spark4ArrayData(elementType) + } + + def createFileIndex( + options: CaseInsensitiveStringMap, + sparkSession: SparkSession, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + partitionSchema: StructType): PartitioningAwareFileIndex = { + + class PartitionedMetadataLogFileIndex( + sparkSession: SparkSession, + path: Path, + parameters: Map[String, String], + userSpecifiedSchema: Option[StructType], + override val partitionSchema: StructType) + extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema) + + class PartitionedInMemoryFileIndex( + sparkSession: SparkSession, + rootPathsSpecified: Seq[Path], + parameters: Map[String, String], + userSpecifiedSchema: Option[StructType], + fileStatusCache: FileStatusCache = NoopCache, + userSpecifiedPartitionSpec: Option[PartitionSpec] = None, + metadataOpsTimeNs: Option[Long] = None, + override val partitionSchema: StructType) + extends InMemoryFileIndex( + sparkSession, + rootPathsSpecified, + parameters, + userSpecifiedSchema, + fileStatusCache, + userSpecifiedPartitionSpec, + metadataOpsTimeNs) + + def globPaths: Boolean = { + val entry = options.get(DataSource.GLOB_PATHS_KEY) + Option(entry).forall(_ == "true") + } + + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) { + new PartitionedMetadataLogFileIndex( + sparkSession, + new Path(paths.head), + options.asScala.toMap, + userSpecifiedSchema, + partitionSchema = partitionSchema) + } else { + val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary( + paths, + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = true, + enableGlobbing = globPaths) + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + + new PartitionedInMemoryFileIndex( + sparkSession, + rootPathsSpecified, + caseSensitiveMap, + userSpecifiedSchema, + fileStatusCache, + partitionSchema = partitionSchema) + } + } + +} diff 
--git a/paimon-spark/paimon-spark-4.1/pom.xml b/paimon-spark/paimon-spark-4.1/pom.xml new file mode 100644 index 000000000000..21e7143463cd --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/pom.xml @@ -0,0 +1,156 @@ + + + + 4.0.0 + + + org.apache.paimon + paimon-spark + 1.4-SNAPSHOT + + + paimon-spark-4.1_2.13 + Paimon : Spark : 4.1 : 2.13 + + + 4.1.0 + + + + + org.apache.paimon + paimon-format + + + + org.apache.paimon + paimon-spark4-common_${scala.binary.version} + ${project.version} + + + + org.apache.paimon + paimon-spark-common_${scala.binary.version} + ${project.version} + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + + + + org.apache.spark + spark-hive_${scala.binary.version} + ${spark.version} + + + + + + org.apache.paimon + paimon-spark-ut_${scala.binary.version} + ${project.version} + tests + test + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + tests + test + + + org.apache.spark + spark-connect-shims_${scala.binary.version} + + + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + tests + test + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + tests + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-paimon + package + + shade + + + + + * + + com/github/luben/zstd/** + **/*libzstd-jni-*.so + **/*libzstd-jni-*.dll + + + + + + org.apache.paimon:paimon-spark4-common_${scala.binary.version} + + + + + + + + + + diff --git a/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala b/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala new file mode 100644 index 000000000000..e86195f1af0b --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.catalyst.optimizer + +import org.apache.paimon.spark.PaimonScan + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, ExprId, ScalarSubquery, SortOrder} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation + +object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase { + + override def tryMergeDataSourceV2ScanRelation( + newV2ScanRelation: DataSourceV2ScanRelation, + cachedV2ScanRelation: DataSourceV2ScanRelation) + : Option[(LogicalPlan, AttributeMap[Attribute])] = { + (newV2ScanRelation, cachedV2ScanRelation) match { + case ( + DataSourceV2ScanRelation( + newRelation, + newScan: PaimonScan, + newOutput, + newPartitioning, + newOrdering), + DataSourceV2ScanRelation( + cachedRelation, + cachedScan: PaimonScan, + _, + cachedPartitioning, + cacheOrdering)) => + checkIdenticalPlans(newRelation, cachedRelation).flatMap { + outputMap => + if ( + samePartitioning(newPartitioning, cachedPartitioning, outputMap) && sameOrdering( + newOrdering, + cacheOrdering, + outputMap) + ) { + mergePaimonScan(newScan, cachedScan).map { + mergedScan => + val mergedAttributes = mergedScan + .readSchema() + .map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) + val cachedOutputNameMap = cachedRelation.output.map(a => a.name -> a).toMap + val mergedOutput = + mergedAttributes.map(a => cachedOutputNameMap.getOrElse(a.name, a)) + val newV2ScanRelation = + cachedV2ScanRelation.copy(scan = mergedScan, output = mergedOutput) + + val mergedOutputNameMap = mergedOutput.map(a => a.name -> a).toMap + val newOutputMap = + AttributeMap(newOutput.map(a => a -> mergedOutputNameMap(a.name).toAttribute)) + + newV2ScanRelation -> newOutputMap + } + } else { + None + } + } + + case _ => None + } + } + + private def sameOrdering( + newOrdering: Option[Seq[SortOrder]], + cachedOrdering: Option[Seq[SortOrder]], + outputAttrMap: AttributeMap[Attribute]): Boolean = { + val mappedNewOrdering = newOrdering.map(_.map(mapAttributes(_, outputAttrMap))) + mappedNewOrdering.map(_.map(_.canonicalized)) == cachedOrdering.map(_.map(_.canonicalized)) + } + + override protected def createScalarSubquery(plan: LogicalPlan, exprId: ExprId): ScalarSubquery = { + ScalarSubquery(plan, exprId = exprId) + } +} diff --git a/paimon-spark/paimon-spark-4.1/src/test/resources/function/hive-test-udfs.jar b/paimon-spark/paimon-spark-4.1/src/test/resources/function/hive-test-udfs.jar new file mode 100644 index 000000000000..a5bfa456f668 Binary files /dev/null and b/paimon-spark/paimon-spark-4.1/src/test/resources/function/hive-test-udfs.jar differ diff --git a/paimon-spark/paimon-spark-4.1/src/test/resources/hive-site.xml b/paimon-spark/paimon-spark-4.1/src/test/resources/hive-site.xml new file mode 100644 index 000000000000..bdf2bb090760 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/resources/hive-site.xml @@ -0,0 +1,56 @@ + + + + + hive.metastore.integral.jdo.pushdown + true + + + + hive.metastore.schema.verification + false + + + + hive.metastore.client.capability.check + false + + + + datanucleus.schema.autoCreateTables + true + + + + datanucleus.schema.autoCreateAll + true + + + + + datanucleus.connectionPoolingType + DBCP + + + + hive.metastore.uris + thrift://localhost:9090 + Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore. 
+ + \ No newline at end of file diff --git a/paimon-spark/paimon-spark-4.1/src/test/resources/log4j2-test.properties b/paimon-spark/paimon-spark-4.1/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000000..6f324f5863ac --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/resources/log4j2-test.properties @@ -0,0 +1,38 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n + +logger.kafka.name = kafka +logger.kafka.level = OFF +logger.kafka2.name = state.change +logger.kafka2.level = OFF + +logger.zookeeper.name = org.apache.zookeeper +logger.zookeeper.level = OFF +logger.I0Itec.name = org.I0Itec +logger.I0Itec.level = OFF diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala new file mode 100644 index 000000000000..322d50a62127 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.procedure + +class CompactProcedureTest extends CompactProcedureTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTest.scala new file mode 100644 index 000000000000..d57846709877 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure + +class ProcedureTest extends ProcedureTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala new file mode 100644 index 000000000000..255906d04bf2 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class AnalyzeTableTest extends AnalyzeTableTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala new file mode 100644 index 000000000000..b729f57b33e7 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class DDLTest extends DDLTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala new file mode 100644 index 000000000000..cb139d2a57be --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class DDLWithHiveCatalogTest extends DDLWithHiveCatalogTestBase {} + +class DefaultDatabaseTest extends DefaultDatabaseTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala new file mode 100644 index 000000000000..6170e2fd6c5c --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +class DataFrameWriteTest extends DataFrameWriteTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala new file mode 100644 index 000000000000..ab33a40e5966 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.spark.SparkConf + +class DeleteFromTableTest extends DeleteFromTableTestBase {} + +class V2DeleteFromTableTest extends DeleteFromTableTestBase { + override protected def sparkConf: SparkConf = { + super.sparkConf.set("spark.paimon.write.use-v2-write", "true") + } +} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala new file mode 100644 index 000000000000..c6aa77419241 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class DescribeTableTest extends DescribeTableTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala new file mode 100644 index 000000000000..ba49976ab6c0 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class FormatTableTest extends FormatTableTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTest.scala new file mode 100644 index 000000000000..4f66584c303b --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class InsertOverwriteTableTest extends InsertOverwriteTableTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala new file mode 100644 index 000000000000..b9a85b147eea --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.{PaimonAppendBucketedTableTest, PaimonAppendNonBucketTableTest, PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest} + +class MergeIntoPrimaryKeyBucketedTableTest + extends MergeIntoTableTestBase + with MergeIntoPrimaryKeyTableTest + with MergeIntoNotMatchedBySourceTest + with PaimonPrimaryKeyBucketedTableTest {} + +class MergeIntoPrimaryKeyNonBucketTableTest + extends MergeIntoTableTestBase + with MergeIntoPrimaryKeyTableTest + with MergeIntoNotMatchedBySourceTest + with PaimonPrimaryKeyNonBucketTableTest {} + +class MergeIntoAppendBucketedTableTest + extends MergeIntoTableTestBase + with MergeIntoAppendTableTest + with MergeIntoNotMatchedBySourceTest + with PaimonAppendBucketedTableTest {} + +class MergeIntoAppendNonBucketedTableTest + extends MergeIntoTableTestBase + with MergeIntoAppendTableTest + with MergeIntoNotMatchedBySourceTest + with PaimonAppendNonBucketTableTest {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala new file mode 100644 index 000000000000..635185a9ed0e --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class PaimonCompositePartitionKeyTest extends PaimonCompositePartitionKeyTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala new file mode 100644 index 000000000000..ec140a89bbd3 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, GetStructField, NamedExpression, ScalarSubquery} +import org.apache.spark.sql.paimon.shims.SparkShimLoader + +class PaimonOptimizationTest extends PaimonOptimizationTestBase { + + override def extractorExpression( + cteIndex: Int, + output: Seq[Attribute], + fieldIndex: Int): NamedExpression = { + GetStructField( + ScalarSubquery( + SparkShimLoader.shim + .createCTERelationRef(cteIndex, resolved = true, output.toSeq, isStreaming = false)), + fieldIndex, + None) + .as("scalarsubquery()") + } +} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala new file mode 100644 index 000000000000..26677d85c71a --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class PaimonPushDownTest extends PaimonPushDownTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala new file mode 100644 index 000000000000..f37fbad27033 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +class PaimonV1FunctionTest extends PaimonV1FunctionTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala new file mode 100644 index 000000000000..6ab8a2671b51 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class PaimonViewTest extends PaimonViewTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RewriteUpsertTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RewriteUpsertTableTest.scala new file mode 100644 index 000000000000..412aa3b30351 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RewriteUpsertTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class RewriteUpsertTableTest extends RewriteUpsertTableTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala new file mode 100644 index 000000000000..9f96840a7788 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class RowTrackingTest extends RowTrackingTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/ShowColumnsTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/ShowColumnsTest.scala new file mode 100644 index 000000000000..6601dc2fca37 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/ShowColumnsTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class ShowColumnsTest extends PaimonShowColumnsTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala new file mode 100644 index 000000000000..21c4c8a495ed --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala new file mode 100644 index 000000000000..92309d54167b --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class TagDdlTest extends PaimonTagDdlTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala new file mode 100644 index 000000000000..194aab278c0e --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class UpdateTableTest extends UpdateTableTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala new file mode 100644 index 000000000000..aafd1dc4b967 --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class VariantTest extends VariantTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala index 8a52273eeab2..246f6936537b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala @@ -106,8 +106,8 @@ trait PaimonMergeIntoBase dataEvolutionEnabled: Boolean): MergeAction = { action match { case d @ DeleteAction(_) => d - case u @ UpdateAction(_, assignments) => - u.copy(assignments = alignAssignments(targetOutput, assignments)) + case u: UpdateAction => + u.copy(assignments = alignAssignments(targetOutput, u.assignments)) case i @ InsertAction(_, assignments) => i.copy(assignments = alignAssignments(targetOutput, assignments)) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala index 78ee8ec2171c..04c996136cf1 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala @@ -33,7 +33,9 @@ object PaimonMergeIntoResolver extends PaimonMergeIntoResolverBase { // The condition must be from the target table val resolvedCond = condition.map(resolveCondition(resolve, _, merge, TARGET_ONLY)) DeleteAction(resolvedCond) - case UpdateAction(condition, assignments) => + case u: UpdateAction => + val condition = u.condition + val assignments = u.assignments // The condition and value must be from the target table val resolvedCond = condition.map(resolveCondition(resolve, _, merge, TARGET_ONLY)) val resolvedAssignments = resolveAssignments(resolve, assignments, merge, TARGET_ONLY) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala index 218fc9c0f3ef..aff4ba191f60 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala @@ -58,7 +58,9 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper { // The condition can be from both target and source tables val resolvedCond = condition.map(resolveCondition(resolve, _, merge, ALL)) DeleteAction(resolvedCond) - case UpdateAction(condition, assignments) => + case u: UpdateAction => + val condition = 
u.condition + val assignments = u.assignments // The condition and value can be from both target and source tables val resolvedCond = condition.map(resolveCondition(resolve, _, merge, ALL)) val resolvedAssignments = resolveAssignments(resolve, assignments, merge, ALL) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonRelation.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonRelation.scala index c362ca67c792..0ba17e2006cb 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonRelation.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonRelation.scala @@ -32,8 +32,10 @@ object PaimonRelation extends Logging { def unapply(plan: LogicalPlan): Option[SparkTable] = EliminateSubqueryAliases(plan) match { - case Project(_, DataSourceV2Relation(table: SparkTable, _, _, _, _)) => Some(table) - case DataSourceV2Relation(table: SparkTable, _, _, _, _) => Some(table) + case Project(_, d: DataSourceV2Relation) if d.table.isInstanceOf[SparkTable] => + Some(d.table.asInstanceOf[SparkTable]) + case d: DataSourceV2Relation if d.table.isInstanceOf[SparkTable] => + Some(d.table.asInstanceOf[SparkTable]) case ResolvedTable(_, _, table: SparkTable, _) => Some(table) case _ => None } @@ -50,8 +52,8 @@ object PaimonRelation extends Logging { def getPaimonRelation(plan: LogicalPlan): DataSourceV2Relation = { EliminateSubqueryAliases(plan) match { - case Project(_, d @ DataSourceV2Relation(_: SparkTable, _, _, _, _)) => d - case d @ DataSourceV2Relation(_: SparkTable, _, _, _, _) => d + case Project(_, d: DataSourceV2Relation) if d.table.isInstanceOf[SparkTable] => d + case d: DataSourceV2Relation if d.table.isInstanceOf[SparkTable] => d case _ => throw new RuntimeException(s"It's not a paimon table, $plan") } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index e2eaed8fe54f..6f6d7c0cee16 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -36,9 +36,9 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Equ import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.paimon.shims.SparkShimLoader import org.apache.spark.sql.types.StructType import scala.collection.JavaConverters._ @@ -316,16 +316,20 @@ case class MergeIntoPaimonDataEvolutionTable( ua.copy(condition = newCond, assignments = newAssignments) } + val shim = SparkShimLoader.shim val mergeRows = MergeRows( isSourceRowPresent = TrueLiteral, isTargetRowPresent = TrueLiteral, matchedInstructions = rewrittenUpdateActions .map( action => { - Keep(action.condition.getOrElse(TrueLiteral), action.assignments.map(a => a.value)) - }) ++ 
Seq(Keep(TrueLiteral, output)), + shim.createKeep( + "COPY", + action.condition.getOrElse(TrueLiteral), + action.assignments.map(a => a.value)) + }) ++ Seq(shim.createKeep("COPY", TrueLiteral, output)), notMatchedInstructions = Nil, - notMatchedBySourceInstructions = Seq(Keep(TrueLiteral, output)), + notMatchedBySourceInstructions = Seq(shim.createKeep("COPY", TrueLiteral, output)), checkCardinality = false, output = output, child = readPlan @@ -355,16 +359,20 @@ case class MergeIntoPaimonDataEvolutionTable( Join(targetTableProj, sourceTableProj, LeftOuter, Some(matchedCondition), JoinHint.NONE) val rowFromSourceAttr = attribute(ROW_FROM_SOURCE, joinPlan) val rowFromTargetAttr = attribute(ROW_FROM_TARGET, joinPlan) + val shim = SparkShimLoader.shim val mergeRows = MergeRows( isSourceRowPresent = rowFromSourceAttr, isTargetRowPresent = rowFromTargetAttr, matchedInstructions = realUpdateActions .map( action => { - Keep(action.condition.getOrElse(TrueLiteral), action.assignments.map(a => a.value)) - }) ++ Seq(Keep(TrueLiteral, output)), + shim.createKeep( + "COPY", + action.condition.getOrElse(TrueLiteral), + action.assignments.map(a => a.value)) + }) ++ Seq(shim.createKeep("COPY", TrueLiteral, output)), notMatchedInstructions = Nil, - notMatchedBySourceInstructions = Seq(Keep(TrueLiteral, output)).toSeq, + notMatchedBySourceInstructions = Seq(shim.createKeep("COPY", TrueLiteral, output)).toSeq, checkCardinality = false, output = output, child = joinPlan @@ -393,13 +401,15 @@ case class MergeIntoPaimonDataEvolutionTable( Join(sourceRelation, targetReadPlan, LeftAnti, Some(matchedCondition), JoinHint.NONE) // merge rows as there are multiple not matched actions + val shim = SparkShimLoader.shim val mergeRows = MergeRows( isSourceRowPresent = TrueLiteral, isTargetRowPresent = FalseLiteral, matchedInstructions = Nil, notMatchedInstructions = notMatchedActions.map { case insertAction: InsertAction => - Keep( + shim.createKeep( + "COPY", insertAction.condition.getOrElse(TrueLiteral), insertAction.assignments.map( a => diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala index d956a9472f11..f555c464e322 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala @@ -253,7 +253,8 @@ case class MergeIntoPaimonTable( def processMergeActions(actions: Seq[MergeAction]): Seq[Seq[Expression]] = { val columnExprs = actions.map { - case UpdateAction(_, assignments) => + case u: UpdateAction => + val assignments = u.assignments var exprs = assignments.map(_.value) if (writeRowTracking) { exprs ++= Seq(rowIdAttr, Literal(null)) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala index ddbd9df5ac1b..84e7dfc01c0c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala @@ -103,7 +103,7 @@ case class 
RewritePaimonFunctionCommands(spark: SparkSession) plan.resolveOperatorsUp { case u: UnresolvedWith => u.copy(cteRelations = u.cteRelations.map( - t => (t._1, transformPaimonV1Function(t._2).asInstanceOf[SubqueryAlias]))) + t => t.copy(_1 = t._1, _2 = transformPaimonV1Function(t._2).asInstanceOf[SubqueryAlias]))) case l: LogicalPlan => l.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) { case u: UnresolvedFunction => diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala index 2cb0101653af..94337124a13b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution import org.apache.paimon.utils.StringUtils -import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal} @@ -32,7 +31,7 @@ import org.apache.spark.sql.execution.datasources.v2.json.JsonTable import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable import org.apache.spark.sql.execution.datasources.v2.text.{TextScanBuilder, TextTable} -import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} +import org.apache.spark.sql.paimon.shims.SparkShimLoader import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -50,71 +49,13 @@ object SparkFormatTable { paths: Seq[String], userSpecifiedSchema: Option[StructType], partitionSchema: StructType): PartitioningAwareFileIndex = { - - def globPaths: Boolean = { - val entry = options.get(DataSource.GLOB_PATHS_KEY) - Option(entry).forall(_ == "true") - } - - val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap - // Hadoop Configurations are case-sensitive. - val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) - if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) { - // We are reading from the results of a streaming query. We will load files from - // the metadata log instead of listing them using HDFS APIs. - new PartitionedMetadataLogFileIndex( - sparkSession, - new Path(paths.head), - options.asScala.toMap, - userSpecifiedSchema, - partitionSchema = partitionSchema) - } else { - // This is a non-streaming file based datasource. 
- val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary( - paths, - hadoopConf, - checkEmptyGlobPath = true, - checkFilesExist = true, - enableGlobbing = globPaths) - val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) - - new PartitionedInMemoryFileIndex( - sparkSession, - rootPathsSpecified, - caseSensitiveMap, - userSpecifiedSchema, - fileStatusCache, - partitionSchema = partitionSchema) - } - } - - // Extend from MetadataLogFileIndex to override partitionSchema - private class PartitionedMetadataLogFileIndex( - sparkSession: SparkSession, - path: Path, - parameters: Map[String, String], - userSpecifiedSchema: Option[StructType], - override val partitionSchema: StructType) - extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema) - - // Extend from InMemoryFileIndex to override partitionSchema - private class PartitionedInMemoryFileIndex( - sparkSession: SparkSession, - rootPathsSpecified: Seq[Path], - parameters: Map[String, String], - userSpecifiedSchema: Option[StructType], - fileStatusCache: FileStatusCache = NoopCache, - userSpecifiedPartitionSpec: Option[PartitionSpec] = None, - metadataOpsTimeNs: Option[Long] = None, - override val partitionSchema: StructType) - extends InMemoryFileIndex( + SparkShimLoader.shim.createFileIndex( + options, sparkSession, - rootPathsSpecified, - parameters, + paths, userSpecifiedSchema, - fileStatusCache, - userSpecifiedPartitionSpec, - metadataOpsTimeNs) + partitionSchema) + } } trait PartitionedFormatTable extends SupportsPartitionManagement { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala index 98296a400672..0dd32a615a52 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala @@ -28,11 +28,14 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable} +import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap import java.util.{Map => JMap} @@ -88,6 +91,8 @@ trait SparkShim { notMatchedBySourceActions: Seq[MergeAction], withSchemaEvolution: Boolean): MergeIntoTable + def createKeep(context: String, condition: Expression, output: Seq[Expression]): Instruction + // for variant def toPaimonVariant(o: Object): Variant @@ -98,4 +103,11 @@ trait SparkShim { def isSparkVariantType(dataType: org.apache.spark.sql.types.DataType): Boolean def SparkVariantType(): org.apache.spark.sql.types.DataType + + def createFileIndex( + options: CaseInsensitiveStringMap, + sparkSession: SparkSession, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + partitionSchema: StructType): 
PartitioningAwareFileIndex } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/MemoryStreamWrapper.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/MemoryStreamWrapper.scala new file mode 100644 index 000000000000..9e2566d93dc3 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/MemoryStreamWrapper.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark + +import org.apache.spark.sql.{DataFrame, Dataset, Encoder, SQLContext} +import org.apache.spark.sql.execution.streaming.Offset + +import scala.util.Try + +/** + * A wrapper for MemoryStream to handle Spark version compatibility. In Spark 4.1+, MemoryStream was + * moved from `org.apache.spark.sql.execution.streaming` to + * `org.apache.spark.sql.execution.streaming.runtime`. + */ +class MemoryStreamWrapper[A] private (stream: AnyRef) { + + private val streamClass = stream.getClass + + def toDS(): Dataset[A] = { + streamClass.getMethod("toDS").invoke(stream).asInstanceOf[Dataset[A]] + } + + def toDF(): DataFrame = { + streamClass.getMethod("toDF").invoke(stream).asInstanceOf[DataFrame] + } + + def addData(data: A*): Offset = { + val method = streamClass.getMethod("addData", classOf[TraversableOnce[_]]) + method.invoke(stream, data).asInstanceOf[Offset] + } +} + +object MemoryStreamWrapper { + + /** Creates a MemoryStream wrapper that works across different Spark versions. 
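+   * The underlying MemoryStream companion is looked up and invoked reflectively, so the helper needs no compile-time dependency on either package location.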
*/ + def apply[A](implicit encoder: Encoder[A], sqlContext: SQLContext): MemoryStreamWrapper[A] = { + val stream = createMemoryStream[A] + new MemoryStreamWrapper[A](stream) + } + + private def createMemoryStream[A](implicit + encoder: Encoder[A], + sqlContext: SQLContext): AnyRef = { + // Try Spark 4.1+ path first (runtime package) + val spark41Class = Try( + Class.forName("org.apache.spark.sql.execution.streaming.runtime.MemoryStream$")) + if (spark41Class.isSuccess) { + val companion = spark41Class.get.getField("MODULE$").get(null) + // Spark 4.1+ uses implicit SparkSession instead of SQLContext + val applyMethod = companion.getClass.getMethod( + "apply", + classOf[Encoder[_]], + classOf[org.apache.spark.sql.SparkSession] + ) + return applyMethod.invoke(companion, encoder, sqlContext.sparkSession).asInstanceOf[AnyRef] + } + + // Fallback to Spark 3.x / 4.0 path + val oldClass = + Class.forName("org.apache.spark.sql.execution.streaming.MemoryStream$") + val companion = oldClass.getField("MODULE$").get(null) + val applyMethod = companion.getClass.getMethod( + "apply", + classOf[Encoder[_]], + classOf[SQLContext] + ) + applyMethod.invoke(companion, encoder, sqlContext).asInstanceOf[AnyRef] + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala index e103429559ba..6300600a820b 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala @@ -18,8 +18,9 @@ package org.apache.paimon.spark +import org.apache.paimon.spark.MemoryStreamWrapper + import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest { @@ -150,7 +151,7 @@ class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest { val location = table.location().toString // streaming write - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val writeStream = inputData .toDS() .toDF("a", "b") diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala index c43170d7ba1b..3c92b7eed9d3 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala @@ -19,10 +19,10 @@ package org.apache.paimon.spark import org.apache.paimon.Snapshot.CommitKind._ +import org.apache.paimon.spark.MemoryStreamWrapper import org.apache.spark.SparkConf import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions.{col, mean, window} import org.apache.spark.sql.streaming.StreamTest @@ -47,7 +47,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") @@ -91,7 +91,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = 
loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") @@ -131,7 +131,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData.toDS .toDF("uid", "city") .groupBy("city") @@ -175,7 +175,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] intercept[RuntimeException] { inputData .toDF() @@ -199,7 +199,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Long, Int, Double)] + val inputData = MemoryStreamWrapper[(Long, Int, Double)] val data = inputData.toDS .toDF("time", "stockId", "price") .selectExpr("CAST(time AS timestamp) AS timestamp", "stockId", "price") @@ -256,7 +256,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { spark.sql("SELECT * FROM T ORDER BY a, b"), Row(1, "2023-08-09") :: Row(2, "2023-08-09") :: Nil) - val inputData = MemoryStream[(Long, Date, Int)] + val inputData = MemoryStreamWrapper[(Long, Date, Int)] val stream = inputData .toDS() .toDF("a", "b", "c") @@ -325,7 +325,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { val table = loadTable("T") val location = table.location().toString - val inputData = MemoryStream[(Int, Int)] + val inputData = MemoryStreamWrapper[(Int, Int)] val stream = inputData .toDS() .toDF("a", "b") diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala index 316c36c40c56..59b5c8fd1cb5 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala @@ -18,10 +18,10 @@ package org.apache.paimon.spark.procedure +import org.apache.paimon.spark.MemoryStreamWrapper import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest class AlterBranchProcedureTest extends PaimonSparkTestBase with StreamTest { @@ -37,7 +37,7 @@ class AlterBranchProcedureTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala index 67786a47fe3f..4b866875eceb 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala @@ -18,10 +18,10 @@ 
package org.apache.paimon.spark.procedure +import org.apache.paimon.spark.MemoryStreamWrapper import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest class BranchProcedureTest extends PaimonSparkTestBase with StreamTest { @@ -38,7 +38,7 @@ class BranchProcedureTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala index e89eba2e8599..825f12b997cd 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala @@ -20,6 +20,7 @@ package org.apache.paimon.spark.procedure import org.apache.paimon.Snapshot.CommitKind import org.apache.paimon.fs.Path +import org.apache.paimon.spark.MemoryStreamWrapper import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.paimon.spark.utils.SparkProcedureUtils import org.apache.paimon.table.FileStoreTable @@ -27,7 +28,6 @@ import org.apache.paimon.table.source.DataSplit import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted} import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest import org.assertj.core.api.Assertions import org.scalatest.time.Span @@ -102,7 +102,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, Int)] + val inputData = MemoryStreamWrapper[(Int, Int)] val stream = inputData .toDS() .toDF("a", "b") @@ -198,7 +198,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, Int, Int)] + val inputData = MemoryStreamWrapper[(Int, Int, Int)] val stream = inputData .toDS() .toDF("p", "a", "b") @@ -368,7 +368,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, Int)] + val inputData = MemoryStreamWrapper[(Int, Int)] val stream = inputData .toDS() .toDF("a", "b") @@ -822,7 +822,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, Int, String)] + val inputData = MemoryStreamWrapper[(Int, Int, String)] val stream = inputData .toDS() .toDF("a", "b", "c") @@ -970,7 +970,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, Int, String, Int)] + val inputData = MemoryStreamWrapper[(Int, Int, String, Int)] val stream = inputData .toDS() .toDF("a", "b", "c", "pt") @@ -1184,7 +1184,7 @@ abstract class 
CompactProcedureTestBase extends PaimonSparkTestBase with StreamT |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, Int, String)] + val inputData = MemoryStreamWrapper[(Int, Int, String)] val stream = inputData .toDS() .toDF("a", "b", "c") diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala index 4a4c7ae215df..bcb53faf957b 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala @@ -18,10 +18,10 @@ package org.apache.paimon.spark.procedure +import org.apache.paimon.spark.MemoryStreamWrapper import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTest { @@ -39,7 +39,7 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") @@ -146,7 +146,7 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala index e9b00298e492..2bc8fdbb3101 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala @@ -18,11 +18,11 @@ package org.apache.paimon.spark.procedure +import org.apache.paimon.spark.MemoryStreamWrapper import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.paimon.utils.SnapshotNotExistException import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest class CreateTagFromTimestampProcedureTest extends PaimonSparkTestBase with StreamTest { @@ -39,7 +39,7 @@ class CreateTagFromTimestampProcedureTest extends PaimonSparkTestBase with Strea |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") @@ -116,7 +116,7 @@ class CreateTagFromTimestampProcedureTest extends PaimonSparkTestBase with Strea |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala index 586f2e6c2d72..1d2bd0981e72 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala @@ -18,10 +18,10 @@ package org.apache.paimon.spark.procedure +import org.apache.paimon.spark.MemoryStreamWrapper import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest import org.assertj.core.api.Assertions.assertThatThrownBy @@ -41,7 +41,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(String, String)] + val inputData = MemoryStreamWrapper[(String, String)] val stream = inputData .toDS() .toDF("k", "pt") @@ -93,7 +93,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(String, String, String)] + val inputData = MemoryStreamWrapper[(String, String, String)] val stream = inputData .toDS() .toDF("k", "pt", "hm") @@ -162,7 +162,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(String, String)] + val inputData = MemoryStreamWrapper[(String, String)] val stream = inputData .toDS() .toDF("k", "pt") @@ -218,7 +218,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(String, String)] + val inputData = MemoryStreamWrapper[(String, String)] val stream = inputData .toDS() .toDF("k", "pt") @@ -286,7 +286,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(String, String, String)] + val inputData = MemoryStreamWrapper[(String, String, String)] val stream = inputData .toDS() .toDF("k", "pt", "hm") @@ -352,7 +352,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(String, String)] + val inputData = MemoryStreamWrapper[(String, String)] val stream = inputData .toDS() .toDF("k", "pt") @@ -417,7 +417,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(String, String, String)] + val inputData = MemoryStreamWrapper[(String, String, String)] val stream = inputData .toDS() .toDF("k", "pt", "hm") @@ -487,7 +487,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(String, String, String)] + val inputData = MemoryStreamWrapper[(String, String, String)] val stream = inputData .toDS() .toDF("k", "pt", "hm") @@ -565,7 
+565,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(String, String)] + val inputData = MemoryStreamWrapper[(String, String)] val stream = inputData .toDS() .toDF("k", "pt") @@ -634,7 +634,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(String, String)] + val inputData = MemoryStreamWrapper[(String, String)] val stream = inputData .toDS() .toDF("k", "pt") @@ -701,7 +701,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(String, String)] + val inputData = MemoryStreamWrapper[(String, String)] val stream = inputData .toDS() .toDF("k", "pt") diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala index aa65d8b9c38e..f1e3f2f14859 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala @@ -18,11 +18,11 @@ package org.apache.paimon.spark.procedure +import org.apache.paimon.spark.MemoryStreamWrapper import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.paimon.utils.SnapshotManager import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest import org.assertj.core.api.Assertions.{assertThat, assertThatIllegalArgumentException} @@ -44,7 +44,7 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") @@ -100,7 +100,7 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") @@ -175,7 +175,7 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") @@ -230,7 +230,7 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 66f2d57e02bc..9fc0182b5dee 100644 --- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -18,10 +18,10 @@ package org.apache.paimon.spark.procedure +import org.apache.paimon.spark.MemoryStreamWrapper import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { @@ -40,7 +40,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { val table = loadTable("T") val location = table.location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") @@ -169,7 +169,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] + val inputData = MemoryStreamWrapper[(Int, String)] val stream = inputData .toDS() .toDF("a", "b") diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala index 02a5b9a83015..2b147dbf93fa 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala @@ -49,7 +49,7 @@ class V2WriteRequireDistributionTest extends PaimonSparkTestBase with AdaptiveSp val node1 = nodes(0) assert( node1.isInstanceOf[AppendDataExec] && - node1.toString.contains("PaimonWrite(table=test.t1"), + node1.asInstanceOf[AppendDataExec].write.toString.contains("PaimonWrite(table=test.t1"), s"Expected AppendDataExec with specific paimon write, but got: $node1" ) @@ -92,7 +92,7 @@ class V2WriteRequireDistributionTest extends PaimonSparkTestBase with AdaptiveSp val node1 = nodes(0) assert( node1.isInstanceOf[AppendDataExec] && - node1.toString.contains("PaimonWrite(table=test.t1"), + node1.asInstanceOf[AppendDataExec].write.toString.contains("PaimonWrite(table=test.t1"), s"Expected AppendDataExec with specific paimon write, but got: $node1" ) diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala index 70011e14c3c2..202974fd2e41 100644 --- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala @@ -24,20 +24,27 @@ import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensi import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, Spark3InternalRowWithBlob, SparkArrayData, SparkInternalRow} import org.apache.paimon.types.{DataType, RowType} +import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import 
org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap import java.util.{Map => JMap} +import scala.collection.JavaConverters._ + class Spark3Shim extends SparkShim { override def classicApi: ClassicApi = new Classic3Api @@ -108,6 +115,13 @@ class Spark3Shim extends SparkShim { notMatchedBySourceActions) } + override def createKeep( + context: String, + condition: Expression, + output: Seq[Expression]): Instruction = { + MergeRows.Keep(condition, output) + } + override def toPaimonVariant(o: Object): Variant = throw new UnsupportedOperationException() override def isSparkVariantType(dataType: org.apache.spark.sql.types.DataType): Boolean = false @@ -120,4 +134,70 @@ class Spark3Shim extends SparkShim { override def toPaimonVariant(array: ArrayData, pos: Int): Variant = throw new UnsupportedOperationException() + + def createFileIndex( + options: CaseInsensitiveStringMap, + sparkSession: SparkSession, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + partitionSchema: StructType): PartitioningAwareFileIndex = { + + class PartitionedMetadataLogFileIndex( + sparkSession: SparkSession, + path: Path, + parameters: Map[String, String], + userSpecifiedSchema: Option[StructType], + override val partitionSchema: StructType) + extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema) + + class PartitionedInMemoryFileIndex( + sparkSession: SparkSession, + rootPathsSpecified: Seq[Path], + parameters: Map[String, String], + userSpecifiedSchema: Option[StructType], + fileStatusCache: FileStatusCache = NoopCache, + userSpecifiedPartitionSpec: Option[PartitionSpec] = None, + metadataOpsTimeNs: Option[Long] = None, + override val partitionSchema: StructType) + extends InMemoryFileIndex( + sparkSession, + rootPathsSpecified, + parameters, + userSpecifiedSchema, + fileStatusCache, + userSpecifiedPartitionSpec, + metadataOpsTimeNs) + + def globPaths: Boolean = { + val entry = options.get(DataSource.GLOB_PATHS_KEY) + Option(entry).forall(_ == "true") + } + + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) { + new PartitionedMetadataLogFileIndex( + sparkSession, + new Path(paths.head), + options.asScala.toMap, + userSpecifiedSchema, + partitionSchema = partitionSchema) + } else { + val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary( + paths, + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = true, + enableGlobbing = globPaths) + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + + new PartitionedInMemoryFileIndex( + sparkSession, + rootPathsSpecified, + caseSensitiveMap, + userSpecifiedSchema, + fileStatusCache, + partitionSchema = partitionSchema) + } + } } diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala index d8ba2847ab88..048a2c0c6e43 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala @@ -20,7 +20,7 @@ package org.apache.paimon.spark.data import org.apache.paimon.types.DataType -import org.apache.spark.unsafe.types.VariantVal +import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, VariantVal} class Spark4ArrayData(override val elementType: DataType) extends AbstractSparkArrayData { @@ -28,4 +28,12 @@ class Spark4ArrayData(override val elementType: DataType) extends AbstractSparkA val v = paimonArray.getVariant(ordinal) new VariantVal(v.value(), v.metadata()) } + + def getGeography(ordinal: Int): GeographyVal = { + throw new UnsupportedOperationException("GeographyVal is not supported") + } + + def getGeometry(ordinal: Int): GeometryVal = { + throw new UnsupportedOperationException("GeometryVal is not supported") + } } diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala index 9ac2766346f9..0447b26a3273 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala @@ -21,7 +21,7 @@ package org.apache.paimon.spark.data import org.apache.paimon.spark.AbstractSparkInternalRow import org.apache.paimon.types.RowType -import org.apache.spark.unsafe.types.VariantVal +import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, VariantVal} class Spark4InternalRow(rowType: RowType) extends AbstractSparkInternalRow(rowType) { @@ -29,4 +29,12 @@ class Spark4InternalRow(rowType: RowType) extends AbstractSparkInternalRow(rowTy val v = row.getVariant(i) new VariantVal(v.value(), v.metadata()) } + + def getGeography(ordinal: Int): GeographyVal = { + throw new UnsupportedOperationException("GeographyVal is not supported") + } + + def getGeometry(ordinal: Int): GeometryVal = { + throw new UnsupportedOperationException("GeometryVal is not supported") + } } diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala index 0a208daea292..c52207e43197 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala @@ -18,12 +18,10 @@ package org.apache.paimon.spark.data -import org.apache.paimon.spark.AbstractSparkInternalRow import org.apache.paimon.types.RowType import org.apache.paimon.utils.InternalRowUtils.copyInternalRow import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.unsafe.types.VariantVal class Spark4InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int, blobAsDescriptor: Boolean) extends Spark4InternalRow(rowType) { diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala new file mode 100644 index 000000000000..ba2d790c7a8a --- /dev/null +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.paimon.spark.catalyst.analysis.{PaimonDeleteTable, PaimonMergeInto, PaimonUpdateTable} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast} +import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull +import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, UpdateAction, UpdateTable} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND +import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy + +/** + * A rule that resolves assignments in row-level commands. + * + * Note that this rule must be run before rewriting row-level commands into executable plans. This + * rule does not apply to tables that accept any schema. Such tables must inject their own rules to + * resolve assignments. 
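+ * On Spark 4.1 and later, this Paimon copy additionally applies Paimon's own update, delete and merge-into rewrite rules once the assignments have been aligned.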
+ */ +object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + val plan1 = applyInt(plan) + if (org.apache.spark.SPARK_VERSION >= "4.1") { + val plan2 = PaimonUpdateTable.apply(plan1) + val plan3 = PaimonDeleteTable.apply(plan2) + PaimonMergeInto(null).apply(plan3) + } else { + plan1 + } + } + + private def applyInt(plan: LogicalPlan): LogicalPlan = + plan.resolveOperatorsWithPruning(_.containsPattern(COMMAND), ruleId) { + case u: UpdateTable if !u.skipSchemaResolution && u.resolved && u.rewritable && !u.aligned => + validateStoreAssignmentPolicy() + val newTable = cleanAttrMetadata(u.table) + val newAssignments = AssignmentUtils.alignUpdateAssignments( + u.table.output, + u.assignments, + fromStar = false, + coerceNestedTypes = false) + u.copy(table = newTable, assignments = newAssignments) + + case u: UpdateTable if !u.skipSchemaResolution && u.resolved && !u.aligned => + resolveAssignments(u) + + case m: MergeIntoTable + if !m.skipSchemaResolution && m.resolved && m.rewritable && !m.aligned && + !m.needSchemaEvolution => + validateStoreAssignmentPolicy() + val coerceNestedTypes = SQLConf.get.coerceMergeNestedTypes && m.withSchemaEvolution + m.copy( + targetTable = cleanAttrMetadata(m.targetTable), + matchedActions = alignActions(m.targetTable.output, m.matchedActions, coerceNestedTypes), + notMatchedActions = + alignActions(m.targetTable.output, m.notMatchedActions, coerceNestedTypes), + notMatchedBySourceActions = + alignActions(m.targetTable.output, m.notMatchedBySourceActions, coerceNestedTypes) + ) + + case m: MergeIntoTable + if !m.skipSchemaResolution && m.resolved && !m.aligned + && !m.needSchemaEvolution => + resolveAssignments(m) + } + + private def validateStoreAssignmentPolicy(): Unit = { + // SPARK-28730: LEGACY store assignment policy is disallowed in data source v2 + if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) { + throw QueryCompilationErrors.legacyStoreAssignmentPolicyError() + } + } + + private def cleanAttrMetadata(table: LogicalPlan): LogicalPlan = { + table.transform { + case r: DataSourceV2Relation => + r.copy(output = r.output.map(CharVarcharUtils.cleanAttrMetadata)) + } + } + + private def resolveAssignments(p: LogicalPlan): LogicalPlan = { + p.transformExpressions { + case assignment: Assignment => + val nullHandled = if (!assignment.key.nullable && assignment.value.nullable) { + AssertNotNull(assignment.value) + } else { + assignment.value + } + val casted = if (assignment.key.dataType != nullHandled.dataType) { + val cast = Cast(nullHandled, assignment.key.dataType, ansiEnabled = true) + cast.setTagValue(Cast.BY_TABLE_INSERTION, ()) + cast + } else { + nullHandled + } + val rawKeyType = assignment.key.transform { + case a: AttributeReference => + CharVarcharUtils.getRawType(a.metadata).map(a.withDataType).getOrElse(a) + }.dataType + val finalValue = if (CharVarcharUtils.hasCharVarchar(rawKeyType)) { + CharVarcharUtils.stringLengthCheck(casted, rawKeyType) + } else { + casted + } + val cleanedKey = assignment.key.transform { + case a: AttributeReference => CharVarcharUtils.cleanAttrMetadata(a) + } + Assignment(cleanedKey, finalValue) + } + } + + private def alignActions( + attrs: Seq[Attribute], + actions: Seq[MergeAction], + coerceNestedTypes: Boolean): Seq[MergeAction] = { + actions.map { + case u @ UpdateAction(_, assignments, fromStar) => + u.copy(assignments = + AssignmentUtils.alignUpdateAssignments(attrs, assignments, fromStar, coerceNestedTypes)) + case d: 
DeleteAction => + d + case i @ InsertAction(_, assignments) => + i.copy(assignments = + AssignmentUtils.alignInsertAssignments(attrs, assignments, coerceNestedTypes)) + case other => + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3052", + messageParameters = Map("other" -> other.toString)) + } + } +} diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/MinorVersionShim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/MinorVersionShim.scala new file mode 100644 index 000000000000..714abd0a6aa9 --- /dev/null +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/MinorVersionShim.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.paimon.shims + +import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow} +import org.apache.paimon.types.{DataType, RowType} + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.MergeRows +import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex +import org.apache.spark.sql.execution.streaming.sinks.FileStreamSink +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import scala.collection.JavaConverters._ + +object MinorVersionShim { + + def createKeep(context: String, condition: Expression, output: Seq[Expression]): Instruction = { + val ctx = context match { + case "COPY" => MergeRows.Copy + case "DELETE" => MergeRows.Delete + case "INSERT" => MergeRows.Insert + case "UPDATE" => MergeRows.Update + case _ => MergeRows.Copy + } + + MergeRows.Keep(ctx, condition, output) + } + + def createSparkInternalRow(rowType: RowType): SparkInternalRow = { + new Spark4InternalRow(rowType) + } + + def createSparkInternalRowWithBlob( + rowType: RowType, + blobFieldIndex: Int, + blobAsDescriptor: Boolean): SparkInternalRow = { + new Spark4InternalRowWithBlob(rowType, blobFieldIndex, blobAsDescriptor) + } + + def createSparkArrayData(elementType: DataType): SparkArrayData = { + new Spark4ArrayData(elementType) + } + + def createFileIndex( + options: CaseInsensitiveStringMap, + sparkSession: SparkSession, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + partitionSchema: StructType): PartitioningAwareFileIndex = { + + class PartitionedMetadataLogFileIndex( + sparkSession: SparkSession, + path: Path, + parameters: 
Map[String, String], + userSpecifiedSchema: Option[StructType], + override val partitionSchema: StructType) + extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema) + + class PartitionedInMemoryFileIndex( + sparkSession: SparkSession, + rootPathsSpecified: Seq[Path], + parameters: Map[String, String], + userSpecifiedSchema: Option[StructType], + fileStatusCache: FileStatusCache = NoopCache, + userSpecifiedPartitionSpec: Option[PartitionSpec] = None, + metadataOpsTimeNs: Option[Long] = None, + override val partitionSchema: StructType) + extends InMemoryFileIndex( + sparkSession, + rootPathsSpecified, + parameters, + userSpecifiedSchema, + fileStatusCache, + userSpecifiedPartitionSpec, + metadataOpsTimeNs) + + def globPaths: Boolean = { + val entry = options.get(DataSource.GLOB_PATHS_KEY) + Option(entry).forall(_ == "true") + } + + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) { + new PartitionedMetadataLogFileIndex( + sparkSession, + new Path(paths.head), + options.asScala.toMap, + userSpecifiedSchema, + partitionSchema = partitionSchema) + } else { + val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary( + paths, + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = true, + enableGlobbing = globPaths) + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + + new PartitionedInMemoryFileIndex( + sparkSession, + rootPathsSpecified, + caseSensitiveMap, + userSpecifiedSchema, + fileStatusCache, + partitionSchema = partitionSchema) + } + } + +} diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index ad36acfb26a9..f4d86bf759ed 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -21,7 +21,7 @@ package org.apache.spark.sql.paimon.shims import org.apache.paimon.data.variant.{GenericVariant, Variant} import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser -import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow} +import org.apache.paimon.spark.data.{SparkArrayData, SparkInternalRow} import org.apache.paimon.types.{DataType, RowType} import org.apache.spark.sql.SparkSession @@ -30,11 +30,14 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable} +import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.types.{DataTypes, 
StructType, VariantType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.unsafe.types.VariantVal import java.util.{Map => JMap} @@ -52,18 +55,18 @@ class Spark4Shim extends SparkShim { } override def createSparkInternalRow(rowType: RowType): SparkInternalRow = { - new Spark4InternalRow(rowType) + MinorVersionShim.createSparkInternalRow(rowType) } override def createSparkInternalRowWithBlob( rowType: RowType, blobFieldIndex: Int, blobAsDescriptor: Boolean): SparkInternalRow = { - new Spark4InternalRowWithBlob(rowType, blobFieldIndex, blobAsDescriptor) + MinorVersionShim.createSparkInternalRowWithBlob(rowType, blobFieldIndex, blobAsDescriptor) } override def createSparkArrayData(elementType: DataType): SparkArrayData = { - new Spark4ArrayData(elementType) + MinorVersionShim.createSparkArrayData(elementType) } override def createTable( @@ -113,6 +116,13 @@ class Spark4Shim extends SparkShim { withSchemaEvolution) } + override def createKeep( + context: String, + condition: Expression, + output: Seq[Expression]): Instruction = { + MinorVersionShim.createKeep(context, condition, output) + } + override def toPaimonVariant(o: Object): Variant = { val v = o.asInstanceOf[VariantVal] new GenericVariant(v.getValue, v.getMetadata) @@ -132,4 +142,18 @@ class Spark4Shim extends SparkShim { dataType.isInstanceOf[VariantType] override def SparkVariantType(): org.apache.spark.sql.types.DataType = DataTypes.VariantType + + override def createFileIndex( + options: CaseInsensitiveStringMap, + sparkSession: SparkSession, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + partitionSchema: StructType): PartitioningAwareFileIndex = { + MinorVersionShim.createFileIndex( + options, + sparkSession, + paths, + userSpecifiedSchema, + partitionSchema) + } } diff --git a/pom.xml b/pom.xml index 600a65c3c08a..9270bc5a26bb 100644 --- a/pom.xml +++ b/pom.xml @@ -90,7 +90,7 @@ under the License. 1.20.1 2.12 2.12.18 - 2.13.16 + 2.13.17 ${scala212.version} ${scala212.version} 1.1.8.4 @@ -425,17 +425,18 @@ under the License. paimon-spark/paimon-spark4-common paimon-spark/paimon-spark-4.0 + paimon-spark/paimon-spark-4.1 17 4.13.1 2.13 ${scala213.version} - 4.0.1 + 4.1.0 paimon-spark4-common_2.13 18.1.0 - 4.0 - 4.0.1 + 4.1 + 4.1.0
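
For reviewers, a minimal usage sketch of the new MemoryStreamWrapper test helper follows; it is not part of the patch. It assumes a test class that mixes in PaimonSparkTestBase and StreamTest (so spark, loadTable, checkAnswer, the implicit SQLContext and the tuple Encoder are in scope) and an existing Paimon table T; the temporary checkpoint directory is illustrative only.

import java.nio.file.Files

import org.apache.paimon.spark.MemoryStreamWrapper

import org.apache.spark.sql.Row

// The wrapper keeps the call pattern of MemoryStream[(Int, String)] while resolving the
// concrete MemoryStream class reflectively, so the same test source compiles against
// Spark 3.x, 4.0 and 4.1.
val location = loadTable("T").location().toString
val inputData = MemoryStreamWrapper[(Int, String)]

val stream = inputData
  .toDS()        // forwarded reflectively to the underlying MemoryStream
  .toDF("a", "b")
  .writeStream
  .format("paimon")
  .option("checkpointLocation", Files.createTempDirectory("paimon-ckpt").toString)
  .start(location)

try {
  inputData.addData((1, "a"), (2, "b"))   // addData is forwarded reflectively as well
  stream.processAllAvailable()
  checkAnswer(spark.sql("SELECT * FROM T ORDER BY a"), Row(1, "a") :: Row(2, "b") :: Nil)
} finally {
  stream.stop()
}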