diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index f9820178c25..8dac2a0477e 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -259,6 +259,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { GlutenPartition( index, planByteArray, + wsCtx.queryId, splitInfos.toArray ) } @@ -315,6 +316,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { inputIterators: Seq[Iterator[ColumnarBatch]], sparkConf: SparkConf, rootNode: PlanNode, + queryId: String, pipelineTime: SQLMetric, updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 832932f0a49..5c3dbd356e6 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -481,7 +481,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { child: SparkPlan, numOutputRows: SQLMetric, dataSize: SQLMetric, - buildThreads: SQLMetric): BuildSideRelation = { + buildThreads: SQLMetric, + queryId: String): BuildSideRelation = { val (buildKeys, isNullAware) = mode match { case mode1: HashedRelationBroadcastMode => diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala index 2ea7b9e554d..33180b0d0d9 100644 --- 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -194,7 +194,8 @@ object GlutenDeltaJobStatsTracker extends Logging { null, null, 0, - BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) + BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath), + s"Gluten_Delta_Stats_${UUID.randomUUID()}" ) nativeOutItr } diff --git a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala index ca6c7a6a7fb..2016a5a6cc6 100644 --- a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala +++ b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -198,7 +198,8 @@ object GlutenDeltaJobStatsTracker extends Logging { null, null, 0, - BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) + BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath), + s"Gluten_Delta_Stats_${UUID.randomUUID()}" ) nativeOutItr } diff --git a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java index 106d8b98170..6b9dbc5dbf9 100644 --- a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java +++ b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java @@ -35,9 +35,9 @@ public long rtHandle() { return runtime.getHandle(); } - public static native void clearHashTable(long hashTableData); + public static native void clearHashTable(String cacheKey, long hashTableData); - public static native long cloneHashTable(long hashTableData); + public static 
native long cloneHashTable(String cacheKey, long hashTableData); public native long nativeBuild( String buildHashTableId, @@ -51,5 +51,6 @@ public native long nativeBuild( byte[] namedStruct, boolean isNullAwareAntiJoin, long bloomFilterPushdownSize, - int broadcastHashTableBuildThreads); + int broadcastHashTableBuildThreads, + String queryId); } diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index d8b23b358fa..6e6264c567d 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -130,6 +130,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { GlutenPartition( index, planByteArray, + wsCtx.queryId, splitInfos.toArray ) } @@ -216,7 +217,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { if (splitInfoByteArray.nonEmpty) splitInfoByteArray else null, if (columnarNativeIterators.nonEmpty) columnarNativeIterators.toArray else null, partitionIndex, - BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) + BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath), + inputPartition.queryId ) resIter.noMoreSplits() val itrMetrics = IteratorMetricsJniWrapper.create() @@ -243,6 +245,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { inputIterators: Seq[Iterator[ColumnarBatch]], sparkConf: SparkConf, rootNode: PlanNode, + queryId: String, pipelineTime: SQLMetric, updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, @@ -270,7 +273,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { null, if (columnarNativeIterator.nonEmpty) columnarNativeIterator.toArray else null, partitionIndex, - BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) + 
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath), + queryId ) nativeResultIterator.noMoreSplits() val itrMetrics = IteratorMetricsJniWrapper.create() diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index be21337d994..6b4308124f3 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -715,7 +715,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { child: SparkPlan, numOutputRows: SQLMetric, dataSize: SQLMetric, - buildThreads: SQLMetric): BuildSideRelation = { + buildThreads: SQLMetric, + queryId: String): BuildSideRelation = { val buildKeys = mode match { case mode1: HashedRelationBroadcastMode => @@ -875,7 +876,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { mode, newBuildKeys, offload, - buildThreadsValue) + buildThreadsValue, + queryId) } } else { ColumnarBuildSideRelation( @@ -884,7 +886,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { mode, newBuildKeys, offload, - buildThreadsValue) + buildThreadsValue, + queryId) } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala index 1554c4ddd3e..f5c8fd8bb61 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala @@ -26,8 +26,10 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.{BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.execution.{SparkPlan, 
SQLExecution} +import org.apache.spark.sql.execution.ColumnarBuildSideRelation import org.apache.spark.sql.execution.joins.BuildSideRelation import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.unsafe.UnsafeColumnarBuildSideRelation import org.apache.spark.sql.vectorized.ColumnarBatch import io.substrait.proto.JoinRel @@ -106,7 +108,7 @@ case class BroadcastHashJoinExecTransformer( isNullAwareAntiJoin) { // Unique ID for built table - lazy val buildBroadcastTableId: String = buildPlan.id.toString + lazy val buildBroadcastTableId: String = canonicalBuildHashTableId(buildPlan) override protected lazy val substraitJoinType: JoinRel.JoinType = joinType match { case _: InnerLike => @@ -134,17 +136,18 @@ case class BroadcastHashJoinExecTransformer( override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = { val streamedRDD = getColumnarInputRDDs(streamedPlan) + val broadcast = buildPlan.executeBroadcast[BuildSideRelation]() + val queryId = extractQueryId(broadcast.value) val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + val cacheKey = s"$queryId:$buildHashTableId" if (executionId != null) { - GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId) + GlutenDriverEndpoint.collectResources(executionId, cacheKey) } else { logWarning( - s"Can not trace broadcast table data $buildBroadcastTableId" + + s"Can not trace broadcast table data $cacheKey" + s" because execution id is null." 
+ s" Will clean up until expire time.") } - - val broadcast = buildPlan.executeBroadcast[BuildSideRelation]() val bloomFilterPushdownSize = if (VeloxConfig.get.hashProbeDynamicFilterPushdownEnabled) { VeloxConfig.get.hashProbeBloomFilterPushdownMaxSize } else { @@ -174,7 +177,7 @@ case class BroadcastHashJoinExecTransformer( buildPlan.output, filterBuildColumns, filterPropagatesNulls, - buildBroadcastTableId, + cacheKey, isNullAwareAntiJoin, bloomFilterPushdownSize, metrics.get("buildHashTableTime") @@ -183,6 +186,15 @@ case class BroadcastHashJoinExecTransformer( // FIXME: Do we have to make build side a RDD? streamedRDD :+ broadcastRDD } + + private def extractQueryId(relation: BuildSideRelation): String = relation match { + case columnar: ColumnarBuildSideRelation if columnar.queryId.nonEmpty => + columnar.queryId + case unsafe: UnsafeColumnarBuildSideRelation if unsafe.queryIdValue.nonEmpty => + unsafe.queryIdValue + case _ => + GlutenQueryContext.getRequiredQueryId(buildPlan) + } } case class BroadcastHashJoinContext( diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala index 535fd8900e1..c8fcd574cdd 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala @@ -103,7 +103,7 @@ object VeloxBroadcastBuildSideCache } } - HashJoinBuilder.clearHashTable(value.pointer) + HashJoinBuilder.clearHashTable(key, value.pointer) } } } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index fea9f149745..de84bd97f49 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -52,7 +52,8 @@ object ColumnarBuildSideRelation { mode: BroadcastMode, newBuildKeys: Seq[Expression] = Seq.empty, offload: Boolean = false, - buildThreads: Int = 1): ColumnarBuildSideRelation = { + buildThreads: Int = 1, + queryId: String = ""): ColumnarBuildSideRelation = { val boundMode = mode match { case HashedRelationBroadcastMode(keys, isNullAware) => // Bind each key to the build-side output so simple cols become BoundReference @@ -68,7 +69,8 @@ object ColumnarBuildSideRelation { BroadcastModeUtils.toSafe(boundMode), newBuildKeys, offload, - buildThreads) + buildThreads, + queryId) } } @@ -78,7 +80,8 @@ case class ColumnarBuildSideRelation( safeBroadcastMode: SafeBroadcastMode, newBuildKeys: Seq[Expression], offload: Boolean, - buildThreads: Int) + buildThreads: Int, + queryId: String) extends BuildSideRelation with Logging with KnownSizeEstimation { @@ -223,7 +226,8 @@ case class ColumnarBuildSideRelation( SubstraitUtil.toNameStruct(newOutput).toByteArray, broadcastContext.isNullAwareAntiJoin, broadcastContext.bloomFilterPushdownSize, - buildThreads + buildThreads, + queryId ) jniWrapper.close(serializeHandle) @@ -234,7 +238,7 @@ case class ColumnarBuildSideRelation( (hashTableData, this) } else { - (HashJoinBuilder.cloneHashTable(hashTableData), null) + (HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null) } } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala index fbc329f3606..39259fd52f3 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala @@ -57,7 +57,8 @@ object 
UnsafeColumnarBuildSideRelation { mode: BroadcastMode, newBuildKeys: Seq[Expression] = Seq.empty, offload: Boolean = false, - buildThreads: Int = 1): UnsafeColumnarBuildSideRelation = { + buildThreads: Int = 1, + queryId: String = ""): UnsafeColumnarBuildSideRelation = { val boundMode = mode match { case HashedRelationBroadcastMode(keys, isNullAware) => // Bind each key to the build-side output so simple cols become BoundReference @@ -73,7 +74,8 @@ object UnsafeColumnarBuildSideRelation { BroadcastModeUtils.toSafe(boundMode), newBuildKeys, offload, - buildThreads) + buildThreads, + queryId) } } @@ -94,7 +96,8 @@ class UnsafeColumnarBuildSideRelation( private var safeBroadcastMode: SafeBroadcastMode, private var newBuildKeys: Seq[Expression], private var offload: Boolean, - private var buildThreads: Int) + private var buildThreads: Int, + private var queryId: String) extends BuildSideRelation with Externalizable with Logging @@ -114,9 +117,11 @@ class UnsafeColumnarBuildSideRelation( def isOffload: Boolean = offload + def queryIdValue: String = queryId + /** needed for serialization. 
*/ def this() = { - this(null, null, null, Seq.empty, false, 1) + this(null, null, null, Seq.empty, false, 1, "") } private[unsafe] def getBatches(): Seq[UnsafeByteArray] = { @@ -193,7 +198,8 @@ class UnsafeColumnarBuildSideRelation( SubstraitUtil.toNameStruct(newOutput).toByteArray, broadcastContext.isNullAwareAntiJoin, broadcastContext.bloomFilterPushdownSize, - buildThreads + buildThreads, + queryId ) jniWrapper.close(serializeHandle) @@ -204,7 +210,7 @@ class UnsafeColumnarBuildSideRelation( (hashTableData, this) } else { - (HashJoinBuilder.cloneHashTable(hashTableData), null) + (HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null) } } @@ -219,6 +225,7 @@ class UnsafeColumnarBuildSideRelation( out.writeObject(newBuildKeys) out.writeBoolean(offload) out.writeInt(buildThreads) + out.writeUTF(queryId) } override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException { @@ -228,6 +235,7 @@ class UnsafeColumnarBuildSideRelation( kryo.writeClassAndObject(out, newBuildKeys) out.writeBoolean(offload) out.writeInt(buildThreads) + out.writeString(queryId) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { @@ -237,6 +245,7 @@ class UnsafeColumnarBuildSideRelation( newBuildKeys = in.readObject().asInstanceOf[Seq[Expression]] offload = in.readBoolean() buildThreads = in.readInt() + queryId = in.readUTF() } override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException { @@ -246,6 +255,7 @@ class UnsafeColumnarBuildSideRelation( newBuildKeys = kryo.readClassAndObject(in).asInstanceOf[Seq[Expression]] offload = in.readBoolean() buildThreads = in.readInt() + queryId = in.readString() } private def transformProjection: UnsafeProjection = safeBroadcastMode match { diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index 4ab944898bd..cf5f40a1863 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -42,12 +42,15 @@ struct SparkTaskInfo { int32_t partitionId{0}; 
// Same as TID. int64_t taskId{0}; + // Same as Spark SQL execution id. -1 means unavailable. + int64_t executionId{-1}; + std::string queryId{}; // virtual id for each backend internal use int32_t vId{0}; std::string toString() const { - return "[Stage: " + std::to_string(stageId) + " TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) + - "]"; + return "[Stage: " + std::to_string(stageId) + " Execution: " + std::to_string(executionId) + + " TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) + "]"; } friend std::ostream& operator<<(std::ostream& os, const SparkTaskInfo& taskInfo) { diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 827c5ad8bdd..c0db58d4a49 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -468,13 +468,15 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith jint stageId, jint partitionId, jlong taskId, + jlong executionId, + jstring queryId, jboolean enableDumping, jstring spillDir) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); - ctx->setSparkTaskInfo({stageId, partitionId, taskId}); + ctx->setSparkTaskInfo({stageId, partitionId, taskId, executionId, jStringToCString(env, queryId)}); if (enableDumping) { ctx->enableDumping(); diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index c3ac095cdc7..6a8de9eb72c 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -72,6 +72,18 @@ const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"; } // namespace +namespace { +std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { + if (!taskInfo.queryId.empty()) { + return taskInfo.queryId; + } + if (taskInfo.executionId != -1) { + return fmt::format("Gluten_Execution_{}", taskInfo.executionId); + } + return fmt::format("Gluten_Stage_{}_TID_{}_VTID_{}", taskInfo.stageId, taskInfo.taskId, taskInfo.vId); +} +} // namespace + 
WholeStageResultIterator::WholeStageResultIterator( VeloxMemoryManager* memoryManager, const std::shared_ptr& planNode, @@ -115,7 +127,7 @@ WholeStageResultIterator::WholeStageResultIterator( "Gluten_Stage_{}_TID_{}_VTID_{}", std::to_string(taskInfo_.stageId), std::to_string(taskInfo_.taskId), - std::to_string(taskInfo.vId)), + std::to_string(taskInfo_.vId)), std::move(planFragment), 0, std::move(queryCtx), @@ -233,11 +245,7 @@ std::shared_ptr WholeStageResultIterator::createNewVeloxQ gluten::VeloxBackend::get()->getAsyncDataCache(), memoryManager_->getAggregateMemoryPool(), spillExecutor_, - fmt::format( - "Gluten_Stage_{}_TID_{}_VTID_{}", - std::to_string(taskInfo_.stageId), - std::to_string(taskInfo_.taskId), - std::to_string(taskInfo_.vId))); + getVeloxTaskId(taskInfo_)); return ctx; } diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index aa4d9599435..1d8c5b1c536 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -46,6 +46,7 @@ #include "velox/common/base/BloomFilter.h" #include "velox/common/file/FileSystems.h" #include "velox/exec/HashTable.h" +#include "velox/exec/HashTableCache.h" #ifdef GLUTEN_ENABLE_GPU #include "cudf/CudfPlanValidator.h" @@ -971,11 +972,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native jbyteArray namedStruct, jboolean isNullAwareAntiJoin, jlong bloomFilterPushdownSize, - jint numThreads) { + jint numThreads, + jstring queryId) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); auto* runtime = dynamic_cast(ctx); GLUTEN_CHECK(runtime != nullptr, "Not a Velox runtime"); + ctx->setSparkTaskInfo({0, 0, 0, -1, jStringToCString(env, queryId)}); const auto& queryConf = *(runtime->veloxCfg()); const auto minTableRowsForParallelJoinBuild = queryConf.get(kMinTableRowsForParallelJoinBuild, kMinTableRowsForParallelJoinBuildDefault); @@ -1060,6 +1063,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native 
nullptr); builder->setHashTable(std::move(mainTable)); + auto* cache = facebook::velox::exec::HashTableCache::instance(); + + if (!cache->hasTable(hashTableId)) { + LOG(INFO) << "VeloxJniWrapper inject hash table id is " << hashTableId; + cache->injectTable(hashTableId, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool()); + } + return gluten::getHashTableObjStore()->save(builder); } @@ -1138,6 +1148,17 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native } hashTableBuilders[0]->setHashTable(std::move(mainTable)); + + auto* cache = facebook::velox::exec::HashTableCache::instance(); + if (!cache->hasTable(hashTableId)) { + LOG(INFO) << "VeloxJniWrapper inject hash table id is " << hashTableId; + cache->injectTable( + hashTableId, + hashTableBuilders[0]->hashTable(), + hashTableBuilders[0]->joinHasNullKeys(), + defaultLeafVeloxMemoryPool()); + } + return gluten::getHashTableObjStore()->save(hashTableBuilders[0]); JNI_METHOD_END(kInvalidObjectHandle) } @@ -1145,9 +1166,17 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_cloneHashTable( // NOLINT JNIEnv* env, jclass, + jstring cacheKey, jlong tableHandler) { JNI_METHOD_START + auto cacheKeyStr = jStringToCString(env, cacheKey); auto hashTableHandler = ObjectStore::retrieve(tableHandler); + auto* cache = facebook::velox::exec::HashTableCache::instance(); + if (!cache->hasTable(cacheKeyStr)) { + cache->injectTable( + cacheKeyStr, hashTableHandler->hashTable(), hashTableHandler->joinHasNullKeys(), defaultLeafVeloxMemoryPool()); + } + return gluten::getHashTableObjStore()->save(hashTableHandler); JNI_METHOD_END(kInvalidObjectHandle) } @@ -1155,13 +1184,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_cloneH JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_clearHashTable( // NOLINT JNIEnv* 
env, jclass, + jstring cacheKey, jlong tableHandler) { JNI_METHOD_START - auto hashTableHandler = ObjectStore::retrieve(tableHandler); - hashTableHandler->hashTable()->clear(true); + auto cacheKeyStr = jStringToCString(env, cacheKey); + LOG(INFO) << "VeloxJniWrapper clear hash table id is " << cacheKeyStr; + facebook::velox::exec::HashTableCache::instance()->drop(cacheKeyStr); ObjectStore::release(tableHandler); JNI_METHOD_END() } + #ifdef __cplusplus } #endif diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index b0fc0fc4a30..246b7814660 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -448,29 +448,11 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } else if ( sJoin.has_advanced_extension() && SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isBHJ=")) { - std::string hashTableId = sJoin.hashtableid(); - - std::shared_ptr opaqueSharedHashTable = nullptr; - bool joinHasNullKeys = false; - - try { - auto hashTableBuilder = ObjectStore::retrieve(getJoin(hashTableId)); - joinHasNullKeys = hashTableBuilder->joinHasNullKeys(); - auto originalShared = hashTableBuilder->hashTable(); - opaqueSharedHashTable = std::shared_ptr( - originalShared, reinterpret_cast(originalShared.get())); - - LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId; - } catch (const std::exception& e) { - LOG(WARNING) - << "Error retrieving HashTable from ObjectStore: " << e.what() - << ". Falling back to building new table. To ensure correct results, please verify that spark.gluten.velox.buildHashTableOncePerExecutor.enabled is set to false."; - opaqueSharedHashTable = nullptr; - } - + const auto& hashTableId = sJoin.hashtableid(); + const auto joinNodeId = hashTableId.empty() ? 
nextPlanNodeId() : hashTableId; // Create HashJoinNode node return std::make_shared( - nextPlanNodeId(), + joinNodeId, joinType, isNullAwareAntiJoin, leftKeys, @@ -479,14 +461,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: leftNode, rightNode, getJoinOutputType(leftNode, rightNode, joinType), - false, - false, - joinHasNullKeys, - opaqueSharedHashTable); + true); } else { // Create HashJoinNode node + auto joinNodeId = nextPlanNodeId(); return std::make_shared( - nextPlanNodeId(), + joinNodeId, joinType, isNullAwareAntiJoin, leftKeys, diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index 71965f6c737..39ba1d9db61 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -17,9 +17,9 @@ set -exu CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) -VELOX_REPO=https://github.com/IBM/velox.git -VELOX_BRANCH=dft-2026_06_06 -VELOX_ENHANCED_BRANCH=ibm-2026_06_06 +VELOX_REPO=https://github.com/JkSelf/velox.git +VELOX_BRANCH=dft-2026_06_06-hashtable-cache +VELOX_ENHANCED_BRANCH=ibm-2026_06_06-hashtable-cache VELOX_HOME="" RUN_SETUP_SCRIPT=ON ENABLE_ENHANCED_FEATURES=OFF diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java index 6d2c90896b2..0d14d059903 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java @@ -34,6 +34,8 @@ public class NativePlanEvaluator { private static final Logger LOGGER = LoggerFactory.getLogger(NativePlanEvaluator.class); private static final AtomicInteger id = new AtomicInteger(0); + private static final long INVALID_EXECUTION_ID = -1L; + private static final String SPARK_EXECUTION_ID_KEY = "spark.sql.execution.id"; private final Runtime runtime; private final PlanEvaluatorJniWrapper jniWrapper; @@ -76,16 
+78,20 @@ public ColumnarBatchOutIterator createKernelWithBatchIterator( byte[][] splitInfo, ColumnarBatchInIterator[] iterList, int partitionIndex, - String spillDirPath) + String spillDirPath, + String queryId) throws RuntimeException { + final TaskContext taskContext = TaskContext.get(); final long itrHandle = jniWrapper.nativeCreateKernelWithIterator( wsPlan, splitInfo, iterList, - TaskContext.get().stageId(), + taskContext.stageId(), partitionIndex, // TaskContext.getPartitionId(), - TaskContext.get().taskAttemptId(), + taskContext.taskAttemptId(), + getExecutionId(taskContext), + queryId, DebugUtil.isDumpingEnabledForTask(), spillDirPath); final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle); @@ -113,4 +119,18 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { private ColumnarBatchOutIterator createOutIterator(Runtime runtime, long itrHandle) { return new ColumnarBatchOutIterator(runtime, itrHandle); } + + private static long getExecutionId(TaskContext taskContext) { + final String executionId = taskContext.getLocalProperty(SPARK_EXECUTION_ID_KEY); + if (executionId == null) { + return INVALID_EXECUTION_ID; + } + try { + return Long.parseLong(executionId); + } catch (NumberFormatException e) { + LOGGER.warn( + "Invalid Spark execution id '{}', fallback to {}", executionId, INVALID_EXECUTION_ID); + return INVALID_EXECUTION_ID; + } + } } diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java index a8082906798..24b43e9329c 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java @@ -76,6 +76,8 @@ public native long nativeCreateKernelWithIterator( int stageId, int partitionId, long taskId, + long executionId, + String queryId, boolean enableDumping, String 
spillDir) throws RuntimeException; diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index a6b00535935..5b2572c2623 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -81,6 +81,7 @@ trait IteratorApi { inputIterators: Seq[Iterator[ColumnarBatch]], sparkConf: SparkConf, rootNode: PlanNode, + queryId: String, pipelineTime: SQLMetric, updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 79f3d67c0ec..ceb658693f0 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -437,7 +437,8 @@ trait SparkPlanExecApi { child: SparkPlan, numOutputRows: SQLMetric, dataSize: SQLMetric, - buildThreads: SQLMetric = null): BuildSideRelation + buildThreads: SQLMetric = null, + queryId: String = ""): BuildSideRelation def doCanonicalizeForBroadcastMode(mode: BroadcastMode): BroadcastMode = { mode.canonicalized diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenQueryContext.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenQueryContext.scala new file mode 100644 index 00000000000..316030f4159 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenQueryContext.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.sql.catalyst.trees.TreeNodeTag +import org.apache.spark.sql.execution.SparkPlan + +import java.util.UUID + +object GlutenQueryContext { + val QueryIdTag: TreeNodeTag[String] = TreeNodeTag[String]("org.apache.gluten.QueryIdTag") + + private def sanitize(value: String): String = value.replaceAll("[^A-Za-z0-9_\\-]", "_") + + def newQueryId(): String = { + val randomId = sanitize(UUID.randomUUID().toString) + s"Gluten_Query_$randomId" + } + + def getQueryId(plan: SparkPlan): Option[String] = plan.getTagValue(QueryIdTag) + + def setQueryId(plan: SparkPlan, queryId: String): Unit = { + plan.setTagValue(QueryIdTag, queryId) + } + + def assignQueryId(plan: SparkPlan, queryId: String): String = { + plan.foreach(setQueryId(_, queryId)) + queryId + } + + private def findQueryId(plan: SparkPlan): Option[String] = { + getQueryId(plan).orElse(plan.children.iterator.map(findQueryId).find(_.isDefined).flatten) + } + + def initializeQueryId(plan: SparkPlan): String = { + getQueryId(plan).getOrElse(assignQueryId(plan, newQueryId())) + } + + def getRequiredQueryId(plan: SparkPlan): String = { + findQueryId(plan).getOrElse { + throw new IllegalStateException(s"Query ID has not been initialized for ${plan.nodeName}") + } + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala index 4a00dbb5872..0483cc5e4a9 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala @@ -31,11 +31,13 @@ import scala.collection.JavaConverters.asScalaBufferConverter trait BaseGlutenPartition extends Partition with InputPartition { def plan: Array[Byte] + def queryId: String } case class GlutenPartition( index: Int, plan: Array[Byte], + queryId: String, splitInfos: Array[SplitInfo] = Array.empty[SplitInfo], files: Array[String] = Array.empty[String] // touched files, for implementing UDF input_file_name diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala index 149a0ca729e..3964c12e10e 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala @@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.{ExpandOutputPartitioningShim, ExplainUtils, SparkPlan} +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BaseJoinExec, HashedRelationBroadcastMode, HashJoin} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types._ @@ -105,6 +107,15 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { (right, left) } + protected def canonicalBuildHashTableId(plan: SparkPlan): String = plan match { + case b: 
BroadcastQueryStageExec => + canonicalBuildHashTableId(b.plan) + case r: ReusedExchangeExec => + canonicalBuildHashTableId(r.child) + case other => + other.id.toString + } + def sameType(from: DataType, to: DataType): Boolean = { (from, to) match { case (ArrayType(fromElement, _), ArrayType(toElement, _)) => @@ -267,7 +278,7 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { inputBuildOutput, context, operatorId, - buildPlan.id.toString + canonicalBuildHashTableId(buildPlan) ) context.registerJoinParam(operatorId, joinParams) @@ -392,7 +403,7 @@ abstract class BroadcastHashJoinExecTransformerBase( override def hashJoinType: JoinType = joinType // Unique ID for builded hash table - lazy val buildHashTableId: String = "BuiltHashTable-" + buildPlan.id + lazy val buildHashTableId: String = canonicalBuildHashTableId(buildPlan) override def genJoinParametersInternal(): (Int, Int, String) = { (1, if (isNullAwareAntiJoin) 1 else 0, buildHashTableId) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index 8de9d407bfd..19f8243624b 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -51,6 +51,7 @@ case class TransformContext(outputAttributes: Seq[Attribute], root: RelNode) case class WholeStageTransformContext( root: PlanNode, substraitContext: SubstraitContext = null, + queryId: String, enableCudf: Boolean = false, supportsValueStreamDynamicFilter: Boolean = true) @@ -234,6 +235,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f } private def generateWholeStageTransformContext(): WholeStageTransformContext = { + val queryId = GlutenQueryContext.getRequiredQueryId(this) val substraitContext = new SubstraitContext val childCtx = 
child .asInstanceOf[TransformSupport] @@ -261,6 +263,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f WholeStageTransformContext( planNode, substraitContext, + queryId, isCudf, !hasNonDeterministicExprInJoinProbe(child)) } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala index 393716bb7b1..51251cc2928 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala @@ -50,6 +50,7 @@ class WholeStageZippedPartitionsRDD( inputIterators, sparkConf, resCtx.root, + resCtx.queryId, pipelineTime, updateNativeMetrics, split.index, diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index 4b2890d51d1..bf1d7384bb0 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.execution.GlutenQueryContext import org.apache.gluten.execution.ValidatablePlan import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.metrics.GlutenTimeMetric @@ -57,6 +58,7 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) @transient override lazy val relationFuture: java.util.concurrent.Future[broadcast.Broadcast[Any]] = { + val queryId = GlutenQueryContext.getRequiredQueryId(this) SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]]( session, 
BroadcastExchangeExec.executionContext) { @@ -76,7 +78,8 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) child, longMetric("numOutputRows"), longMetric("dataSize"), - metrics.getOrElse("buildThreads", null)) + metrics.getOrElse("buildThreads", null), + queryId) } val broadcasted = GlutenTimeMetric.millis(longMetric("broadcastTime")) { diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala index c81d380a781..c794841eec2 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala @@ -142,7 +142,8 @@ case class InputIteratorTransformer(child: SparkPlan) extends UnaryTransformSupp case class ColumnarCollapseTransformStages(glutenConf: GlutenConfig) extends Rule[SparkPlan] { def apply(plan: SparkPlan): SparkPlan = { - insertWholeStageTransformer(plan) + val queryId = GlutenQueryContext.initializeQueryId(plan) + insertWholeStageTransformer(plan, queryId) } /** @@ -161,25 +162,32 @@ case class ColumnarCollapseTransformStages(glutenConf: GlutenConfig) extends Rul } /** Inserts an InputIteratorTransformer on top of those that do not support transform. */ - private def insertInputIteratorTransformer(plan: SparkPlan): SparkPlan = { + private def insertInputIteratorTransformer(plan: SparkPlan, queryId: String): SparkPlan = { plan match { case p if p.isInstanceOf[WholeStageTransformer] || !supportTransform(p) => // TODO: if p.isInstanceOf[WholeStageTransformer], we can merge two whole stage // transformers. 
- ColumnarCollapseTransformStages.wrapInputIteratorTransformer(insertWholeStageTransformer(p)) + ColumnarCollapseTransformStages.wrapInputIteratorTransformer( + insertWholeStageTransformer(p, queryId)) case p => - p.withNewChildren(p.children.map(insertInputIteratorTransformer)) + p.withNewChildren(p.children.map(insertInputIteratorTransformer(_, queryId))) } } - private def insertWholeStageTransformer(plan: SparkPlan): SparkPlan = { + private def insertWholeStageTransformer(plan: SparkPlan, queryId: String): SparkPlan = { plan match { - case wst: WholeStageTransformer => wst + case wst: WholeStageTransformer => + GlutenQueryContext.setQueryId(wst, queryId) + wst case t if supportTransform(t) => // transformStageId will be updated by rule `GenerateTransformStageId`. - WholeStageTransformer(t.withNewChildren(t.children.map(insertInputIteratorTransformer)))(-1) + val wholeStage = + WholeStageTransformer( + t.withNewChildren(t.children.map(insertInputIteratorTransformer(_, queryId))))(-1) + GlutenQueryContext.setQueryId(wholeStage, queryId) + wholeStage case other => - other.withNewChildren(other.children.map(insertWholeStageTransformer)) + other.withNewChildren(other.children.map(insertWholeStageTransformer(_, queryId))) } } } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 6ccddac0c62..6f1272b7f77 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -522,7 +522,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000", - 
SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName, + SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false" + ) { val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT a FROM testData join testData2 ON key = a " + "where value >= (" +