[VL][TEST]Use Velox's HashTableCache to cache the BHJ's HashTable#12163
[VL][TEST]Use Velox's HashTableCache to cache the BHJ's HashTable #12163 — JkSelf wants to merge 7 commits into
Conversation
|
Run Gluten Clickhouse CI on x86 |
63a6382 to
10aec74
Compare
|
Run Gluten Clickhouse CI on x86 |
There was a problem hiding this comment.
Pull request overview
This PR wires Velox’s HashTableCache into Gluten’s Velox broadcast hash join (BHJ) path by stabilizing the build-side hash table identifier across AQE wrappers and propagating Spark execution metadata down into the native runtime to support cache scoping/reuse.
Changes:
- Canonicalize BHJ build hash table IDs across `BroadcastQueryStageExec`/`ReusedExchangeExec` so reuse paths share a stable cache key.
- Add Spark SQL execution id propagation (Java → JNI → native) and use it in Velox task/query identifiers.
- Integrate Velox `HashTableCache` injection/drop in native BHJ build/cleanup, and update Velox-side build relation/cache call sites accordingly.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | Adjusts AQE test conf to avoid an optimizer rule impacting reuse scenarios. |
| gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala | Introduces canonical build hash table ID derivation across AQE wrappers and uses it in join parameters. |
| gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java | Extends JNI kernel creation signature to include Spark execution id. |
| gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java | Extracts Spark execution id from task local properties and passes it to JNI. |
| ep/build-velox/src/get-velox.sh | Changes default Velox repo/branch selection for builds. |
| cpp/velox/substrait/SubstraitToVeloxPlan.cc | Updates BHJ plan construction to align with Velox-side hash table caching behavior. |
| cpp/velox/jni/VeloxJniWrapper.cc | Injects built hash tables into Velox HashTableCache and updates clone/clear JNI APIs to use cache keys. |
| cpp/velox/compute/WholeStageResultIterator.cc | Uses Spark execution id for Velox task/query identification. |
| cpp/core/jni/JniWrapper.cc | Propagates execution id into native SparkTaskInfo. |
| cpp/core/compute/Runtime.h | Extends SparkTaskInfo with execution id and updates formatting. |
| backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala | Updates hash table “clone” call to pass cache key. |
| backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala | Updates hash table “clone” call to pass cache key. |
| backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala | Updates hash table clear to drop by cache key. |
| backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala | Uses canonical build hash table ID for broadcast-table resource tracking and reuse. |
| backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java | Updates native API signatures to include cache key for clone/clear. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| std::unordered_set<velox::core::PlanNodeId> emptySet; | ||
| velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet}; | ||
| std::shared_ptr<velox::core::QueryCtx> queryCtx = createNewVeloxQueryCtx(); | ||
| task_ = velox::exec::Task::create( | ||
| fmt::format( | ||
| "Gluten_Stage_{}_TID_{}_VTID_{}", | ||
| std::to_string(taskInfo_.stageId), | ||
| std::to_string(taskInfo_.taskId), | ||
| std::to_string(taskInfo.vId)), | ||
| getVeloxTaskId(taskInfo_), | ||
| std::move(planFragment), | ||
| 0, |
| CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) | ||
| VELOX_REPO=https://github.com/IBM/velox.git | ||
| VELOX_BRANCH=dft-2026_06_04 | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_04 | ||
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_04-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_04-hashtable-cache |
10aec74 to
581f7e5
Compare
|
Run Gluten Clickhouse CI on x86 |
581f7e5 to
824da36
Compare
|
Run Gluten Clickhouse CI on x86 |
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_05-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_05-hashtable-cache |
| std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { | ||
| if (taskInfo.executionId != -1) { | ||
| return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); | ||
| } | ||
| return fmt::format( | ||
| "Gluten_Stage_{}_TID_{}_VTID_{}", | ||
| std::to_string(taskInfo.stageId), | ||
| std::to_string(taskInfo.taskId), | ||
| std::to_string(taskInfo.vId)); | ||
| } |
| const auto& hashTableId = sJoin.hashtableid(); | ||
| const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId; | ||
| // Create HashJoinNode node | ||
| return std::make_shared<core::HashJoinNode>( | ||
| nextPlanNodeId(), | ||
| joinNodeId, |
| // Unique ID for builded hash table | ||
| lazy val buildHashTableId: String = "BuiltHashTable-" + buildPlan.id | ||
| lazy val buildHashTableId: String = canonicalBuildHashTableId(buildPlan) | ||
|
|
| // Unique ID for built table | ||
| lazy val buildBroadcastTableId: String = buildPlan.id.toString | ||
| lazy val buildBroadcastTableId: String = canonicalBuildHashTableId(buildPlan) | ||
|
|
824da36 to
2f34754
Compare
|
Run Gluten Clickhouse CI on x86 |
2f34754 to
677658b
Compare
|
Run Gluten Clickhouse CI on x86 |
| std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { | ||
| if (taskInfo.executionId != -1) { | ||
| return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); | ||
| } | ||
| return fmt::format("Gluten_Execution_{}", ""); | ||
| } |
| val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) | ||
| var cacheKey = "" | ||
| if (executionId != null) { | ||
| cacheKey = "Gluten_Execution_" + executionId + ":" + buildHashTableId | ||
| GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId) |
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_05-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_05-hashtable-cache |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
| std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { | ||
| if (taskInfo.executionId != -1) { | ||
| return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); | ||
| } | ||
| return fmt::format("Gluten_Execution_{}", ""); | ||
| } |
|
|
||
| size_t pos = hashTableId.find(':'); | ||
| auto planNodeId = hashTableId.substr(pos + 1); | ||
| std::cout << "the cacheKey is " << hashTableId << " the planNode id is " << planNodeId << std::endl; |
| size_t pos = hashTableId.find(':'); | ||
| auto planNodeId = hashTableId.substr(pos + 1); | ||
| std::cout << "the cacheKey is " << hashTableId << " the planNode id is " << planNodeId << std::endl; | ||
| const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : planNodeId; | ||
|
|
| const auto abandonHashBuildDedupMinPct = | ||
| queryConf.get<uint32_t>(kAbandonDedupHashMapMinPct, kAbandonDedupHashMapMinPctDefault); | ||
| const auto hashTableId = jStringToCString(env, tableId); | ||
| std::cout << "the cacheKey in nativeBuild is " << hashTableId << std::endl; |
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_05-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_05-hashtable-cache |
| enableSuite[GlutenTableScanSuite] | ||
| .includeByPrefix("Caching") |
|
Run Gluten Clickhouse CI on x86 |
96e888a to
67b5711
Compare
|
Run Gluten Clickhouse CI on x86 |
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_06-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_06-hashtable-cache |
| std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { | ||
| if (taskInfo.executionId != -1) { | ||
| return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); | ||
| } | ||
| return fmt::format("Gluten_Execution_{}", ""); | ||
| } |
| context, | ||
| operatorId, | ||
| buildPlan.id.toString | ||
| canonicalBuildHashTableId(buildPlan) |
| val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) | ||
| var cacheKey = "" | ||
| if (executionId != null) { | ||
| GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId) | ||
| cacheKey = "Gluten_Execution_" + executionId + ":" + buildHashTableId | ||
| GlutenDriverEndpoint.collectResources(executionId, cacheKey) |
| const auto& hashTableId = sJoin.hashtableid(); | ||
| const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId; | ||
| // Create HashJoinNode node | ||
| return std::make_shared<core::HashJoinNode>( | ||
| nextPlanNodeId(), | ||
| joinNodeId, |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
| def getOrCreateQueryId(sparkContext: SparkContext): String = synchronized { | ||
| val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) | ||
| val existingQueryId = sparkContext.getLocalProperty(QueryIdLocalProperty) | ||
| val existingExecutionId = sparkContext.getLocalProperty(QueryExecutionIdLocalProperty) | ||
| if (existingQueryId != null && existingExecutionId == executionId) { | ||
| return existingQueryId | ||
| } |
| s"Gluten_Query_${sanitize(sparkContext.applicationId)}_$randomId" | ||
| } | ||
|
|
||
| setQueryId(sparkContext, queryId, executionId) |
| def setQueryId(sparkContext: SparkContext, queryId: String, executionId: String): Unit = { | ||
| sparkContext.setLocalProperty(QueryIdLocalProperty, queryId) | ||
| sparkContext.setLocalProperty(QueryExecutionIdLocalProperty, executionId) | ||
| } |
| const auto& hashTableId = sJoin.hashtableid(); | ||
| const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId; | ||
| // Create HashJoinNode node | ||
| return std::make_shared<core::HashJoinNode>( | ||
| nextPlanNodeId(), | ||
| joinNodeId, |
| auto* cache = facebook::velox::exec::HashTableCache::instance(); | ||
|
|
||
| if (!cache->hasTable(hashTableId)) { | ||
| std::cout << "VeloxJniWrapper inject hash table id is " << hashTableId << "\n"; | ||
| cache->injectTable(hashTableId, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool()); | ||
| } |
| auto cacheKeyStr = jStringToCString(env, cacheKey); | ||
| std::cout << "VeloxJniWrapper clear hash table id is " << cacheKeyStr << "\n"; | ||
| facebook::velox::exec::HashTableCache::instance()->drop(cacheKeyStr); |
| private static String getQueryId(TaskContext taskContext) { | ||
| final String queryId = taskContext.getLocalProperty(GLUTEN_QUERY_ID_KEY); | ||
| if (queryId != null) { | ||
| return queryId; | ||
| } | ||
|
|
||
| final String executionId = taskContext.getLocalProperty(SPARK_EXECUTION_ID_KEY); | ||
| if (executionId != null) { | ||
| return "Gluten_Execution_" + executionId; | ||
| } | ||
| return "Gluten_Execution_"; | ||
| } |
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_06-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_06-hashtable-cache |
| val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) | ||
| val queryId = GlutenQueryContext.getOrCreateQueryId(sparkContext) | ||
| val cacheKey = s"$queryId:$buildHashTableId" | ||
| if (executionId != null) { | ||
| GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId) | ||
| GlutenDriverEndpoint.collectResources(executionId, cacheKey) |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_06-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_06-hashtable-cache |
| if (!cache->hasTable(hashTableId)) { | ||
| std::cout << "VeloxJniWrapper inject hash table id is " << hashTableId << "\n"; | ||
| cache->injectTable(hashTableId, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool()); | ||
| } |
| if (!cache->hasTable(hashTableId)) { | ||
| std::cout << "VeloxJniWrapper inject hash table id is " << hashTableId << "\n"; | ||
| cache->injectTable( | ||
| hashTableId, | ||
| hashTableBuilders[0]->hashTable(), | ||
| hashTableBuilders[0]->joinHasNullKeys(), | ||
| defaultLeafVeloxMemoryPool()); | ||
| } |
| auto cacheKeyStr = jStringToCString(env, cacheKey); | ||
| std::cout << "VeloxJniWrapper clear hash table id is " << cacheKeyStr << "\n"; | ||
| facebook::velox::exec::HashTableCache::instance()->drop(cacheKeyStr); |
| std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { | ||
| if (!taskInfo.queryId.empty()) { | ||
| return taskInfo.queryId; | ||
| } | ||
| if (taskInfo.executionId != -1) { | ||
| return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); | ||
| } | ||
| return fmt::format("Gluten_Execution_{}", ""); | ||
| } |
| BroadcastExchangeExec.executionContext) { | ||
| try { | ||
| SparkShimLoader.getSparkShims.setJobDescriptionOrTagForBroadcastExchange(sparkContext, this) | ||
| logWarning( |
| metrics.getOrElse("buildThreads", null), | ||
| queryId) | ||
| } | ||
| logWarning( |
| sparkContext, | ||
| relation.asInstanceOf[Any]) | ||
| } | ||
| logWarning( |
| offload, | ||
| buildThreadsValue) | ||
| } | ||
| logWarning( |
| context, | ||
| operatorId, | ||
| buildPlan.id.toString | ||
| canonicalBuildHashTableId(buildPlan) |
|
Run Gluten Clickhouse CI on x86 |
f145c1c to
666e46b
Compare
|
Run Gluten Clickhouse CI on x86 |
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_06-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_06-hashtable-cache |
| case other => | ||
| other.id.toString |
| val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) | ||
| var cacheKey = "" | ||
| if (executionId != null) { | ||
| GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId) | ||
| cacheKey = "Gluten_Execution_" + executionId + ":" + buildHashTableId | ||
| GlutenDriverEndpoint.collectResources(executionId, cacheKey) |
| std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { | ||
| if (taskInfo.executionId != -1) { | ||
| return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); | ||
| } | ||
| return fmt::format("Gluten_Execution_{}", ""); | ||
| } |
| auto* cache = facebook::velox::exec::HashTableCache::instance(); | ||
|
|
||
| if (!cache->hasTable(hashTableId)) { | ||
| std::cout << "VeloxJniWrapper inject hash table id is " << hashTableId << "\n"; |
|
|
||
| auto* cache = facebook::velox::exec::HashTableCache::instance(); | ||
| if (!cache->hasTable(hashTableId)) { | ||
| std::cout << "VeloxJniWrapper inject hash table id is " << hashTableId << "\n"; |
| auto hashTableHandler = ObjectStore::retrieve<gluten::HashTableBuilder>(tableHandler); | ||
| hashTableHandler->hashTable()->clear(true); | ||
| auto cacheKeyStr = jStringToCString(env, cacheKey); | ||
| std::cout << "VeloxJniWrapper clear hash table id is " << cacheKeyStr << "\n"; |
|
Run Gluten Clickhouse CI on x86 |
What changes are proposed in this pull request?
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?