From eadd35981c4c949cc08046b02827ebd46364adff Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Thu, 28 May 2026 07:12:21 +0100 Subject: [PATCH 1/7] wq wq Use Velox's HashTableCache to cache the BHJ's HashTable --- .../gluten/vectorized/HashJoinBuilder.java | 4 +-- .../execution/HashJoinExecTransformer.scala | 2 +- .../VeloxBroadcastBuildSideCache.scala | 2 +- .../execution/ColumnarBuildSideRelation.scala | 2 +- .../UnsafeColumnarBuildSideRelation.scala | 2 +- cpp/velox/jni/VeloxJniWrapper.cc | 31 ++++++++++++++++-- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 32 ++++--------------- ep/build-velox/src/get-velox.sh | 6 ++-- .../execution/JoinExecTransformer.scala | 15 +++++++-- .../velox/VeloxAdaptiveQueryExecSuite.scala | 4 ++- 10 files changed, 60 insertions(+), 40 deletions(-) diff --git a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java index 106d8b98170..28ed5514a9c 100644 --- a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java +++ b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java @@ -35,9 +35,9 @@ public long rtHandle() { return runtime.getHandle(); } - public static native void clearHashTable(long hashTableData); + public static native void clearHashTable(String cacheKey, long hashTableData); - public static native long cloneHashTable(long hashTableData); + public static native long cloneHashTable(String cacheKey, long hashTableData); public native long nativeBuild( String buildHashTableId, diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala index 1554c4ddd3e..5ddb9d9a7b8 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala @@ -106,7 +106,7 @@ case class BroadcastHashJoinExecTransformer( isNullAwareAntiJoin) { // Unique ID for built table - lazy val buildBroadcastTableId: String = buildPlan.id.toString + lazy val buildBroadcastTableId: String = canonicalBuildHashTableId(buildPlan) override protected lazy val substraitJoinType: JoinRel.JoinType = joinType match { case _: InnerLike => diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala index 535fd8900e1..c8fcd574cdd 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala @@ -103,7 +103,7 @@ object VeloxBroadcastBuildSideCache } } - HashJoinBuilder.clearHashTable(value.pointer) + HashJoinBuilder.clearHashTable(key, value.pointer) } } } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index fea9f149745..f640fde9eaa 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -234,7 +234,7 @@ case class ColumnarBuildSideRelation( (hashTableData, this) } else { - (HashJoinBuilder.cloneHashTable(hashTableData), null) + (HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null) } } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala index fbc329f3606..de783de4892 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala @@ -204,7 +204,7 @@ class UnsafeColumnarBuildSideRelation( (hashTableData, this) } else { - (HashJoinBuilder.cloneHashTable(hashTableData), null) + (HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null) } } diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index aa4d9599435..aab855a31f6 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -46,6 +46,7 @@ #include "velox/common/base/BloomFilter.h" #include "velox/common/file/FileSystems.h" #include "velox/exec/HashTable.h" +#include "velox/exec/HashTableCache.h" #ifdef GLUTEN_ENABLE_GPU #include "cudf/CudfPlanValidator.h" @@ -1060,6 +1061,12 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native nullptr); builder->setHashTable(std::move(mainTable)); + auto* cache = facebook::velox::exec::HashTableCache::instance(); + + if (!cache->hasTable(hashTableId)) { + cache->injectTable(hashTableId, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool()); + } + return gluten::getHashTableObjStore()->save(builder); } @@ -1138,6 +1145,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native } hashTableBuilders[0]->setHashTable(std::move(mainTable)); + + auto* cache = facebook::velox::exec::HashTableCache::instance(); + if (!cache->hasTable(hashTableId)) { + cache->injectTable( + hashTableId, + hashTableBuilders[0]->hashTable(), + hashTableBuilders[0]->joinHasNullKeys(), + defaultLeafVeloxMemoryPool()); + } + return gluten::getHashTableObjStore()->save(hashTableBuilders[0]); JNI_METHOD_END(kInvalidObjectHandle) } @@ -1145,9 +1162,17 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_cloneHashTable( // NOLINT JNIEnv* env, jclass, + jstring cacheKey, jlong tableHandler) { JNI_METHOD_START + auto cacheKeyStr = jStringToCString(env, cacheKey); auto hashTableHandler = ObjectStore::retrieve(tableHandler); + auto* cache = facebook::velox::exec::HashTableCache::instance(); + if (!cache->hasTable(cacheKeyStr)) { + cache->injectTable( + cacheKeyStr, hashTableHandler->hashTable(), hashTableHandler->joinHasNullKeys(), defaultLeafVeloxMemoryPool()); + } + return gluten::getHashTableObjStore()->save(hashTableHandler); JNI_METHOD_END(kInvalidObjectHandle) } @@ -1155,13 +1180,15 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_cloneH JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_clearHashTable( // NOLINT JNIEnv* env, jclass, + jstring cacheKey, jlong tableHandler) { JNI_METHOD_START - auto hashTableHandler = ObjectStore::retrieve(tableHandler); - hashTableHandler->hashTable()->clear(true); + auto cacheKeyStr = jStringToCString(env, cacheKey); + facebook::velox::exec::HashTableCache::instance()->drop(cacheKeyStr); ObjectStore::release(tableHandler); JNI_METHOD_END() } + #ifdef __cplusplus } #endif diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index b0fc0fc4a30..246b7814660 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -448,29 +448,11 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } else if ( sJoin.has_advanced_extension() && SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isBHJ=")) { - std::string hashTableId = sJoin.hashtableid(); - - std::shared_ptr opaqueSharedHashTable = nullptr; - bool joinHasNullKeys = false; - - try { - auto hashTableBuilder = ObjectStore::retrieve(getJoin(hashTableId)); - joinHasNullKeys = hashTableBuilder->joinHasNullKeys(); - auto originalShared = hashTableBuilder->hashTable(); - opaqueSharedHashTable = std::shared_ptr( - originalShared, reinterpret_cast(originalShared.get())); - - LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId; - } catch (const std::exception& e) { - LOG(WARNING) - << "Error retrieving HashTable from ObjectStore: " << e.what() - << ". Falling back to building new table. To ensure correct results, please verify that spark.gluten.velox.buildHashTableOncePerExecutor.enabled is set to false."; - opaqueSharedHashTable = nullptr; - } - + const auto& hashTableId = sJoin.hashtableid(); + const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId; // Create HashJoinNode node return std::make_shared( - nextPlanNodeId(), + joinNodeId, joinType, isNullAwareAntiJoin, leftKeys, @@ -479,14 +461,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: leftNode, rightNode, getJoinOutputType(leftNode, rightNode, joinType), - false, - false, - joinHasNullKeys, - opaqueSharedHashTable); + true); } else { // Create HashJoinNode node + auto joinNodeId = nextPlanNodeId(); return std::make_shared( - nextPlanNodeId(), + joinNodeId, joinType, isNullAwareAntiJoin, leftKeys, diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index 71965f6c737..39ba1d9db61 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -17,9 +17,9 @@ set -exu CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) -VELOX_REPO=https://github.com/IBM/velox.git -VELOX_BRANCH=dft-2026_06_06 -VELOX_ENHANCED_BRANCH=ibm-2026_06_06 +VELOX_REPO=https://github.com/JkSelf/velox.git +VELOX_BRANCH=dft-2026_06_06-hashtable-cache +VELOX_ENHANCED_BRANCH=ibm-2026_06_06-hashtable-cache VELOX_HOME="" RUN_SETUP_SCRIPT=ON ENABLE_ENHANCED_FEATURES=OFF diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala index 149a0ca729e..3964c12e10e 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala @@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.{ExpandOutputPartitioningShim, ExplainUtils, SparkPlan} +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BaseJoinExec, HashedRelationBroadcastMode, HashJoin} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types._ @@ -105,6 +107,15 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { (right, left) } + protected def canonicalBuildHashTableId(plan: SparkPlan): String = plan match { + case b: BroadcastQueryStageExec => + canonicalBuildHashTableId(b.plan) + case r: ReusedExchangeExec => + canonicalBuildHashTableId(r.child) + case other => + other.id.toString + } + def sameType(from: DataType, to: DataType): Boolean = { (from, to) match { case (ArrayType(fromElement, _), ArrayType(toElement, _)) => @@ -267,7 +278,7 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { inputBuildOutput, context, operatorId, - buildPlan.id.toString + canonicalBuildHashTableId(buildPlan) ) context.registerJoinParam(operatorId, joinParams) @@ -392,7 +403,7 @@ abstract class BroadcastHashJoinExecTransformerBase( override def hashJoinType: JoinType = joinType // Unique ID for builded hash table - lazy val buildHashTableId: String = "BuiltHashTable-" + buildPlan.id + lazy val buildHashTableId: String = canonicalBuildHashTableId(buildPlan) override def genJoinParametersInternal(): (Int, Int, String) = { (1, if (isNullAwareAntiJoin) 1 else 0, buildHashTableId) diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 6ccddac0c62..6f1272b7f77 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -522,7 +522,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000", - SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName, + SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false" + ) { val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT a FROM testData join testData2 ON key = a " + "where value >= (" + From d59773b02c4cbd9b1dc16bc5af88d2b4e87fcb04 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Fri, 5 Jun 2026 06:52:03 +0100 Subject: [PATCH 2/7] Use spark.execution.id as the Velox's queryId --- cpp/core/compute/Runtime.h | 6 +++-- cpp/core/jni/JniWrapper.cc | 3 ++- cpp/velox/compute/WholeStageResultIterator.cc | 25 +++++++++++-------- .../vectorized/NativePlanEvaluator.java | 22 ++++++++++++++-- .../vectorized/PlanEvaluatorJniWrapper.java | 1 + 5 files changed, 42 insertions(+), 15 deletions(-) diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index 4ab944898bd..fff50a4df1d 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -42,12 +42,14 @@ struct SparkTaskInfo { int32_t partitionId{0}; // Same as TID. int64_t taskId{0}; + // Same as Spark SQL execution id. -1 means unavailable. + int64_t executionId{-1}; // virtual id for each backend internal use int32_t vId{0}; std::string toString() const { - return "[Stage: " + std::to_string(stageId) + " TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) + - "]"; + return "[Stage: " + std::to_string(stageId) + " Execution: " + std::to_string(executionId) + + " TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) + "]"; } friend std::ostream& operator<<(std::ostream& os, const SparkTaskInfo& taskInfo) { diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 827c5ad8bdd..fd23bc55a1a 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -468,13 +468,14 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith jint stageId, jint partitionId, jlong taskId, + jlong executionId, jboolean enableDumping, jstring spillDir) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); - ctx->setSparkTaskInfo({stageId, partitionId, taskId}); + ctx->setSparkTaskInfo({stageId, partitionId, taskId, executionId}); if (enableDumping) { ctx->enableDumping(); diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index c3ac095cdc7..21e27d54c46 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -72,6 +72,19 @@ const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"; } // namespace +namespace { +std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { + if (taskInfo.executionId != -1) { + return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); + } + return fmt::format( + "Gluten_Stage_{}_TID_{}_VTID_{}", + std::to_string(taskInfo.stageId), + std::to_string(taskInfo.taskId), + std::to_string(taskInfo.vId)); +} +} // namespace + WholeStageResultIterator::WholeStageResultIterator( VeloxMemoryManager* memoryManager, const std::shared_ptr& planNode, @@ -111,11 +124,7 @@ WholeStageResultIterator::WholeStageResultIterator( velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet}; std::shared_ptr queryCtx = createNewVeloxQueryCtx(); task_ = velox::exec::Task::create( - fmt::format( - "Gluten_Stage_{}_TID_{}_VTID_{}", - std::to_string(taskInfo_.stageId), - std::to_string(taskInfo_.taskId), - std::to_string(taskInfo.vId)), + getVeloxTaskId(taskInfo_), std::move(planFragment), 0, std::move(queryCtx), @@ -233,11 +242,7 @@ std::shared_ptr WholeStageResultIterator::createNewVeloxQ gluten::VeloxBackend::get()->getAsyncDataCache(), memoryManager_->getAggregateMemoryPool(), spillExecutor_, - fmt::format( - "Gluten_Stage_{}_TID_{}_VTID_{}", - std::to_string(taskInfo_.stageId), - std::to_string(taskInfo_.taskId), - std::to_string(taskInfo_.vId))); + getVeloxTaskId(taskInfo_)); return ctx; } diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java index 6d2c90896b2..8ddbb4b3d39 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java @@ -34,6 +34,8 @@ public class NativePlanEvaluator { private static final Logger LOGGER = LoggerFactory.getLogger(NativePlanEvaluator.class); private static final AtomicInteger id = new AtomicInteger(0); + private static final long INVALID_EXECUTION_ID = -1L; + private static final String SPARK_EXECUTION_ID_KEY = "spark.sql.execution.id"; private final Runtime runtime; private final PlanEvaluatorJniWrapper jniWrapper; @@ -78,14 +80,16 @@ public ColumnarBatchOutIterator createKernelWithBatchIterator( int partitionIndex, String spillDirPath) throws RuntimeException { + final TaskContext taskContext = TaskContext.get(); final long itrHandle = jniWrapper.nativeCreateKernelWithIterator( wsPlan, splitInfo, iterList, - TaskContext.get().stageId(), + taskContext.stageId(), partitionIndex, // TaskContext.getPartitionId(), - TaskContext.get().taskAttemptId(), + taskContext.taskAttemptId(), + getExecutionId(taskContext), DebugUtil.isDumpingEnabledForTask(), spillDirPath); final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle); @@ -113,4 +117,18 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { private ColumnarBatchOutIterator createOutIterator(Runtime runtime, long itrHandle) { return new ColumnarBatchOutIterator(runtime, itrHandle); } + + private static long getExecutionId(TaskContext taskContext) { + final String executionId = taskContext.getLocalProperty(SPARK_EXECUTION_ID_KEY); + if (executionId == null) { + return INVALID_EXECUTION_ID; + } + try { + return Long.parseLong(executionId); + } catch (NumberFormatException e) { + LOGGER.warn( + "Invalid Spark execution id '{}', fallback to {}", executionId, INVALID_EXECUTION_ID); + return INVALID_EXECUTION_ID; + } + } } diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java index a8082906798..f0c3d804bfe 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java @@ -76,6 +76,7 @@ public native long nativeCreateKernelWithIterator( int stageId, int partitionId, long taskId, + long executionId, boolean enableDumping, String spillDir) throws RuntimeException; From 69b39ee90ab9f96d816210afcb88caad401b42e1 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Tue, 9 Jun 2026 05:04:15 +0100 Subject: [PATCH 3/7] fix --- .../apache/gluten/execution/HashJoinExecTransformer.scala | 5 ++++- cpp/velox/compute/WholeStageResultIterator.cc | 6 +----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala index 5ddb9d9a7b8..fd0865b004b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala @@ -135,9 +135,12 @@ case class BroadcastHashJoinExecTransformer( override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = { val streamedRDD = getColumnarInputRDDs(streamedPlan) val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + var cacheKey = "" if (executionId != null) { + cacheKey = "Gluten_Execution_" + executionId + ":" + buildHashTableId GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId) } else { + cacheKey = "Gluten_Execution_" + ":" + buildHashTableId logWarning( s"Can not trace broadcast table data $buildBroadcastTableId" + s" because execution id is null." + @@ -174,7 +177,7 @@ case class BroadcastHashJoinExecTransformer( buildPlan.output, filterBuildColumns, filterPropagatesNulls, - buildBroadcastTableId, + cacheKey, isNullAwareAntiJoin, bloomFilterPushdownSize, metrics.get("buildHashTableTime") diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 21e27d54c46..8533a4ab609 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -77,11 +77,7 @@ std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { if (taskInfo.executionId != -1) { return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); } - return fmt::format( - "Gluten_Stage_{}_TID_{}_VTID_{}", - std::to_string(taskInfo.stageId), - std::to_string(taskInfo.taskId), - std::to_string(taskInfo.vId)); + return fmt::format("Gluten_Execution_{}", ""); } } // namespace From cecc144e423ab54240ccbfeba00d5fc8da350514 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Tue, 9 Jun 2026 07:39:25 +0100 Subject: [PATCH 4/7] fix --- .../org/apache/gluten/execution/HashJoinExecTransformer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala index fd0865b004b..2015328bcfb 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala @@ -138,7 +138,7 @@ case class BroadcastHashJoinExecTransformer( var cacheKey = "" if (executionId != null) { cacheKey = "Gluten_Execution_" + executionId + ":" + buildHashTableId - GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId) + GlutenDriverEndpoint.collectResources(executionId, cacheKey) } else { cacheKey = "Gluten_Execution_" + ":" + buildHashTableId logWarning( From 67b57119f3dea833ec9c6ba7482467e5be01567f Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Thu, 11 Jun 2026 05:34:41 +0100 Subject: [PATCH 5/7] fix --- cpp/velox/compute/WholeStageResultIterator.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 8533a4ab609..d97e6822d7b 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -120,7 +120,11 @@ WholeStageResultIterator::WholeStageResultIterator( velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet}; std::shared_ptr queryCtx = createNewVeloxQueryCtx(); task_ = velox::exec::Task::create( - getVeloxTaskId(taskInfo_), + fmt::format( + "Gluten_Stage_{}_TID_{}_VTID_{}", + std::to_string(taskInfo_.stageId), + std::to_string(taskInfo_.taskId), + std::to_string(taskInfo_.vId)), std::move(planFragment), 0, std::move(queryCtx), From 666e46bf233797e8bcab502bd1e78269c9b7bf5b Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Thu, 11 Jun 2026 09:12:12 +0100 Subject: [PATCH 6/7] log --- cpp/velox/jni/VeloxJniWrapper.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index aab855a31f6..45b4660f717 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -1064,6 +1064,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native auto* cache = facebook::velox::exec::HashTableCache::instance(); if (!cache->hasTable(hashTableId)) { + std::cout << "VeloxJniWrapper inject hash table id is " << hashTableId << "\n"; cache->injectTable(hashTableId, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool()); } @@ -1148,6 +1149,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native auto* cache = facebook::velox::exec::HashTableCache::instance(); if (!cache->hasTable(hashTableId)) { + std::cout << "VeloxJniWrapper inject hash table id is " << hashTableId << "\n"; cache->injectTable( hashTableId, hashTableBuilders[0]->hashTable(), @@ -1184,6 +1186,7 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_clearHa jlong tableHandler) { JNI_METHOD_START auto cacheKeyStr = jStringToCString(env, cacheKey); + std::cout << "VeloxJniWrapper clear hash table id is " << cacheKeyStr << "\n"; facebook::velox::exec::HashTableCache::instance()->drop(cacheKeyStr); ObjectStore::release(tableHandler); JNI_METHOD_END() From d2cb709c49633ff15e21286f037004395db25178 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Fri, 12 Jun 2026 10:01:11 +0100 Subject: [PATCH 7/7] fix --- .../clickhouse/CHIteratorApi.scala | 2 + .../clickhouse/CHSparkPlanExecApi.scala | 3 +- .../stats/GlutenDeltaJobStatsTracker.scala | 3 +- .../stats/GlutenDeltaJobStatsTracker.scala | 3 +- .../gluten/vectorized/HashJoinBuilder.java | 3 +- .../backendsapi/velox/VeloxIteratorApi.scala | 8 ++- .../velox/VeloxSparkPlanExecApi.scala | 9 ++- .../execution/HashJoinExecTransformer.scala | 21 +++++-- .../execution/ColumnarBuildSideRelation.scala | 12 ++-- .../UnsafeColumnarBuildSideRelation.scala | 20 +++++-- cpp/core/compute/Runtime.h | 1 + cpp/core/jni/JniWrapper.cc | 3 +- cpp/velox/compute/WholeStageResultIterator.cc | 3 + cpp/velox/jni/VeloxJniWrapper.cc | 4 +- .../vectorized/NativePlanEvaluator.java | 4 +- .../vectorized/PlanEvaluatorJniWrapper.java | 1 + .../gluten/backendsapi/IteratorApi.scala | 1 + .../gluten/backendsapi/SparkPlanExecApi.scala | 3 +- .../gluten/execution/GlutenQueryContext.scala | 58 +++++++++++++++++++ .../GlutenWholeStageColumnarRDD.scala | 2 + .../execution/WholeStageTransformer.scala | 3 + .../WholeStageZippedPartitionsRDD.scala | 1 + .../ColumnarBroadcastExchangeExec.scala | 5 +- .../ColumnarCollapseTransformStages.scala | 24 +++++--- 24 files changed, 160 insertions(+), 37 deletions(-) create mode 100644 gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenQueryContext.scala diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index f9820178c25..8dac2a0477e 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -259,6 +259,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { GlutenPartition( index, planByteArray, + wsCtx.queryId, splitInfos.toArray ) } @@ -315,6 +316,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { inputIterators: Seq[Iterator[ColumnarBatch]], sparkConf: SparkConf, rootNode: PlanNode, + queryId: String, pipelineTime: SQLMetric, updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 832932f0a49..5c3dbd356e6 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -481,7 +481,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { child: SparkPlan, numOutputRows: SQLMetric, dataSize: SQLMetric, - buildThreads: SQLMetric): BuildSideRelation = { + buildThreads: SQLMetric, + queryId: String): BuildSideRelation = { val (buildKeys, isNullAware) = mode match { case mode1: HashedRelationBroadcastMode => diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala index 2ea7b9e554d..33180b0d0d9 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -194,7 +194,8 @@ object GlutenDeltaJobStatsTracker extends Logging { null, null, 0, - BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) + BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath), + s"Gluten_Delta_Stats_${UUID.randomUUID()}" ) nativeOutItr } diff --git a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala index ca6c7a6a7fb..2016a5a6cc6 100644 --- a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala +++ b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -198,7 +198,8 @@ object GlutenDeltaJobStatsTracker extends Logging { null, null, 0, - BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) + BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath), + s"Gluten_Delta_Stats_${UUID.randomUUID()}" ) nativeOutItr } diff --git a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java index 28ed5514a9c..6b9dbc5dbf9 100644 --- a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java +++ b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java @@ -51,5 +51,6 @@ public native long nativeBuild( byte[] namedStruct, boolean isNullAwareAntiJoin, long bloomFilterPushdownSize, - int broadcastHashTableBuildThreads); + int broadcastHashTableBuildThreads, + String queryId); } diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index d8b23b358fa..6e6264c567d 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -130,6 +130,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { GlutenPartition( index, planByteArray, + wsCtx.queryId, splitInfos.toArray ) } @@ -216,7 +217,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { if (splitInfoByteArray.nonEmpty) splitInfoByteArray else null, if (columnarNativeIterators.nonEmpty) columnarNativeIterators.toArray else null, partitionIndex, - BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) + BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath), + inputPartition.queryId ) resIter.noMoreSplits() val itrMetrics = IteratorMetricsJniWrapper.create() @@ -243,6 +245,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { inputIterators: Seq[Iterator[ColumnarBatch]], sparkConf: SparkConf, rootNode: PlanNode, + queryId: String, pipelineTime: SQLMetric, updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, @@ -270,7 +273,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { null, if (columnarNativeIterator.nonEmpty) columnarNativeIterator.toArray else null, partitionIndex, - BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) + BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath), + queryId ) nativeResultIterator.noMoreSplits() val itrMetrics = IteratorMetricsJniWrapper.create() diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index be21337d994..6b4308124f3 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -715,7 +715,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { child: SparkPlan, numOutputRows: SQLMetric, dataSize: SQLMetric, - buildThreads: SQLMetric): BuildSideRelation = { + buildThreads: SQLMetric, + queryId: String): BuildSideRelation = { val buildKeys = mode match { case mode1: HashedRelationBroadcastMode => @@ -875,7 +876,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { mode, newBuildKeys, offload, - buildThreadsValue) + buildThreadsValue, + queryId) } } else { ColumnarBuildSideRelation( @@ -884,7 +886,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { mode, newBuildKeys, offload, - buildThreadsValue) + buildThreadsValue, + queryId) } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala index 2015328bcfb..f5c8fd8bb61 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala @@ -26,8 +26,10 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.{BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.ColumnarBuildSideRelation import org.apache.spark.sql.execution.joins.BuildSideRelation import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.unsafe.UnsafeColumnarBuildSideRelation import org.apache.spark.sql.vectorized.ColumnarBatch import io.substrait.proto.JoinRel @@ -134,20 +136,18 @@ case class BroadcastHashJoinExecTransformer( override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = { val streamedRDD = getColumnarInputRDDs(streamedPlan) + val broadcast = buildPlan.executeBroadcast[BuildSideRelation]() + val queryId = extractQueryId(broadcast.value) val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - var cacheKey = "" + val cacheKey = s"$queryId:$buildHashTableId" if (executionId != null) { - cacheKey = "Gluten_Execution_" + executionId + ":" + buildHashTableId GlutenDriverEndpoint.collectResources(executionId, cacheKey) } else { - cacheKey = "Gluten_Execution_" + ":" + buildHashTableId logWarning( - s"Can not trace broadcast table data $buildBroadcastTableId" + + s"Can not trace broadcast table data $cacheKey" + s" because execution id is null." + s" Will clean up until expire time.") } - - val broadcast = buildPlan.executeBroadcast[BuildSideRelation]() val bloomFilterPushdownSize = if (VeloxConfig.get.hashProbeDynamicFilterPushdownEnabled) { VeloxConfig.get.hashProbeBloomFilterPushdownMaxSize } else { @@ -186,6 +186,15 @@ case class BroadcastHashJoinExecTransformer( // FIXME: Do we have to make build side a RDD? streamedRDD :+ broadcastRDD } + + private def extractQueryId(relation: BuildSideRelation): String = relation match { + case columnar: ColumnarBuildSideRelation if columnar.queryId.nonEmpty => + columnar.queryId + case unsafe: UnsafeColumnarBuildSideRelation if unsafe.queryIdValue.nonEmpty => + unsafe.queryIdValue + case _ => + GlutenQueryContext.getRequiredQueryId(buildPlan) + } } case class BroadcastHashJoinContext( diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index f640fde9eaa..de84bd97f49 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -52,7 +52,8 @@ object ColumnarBuildSideRelation { mode: BroadcastMode, newBuildKeys: Seq[Expression] = Seq.empty, offload: Boolean = false, - buildThreads: Int = 1): ColumnarBuildSideRelation = { + buildThreads: Int = 1, + queryId: String = ""): ColumnarBuildSideRelation = { val boundMode = mode match { case HashedRelationBroadcastMode(keys, isNullAware) => // Bind each key to the build-side output so simple cols become BoundReference @@ -68,7 +69,8 @@ object ColumnarBuildSideRelation { BroadcastModeUtils.toSafe(boundMode), newBuildKeys, offload, - buildThreads) + buildThreads, + queryId) } } @@ -78,7 +80,8 @@ case class ColumnarBuildSideRelation( safeBroadcastMode: SafeBroadcastMode, newBuildKeys: Seq[Expression], offload: Boolean, - buildThreads: Int) + buildThreads: Int, + queryId: String) extends BuildSideRelation with Logging with KnownSizeEstimation { @@ -223,7 +226,8 @@ case class ColumnarBuildSideRelation( SubstraitUtil.toNameStruct(newOutput).toByteArray, broadcastContext.isNullAwareAntiJoin, broadcastContext.bloomFilterPushdownSize, - buildThreads + buildThreads, + queryId ) jniWrapper.close(serializeHandle) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala index de783de4892..39259fd52f3 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala @@ -57,7 +57,8 @@ object UnsafeColumnarBuildSideRelation { mode: BroadcastMode, newBuildKeys: Seq[Expression] = Seq.empty, offload: Boolean = false, - buildThreads: Int = 1): UnsafeColumnarBuildSideRelation = { + buildThreads: Int = 1, + queryId: String = ""): UnsafeColumnarBuildSideRelation = { val boundMode = mode match { case HashedRelationBroadcastMode(keys, isNullAware) => // Bind each key to the build-side output so simple cols become BoundReference @@ -73,7 +74,8 @@ object UnsafeColumnarBuildSideRelation { BroadcastModeUtils.toSafe(boundMode), newBuildKeys, offload, - buildThreads) + buildThreads, + queryId) } } @@ -94,7 +96,8 @@ class UnsafeColumnarBuildSideRelation( private var safeBroadcastMode: SafeBroadcastMode, private var newBuildKeys: Seq[Expression], private var offload: Boolean, - private var buildThreads: Int) + private var buildThreads: Int, + private var queryId: String) extends BuildSideRelation with Externalizable with Logging @@ -114,9 +117,11 @@ class UnsafeColumnarBuildSideRelation( def isOffload: Boolean = offload + def queryIdValue: String = queryId + /** needed for serialization. */ def this() = { - this(null, null, null, Seq.empty, false, 1) + this(null, null, null, Seq.empty, false, 1, "") } private[unsafe] def getBatches(): Seq[UnsafeByteArray] = { @@ -193,7 +198,8 @@ class UnsafeColumnarBuildSideRelation( SubstraitUtil.toNameStruct(newOutput).toByteArray, broadcastContext.isNullAwareAntiJoin, broadcastContext.bloomFilterPushdownSize, - buildThreads + buildThreads, + queryId ) jniWrapper.close(serializeHandle) @@ -219,6 +225,7 @@ class UnsafeColumnarBuildSideRelation( out.writeObject(newBuildKeys) out.writeBoolean(offload) out.writeInt(buildThreads) + out.writeUTF(queryId) } override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException { @@ -228,6 +235,7 @@ class UnsafeColumnarBuildSideRelation( kryo.writeClassAndObject(out, newBuildKeys) out.writeBoolean(offload) out.writeInt(buildThreads) + out.writeString(queryId) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { @@ -237,6 +245,7 @@ class UnsafeColumnarBuildSideRelation( newBuildKeys = in.readObject().asInstanceOf[Seq[Expression]] offload = in.readBoolean() buildThreads = in.readInt() + queryId = in.readUTF() } override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException { @@ -246,6 +255,7 @@ class UnsafeColumnarBuildSideRelation( newBuildKeys = kryo.readClassAndObject(in).asInstanceOf[Seq[Expression]] offload = in.readBoolean() buildThreads = in.readInt() + queryId = in.readString() } private def transformProjection: UnsafeProjection = safeBroadcastMode match { diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index fff50a4df1d..cf5f40a1863 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -44,6 +44,7 @@ struct SparkTaskInfo { int64_t taskId{0}; // Same as Spark SQL execution id. -1 means unavailable. int64_t executionId{-1}; + std::string queryId{}; // virtual id for each backend internal use int32_t vId{0}; diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index fd23bc55a1a..c0db58d4a49 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -469,13 +469,14 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith jint partitionId, jlong taskId, jlong executionId, + jstring queryId, jboolean enableDumping, jstring spillDir) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); - ctx->setSparkTaskInfo({stageId, partitionId, taskId, executionId}); + ctx->setSparkTaskInfo({stageId, partitionId, taskId, executionId, jStringToCString(env, queryId)}); if (enableDumping) { ctx->enableDumping(); diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index d97e6822d7b..6a8de9eb72c 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -74,6 +74,9 @@ const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"; namespace { std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { + if (!taskInfo.queryId.empty()) { + return taskInfo.queryId; + } if (taskInfo.executionId != -1) { return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); } diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 45b4660f717..1d8c5b1c536 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -972,11 +972,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native jbyteArray namedStruct, jboolean isNullAwareAntiJoin, jlong bloomFilterPushdownSize, - jint numThreads) { + jint numThreads, + jstring queryId) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); auto* runtime = dynamic_cast(ctx); GLUTEN_CHECK(runtime != nullptr, "Not a Velox runtime"); + ctx->setSparkTaskInfo({0, 0, 0, -1, jStringToCString(env, queryId)}); const auto& queryConf = *(runtime->veloxCfg()); const auto minTableRowsForParallelJoinBuild = queryConf.get(kMinTableRowsForParallelJoinBuild, kMinTableRowsForParallelJoinBuildDefault); diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java index 8ddbb4b3d39..0d14d059903 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java @@ -78,7 +78,8 @@ public ColumnarBatchOutIterator createKernelWithBatchIterator( byte[][] splitInfo, ColumnarBatchInIterator[] iterList, int partitionIndex, - String spillDirPath) + String spillDirPath, + String queryId) throws RuntimeException { final TaskContext taskContext = TaskContext.get(); final long itrHandle = @@ -90,6 +91,7 @@ public ColumnarBatchOutIterator createKernelWithBatchIterator( partitionIndex, // TaskContext.getPartitionId(), taskContext.taskAttemptId(), getExecutionId(taskContext), + queryId, DebugUtil.isDumpingEnabledForTask(), spillDirPath); final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle); diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java index f0c3d804bfe..24b43e9329c 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java @@ -77,6 +77,7 @@ public native long nativeCreateKernelWithIterator( int partitionId, long taskId, long executionId, + String queryId, boolean enableDumping, String spillDir) throws RuntimeException; diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index a6b00535935..5b2572c2623 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -81,6 +81,7 @@ trait IteratorApi { inputIterators: Seq[Iterator[ColumnarBatch]], sparkConf: SparkConf, rootNode: PlanNode, + queryId: String, pipelineTime: SQLMetric, updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 79f3d67c0ec..ceb658693f0 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -437,7 +437,8 @@ trait SparkPlanExecApi { child: SparkPlan, numOutputRows: SQLMetric, dataSize: SQLMetric, - buildThreads: SQLMetric = null): BuildSideRelation + buildThreads: SQLMetric = null, + queryId: String = ""): BuildSideRelation def doCanonicalizeForBroadcastMode(mode: BroadcastMode): BroadcastMode = { mode.canonicalized diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenQueryContext.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenQueryContext.scala new file mode 100644 index 00000000000..316030f4159 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenQueryContext.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.sql.catalyst.trees.TreeNodeTag +import org.apache.spark.sql.execution.SparkPlan + +import java.util.UUID + +object GlutenQueryContext { + val QueryIdTag: TreeNodeTag[String] = TreeNodeTag[String]("org.apache.gluten.QueryIdTag") + + private def sanitize(value: String): String = value.replaceAll("[^A-Za-z0-9_\\-]", "_") + + def newQueryId(): String = { + val randomId = sanitize(UUID.randomUUID().toString) + s"Gluten_Query_$randomId" + } + + def getQueryId(plan: SparkPlan): Option[String] = plan.getTagValue(QueryIdTag) + + def setQueryId(plan: SparkPlan, queryId: String): Unit = { + plan.setTagValue(QueryIdTag, queryId) + } + + def assignQueryId(plan: SparkPlan, queryId: String): String = { + plan.foreach(setQueryId(_, queryId)) + queryId + } + + private def findQueryId(plan: SparkPlan): Option[String] = { + getQueryId(plan).orElse(plan.children.iterator.map(findQueryId).find(_.isDefined).flatten) + } + + def initializeQueryId(plan: SparkPlan): String = { + getQueryId(plan).getOrElse(assignQueryId(plan, newQueryId())) + } + + def getRequiredQueryId(plan: SparkPlan): String = { + findQueryId(plan).getOrElse { + throw new IllegalStateException(s"Query ID has not been initialized for ${plan.nodeName}") + } + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala index 4a00dbb5872..0483cc5e4a9 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala @@ -31,11 +31,13 @@ import scala.collection.JavaConverters.asScalaBufferConverter trait BaseGlutenPartition extends Partition with InputPartition { def plan: Array[Byte] + def queryId: String } case class GlutenPartition( index: Int, plan: Array[Byte], + queryId: String, splitInfos: Array[SplitInfo] = Array.empty[SplitInfo], files: Array[String] = Array.empty[String] // touched files, for implementing UDF input_file_name diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index 8de9d407bfd..19f8243624b 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -51,6 +51,7 @@ case class TransformContext(outputAttributes: Seq[Attribute], root: RelNode) case class WholeStageTransformContext( root: PlanNode, substraitContext: SubstraitContext = null, + queryId: String, enableCudf: Boolean = false, supportsValueStreamDynamicFilter: Boolean = true) @@ -234,6 +235,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f } private def generateWholeStageTransformContext(): WholeStageTransformContext = { + val queryId = GlutenQueryContext.getRequiredQueryId(this) val substraitContext = new SubstraitContext val childCtx = child .asInstanceOf[TransformSupport] @@ -261,6 +263,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f WholeStageTransformContext( planNode, substraitContext, + queryId, isCudf, !hasNonDeterministicExprInJoinProbe(child)) } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala index 393716bb7b1..51251cc2928 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala @@ -50,6 +50,7 @@ class WholeStageZippedPartitionsRDD( inputIterators, sparkConf, resCtx.root, + resCtx.queryId, pipelineTime, updateNativeMetrics, split.index, diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index 4b2890d51d1..bf1d7384bb0 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.execution.GlutenQueryContext import org.apache.gluten.execution.ValidatablePlan import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.metrics.GlutenTimeMetric @@ -57,6 +58,7 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) @transient override lazy val relationFuture: java.util.concurrent.Future[broadcast.Broadcast[Any]] = { + val queryId = GlutenQueryContext.getRequiredQueryId(this) SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]]( session, BroadcastExchangeExec.executionContext) { @@ -76,7 +78,8 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) child, longMetric("numOutputRows"), longMetric("dataSize"), - metrics.getOrElse("buildThreads", null)) + metrics.getOrElse("buildThreads", null), + queryId) } val broadcasted = GlutenTimeMetric.millis(longMetric("broadcastTime")) { diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala index c81d380a781..c794841eec2 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala @@ -142,7 +142,8 @@ case class InputIteratorTransformer(child: SparkPlan) extends UnaryTransformSupp case class ColumnarCollapseTransformStages(glutenConf: GlutenConfig) extends Rule[SparkPlan] { def apply(plan: SparkPlan): SparkPlan = { - insertWholeStageTransformer(plan) + val queryId = GlutenQueryContext.initializeQueryId(plan) + insertWholeStageTransformer(plan, queryId) } /** @@ -161,25 +162,32 @@ case class ColumnarCollapseTransformStages(glutenConf: GlutenConfig) extends Rul } /** Inserts an InputIteratorTransformer on top of those that do not support transform. */ - private def insertInputIteratorTransformer(plan: SparkPlan): SparkPlan = { + private def insertInputIteratorTransformer(plan: SparkPlan, queryId: String): SparkPlan = { plan match { case p if p.isInstanceOf[WholeStageTransformer] || !supportTransform(p) => // TODO: if p.isInstanceOf[WholeStageTransformer], we can merge two whole stage // transformers. - ColumnarCollapseTransformStages.wrapInputIteratorTransformer(insertWholeStageTransformer(p)) + ColumnarCollapseTransformStages.wrapInputIteratorTransformer( + insertWholeStageTransformer(p, queryId)) case p => - p.withNewChildren(p.children.map(insertInputIteratorTransformer)) + p.withNewChildren(p.children.map(insertInputIteratorTransformer(_, queryId))) } } - private def insertWholeStageTransformer(plan: SparkPlan): SparkPlan = { + private def insertWholeStageTransformer(plan: SparkPlan, queryId: String): SparkPlan = { plan match { - case wst: WholeStageTransformer => wst + case wst: WholeStageTransformer => + GlutenQueryContext.setQueryId(wst, queryId) + wst case t if supportTransform(t) => // transformStageId will be updated by rule `GenerateTransformStageId`. - WholeStageTransformer(t.withNewChildren(t.children.map(insertInputIteratorTransformer)))(-1) + val wholeStage = + WholeStageTransformer( + t.withNewChildren(t.children.map(insertInputIteratorTransformer(_, queryId))))(-1) + GlutenQueryContext.setQueryId(wholeStage, queryId) + wholeStage case other => - other.withNewChildren(other.children.map(insertWholeStageTransformer)) + other.withNewChildren(other.children.map(insertWholeStageTransformer(_, queryId))) } } }