[VL][TEST] Use Velox's HashTableCache to cache the BHJ's HashTable #12163

Open
JkSelf wants to merge 7 commits into apache:main from JkSelf:gluten-hashtable-cache

@JkSelf JkSelf commented May 28, 2026

Contributor

What changes are proposed in this pull request?

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

@github-actions github-actions Bot added CORE works for Gluten Core BUILD VELOX labels May 28, 2026
@github-actions

Run Gluten Clickhouse CI on x86

Copilot AI review requested due to automatic review settings June 5, 2026 08:39
@JkSelf JkSelf force-pushed the gluten-hashtable-cache branch from 63a6382 to 10aec74 Compare June 5, 2026 08:39
github-actions Bot commented Jun 5, 2026

Run Gluten Clickhouse CI on x86

Copilot AI left a comment

Contributor

Pull request overview

This PR wires Velox’s HashTableCache into Gluten’s Velox broadcast hash join (BHJ) path by stabilizing the build-side hash table identifier across AQE wrappers and propagating Spark execution metadata down into the native runtime to support cache scoping/reuse.

Changes:

  • Canonicalize BHJ build hash table IDs across BroadcastQueryStageExec / ReusedExchangeExec so reuse paths share a stable cache key.
  • Add Spark SQL execution id propagation (Java → JNI → native) and use it in Velox task/query identifiers.
  • Integrate Velox HashTableCache injection/drop in native BHJ build/cleanup, and update Velox-side build relation/cache call sites accordingly.
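
The cache-scoping idea in this overview can be illustrated in isolation. The snippet below is a minimal sketch, not code from the PR: `makeCacheKey` is a hypothetical helper, the `Gluten_Execution_<id>:<buildHashTableId>` layout follows the Scala snippet quoted later in this thread, and the example table id is made up.

```cpp
#include <cstdint>
#include <string>

// Hypothetical helper (not part of Gluten): builds a cache key that scopes a
// broadcast hash table to one Spark SQL execution.
// Assumed layout: "Gluten_Execution_<executionId>:<buildHashTableId>".
std::string makeCacheKey(int64_t executionId, const std::string& buildHashTableId) {
  return "Gluten_Execution_" + std::to_string(executionId) + ":" + buildHashTableId;
}
```

Under this layout the same build-side table id yields different keys in different executions, so a table cached for one query would not be picked up by another.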

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | Adjusts AQE test conf to avoid an optimizer rule impacting reuse scenarios. |
| gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala | Introduces canonical build hash table ID derivation across AQE wrappers and uses it in join parameters. |
| gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java | Extends JNI kernel creation signature to include Spark execution id. |
| gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java | Extracts Spark execution id from task local properties and passes it to JNI. |
| ep/build-velox/src/get-velox.sh | Changes default Velox repo/branch selection for builds. |
| cpp/velox/substrait/SubstraitToVeloxPlan.cc | Updates BHJ plan construction to align with Velox-side hash table caching behavior. |
| cpp/velox/jni/VeloxJniWrapper.cc | Injects built hash tables into Velox HashTableCache and updates clone/clear JNI APIs to use cache keys. |
| cpp/velox/compute/WholeStageResultIterator.cc | Uses Spark execution id for Velox task/query identification. |
| cpp/core/jni/JniWrapper.cc | Propagates execution id into native SparkTaskInfo. |
| cpp/core/compute/Runtime.h | Extends SparkTaskInfo with execution id and updates formatting. |
| backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala | Updates hash table “clone” call to pass cache key. |
| backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala | Updates hash table “clone” call to pass cache key. |
| backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala | Updates hash table clear to drop by cache key. |
| backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala | Uses canonical build hash table ID for broadcast-table resource tracking and reuse. |
| backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java | Updates native API signatures to include cache key for clone/clear. |

Comment on lines 123 to 129
std::unordered_set<velox::core::PlanNodeId> emptySet;
velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
std::shared_ptr<velox::core::QueryCtx> queryCtx = createNewVeloxQueryCtx();
task_ = velox::exec::Task::create(
fmt::format(
"Gluten_Stage_{}_TID_{}_VTID_{}",
std::to_string(taskInfo_.stageId),
std::to_string(taskInfo_.taskId),
std::to_string(taskInfo.vId)),
getVeloxTaskId(taskInfo_),
std::move(planFragment),
0,
Comment thread ep/build-velox/src/get-velox.sh Outdated
Comment on lines +19 to +22
CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
VELOX_REPO=https://github.com/IBM/velox.git
VELOX_BRANCH=dft-2026_06_04
VELOX_ENHANCED_BRANCH=ibm-2026_06_04
VELOX_REPO=https://github.com/JkSelf/velox.git
VELOX_BRANCH=dft-2026_06_04-hashtable-cache
VELOX_ENHANCED_BRANCH=ibm-2026_06_04-hashtable-cache
@JkSelf JkSelf force-pushed the gluten-hashtable-cache branch from 10aec74 to 581f7e5 Compare June 5, 2026 08:56
github-actions Bot commented Jun 5, 2026

Run Gluten Clickhouse CI on x86

@JkSelf JkSelf force-pushed the gluten-hashtable-cache branch from 581f7e5 to 824da36 Compare June 8, 2026 06:41
Copilot AI review requested due to automatic review settings June 8, 2026 06:41
github-actions Bot commented Jun 8, 2026

Run Gluten Clickhouse CI on x86

Copilot AI left a comment

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.

Comment thread ep/build-velox/src/get-velox.sh Outdated
Comment on lines +20 to +22
VELOX_REPO=https://github.com/JkSelf/velox.git
VELOX_BRANCH=dft-2026_06_05-hashtable-cache
VELOX_ENHANCED_BRANCH=ibm-2026_06_05-hashtable-cache
Comment on lines +76 to +85
std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) {
if (taskInfo.executionId != -1) {
return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId));
}
return fmt::format(
"Gluten_Stage_{}_TID_{}_VTID_{}",
std::to_string(taskInfo.stageId),
std::to_string(taskInfo.taskId),
std::to_string(taskInfo.vId));
}
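
The fallback logic in this version of `getVeloxTaskId` can be tried out on its own. The following is a minimal standalone sketch under stated assumptions: `TaskInfoSketch` is a hypothetical stand-in for Gluten's `SparkTaskInfo` (the field types are guesses), and plain string concatenation replaces `fmt::format` so the example has no dependencies.

```cpp
#include <cstdint>
#include <string>

// Hypothetical stand-in for Gluten's SparkTaskInfo; field types are assumed.
struct TaskInfoSketch {
  int32_t stageId{0};
  int64_t taskId{0};
  int32_t vId{0};
  int64_t executionId{-1};  // -1 means no Spark SQL execution id is available
};

// Mirrors the quoted logic: prefer the execution-scoped id, otherwise fall
// back to the per-task "stage/task/vId" identifier.
std::string veloxTaskIdSketch(const TaskInfoSketch& info) {
  if (info.executionId != -1) {
    return "Gluten_Execution_" + std::to_string(info.executionId);
  }
  return "Gluten_Stage_" + std::to_string(info.stageId) + "_TID_" +
      std::to_string(info.taskId) + "_VTID_" + std::to_string(info.vId);
}
```

Note that with an execution id present, every task of that execution gets the same string, whereas the fallback stays unique per task. That sharing is presumably what allows the id to act as a cache scope, but it also means the id no longer distinguishes individual tasks.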
Comment on lines +451 to +455
const auto& hashTableId = sJoin.hashtableid();
const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId;
// Create HashJoinNode node
return std::make_shared<core::HashJoinNode>(
nextPlanNodeId(),
joinNodeId,
Comment on lines 405 to 407
// Unique ID for builded hash table
lazy val buildHashTableId: String = "BuiltHashTable-" + buildPlan.id
lazy val buildHashTableId: String = canonicalBuildHashTableId(buildPlan)

Comment on lines 108 to 110
// Unique ID for built table
lazy val buildBroadcastTableId: String = buildPlan.id.toString
lazy val buildBroadcastTableId: String = canonicalBuildHashTableId(buildPlan)

@JkSelf JkSelf force-pushed the gluten-hashtable-cache branch from 824da36 to 2f34754 Compare June 9, 2026 03:08
github-actions Bot commented Jun 9, 2026

Run Gluten Clickhouse CI on x86

Copilot AI review requested due to automatic review settings June 9, 2026 04:28
@JkSelf JkSelf force-pushed the gluten-hashtable-cache branch from 2f34754 to 677658b Compare June 9, 2026 04:28
github-actions Bot commented Jun 9, 2026

Run Gluten Clickhouse CI on x86

Copilot AI left a comment

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.

Comment on lines +76 to +81
std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) {
if (taskInfo.executionId != -1) {
return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId));
}
return fmt::format("Gluten_Execution_{}", "");
}
Comment on lines 137 to 141
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
var cacheKey = ""
if (executionId != null) {
cacheKey = "Gluten_Execution_" + executionId + ":" + buildHashTableId
GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId)
Comment thread ep/build-velox/src/get-velox.sh Outdated
Comment on lines +20 to +22
VELOX_REPO=https://github.com/JkSelf/velox.git
VELOX_BRANCH=dft-2026_06_05-hashtable-cache
VELOX_ENHANCED_BRANCH=ibm-2026_06_05-hashtable-cache
github-actions Bot commented Jun 9, 2026

Run Gluten Clickhouse CI on x86

Copilot AI review requested due to automatic review settings June 10, 2026 06:52
@github-actions

Run Gluten Clickhouse CI on x86

Copilot AI left a comment

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 6 comments.

Comment on lines +76 to +81
std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) {
if (taskInfo.executionId != -1) {
return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId));
}
return fmt::format("Gluten_Execution_{}", "");
}

size_t pos = hashTableId.find(':');
auto planNodeId = hashTableId.substr(pos + 1);
std::cout << "the cacheKey is " << hashTableId << " the planNode id is " << planNodeId << std::endl;
Comment on lines +459 to +463
size_t pos = hashTableId.find(':');
auto planNodeId = hashTableId.substr(pos + 1);
std::cout << "the cacheKey is " << hashTableId << " the planNode id is " << planNodeId << std::endl;
const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : planNodeId;
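
In the snippet above, `find(':')` returns `std::string::npos` when the key has no colon; `npos + 1` then wraps around to `0`, so `substr` silently returns the whole string. A minimal sketch that makes this case explicit follows. `planNodeIdFromCacheKey` is a hypothetical helper name, not something from Gluten or Velox.

```cpp
#include <string>

// Hypothetical helper: returns the part after the first ':' in cacheKey, or
// the whole string when there is no ':' (handled explicitly rather than by
// relying on npos + 1 wrapping to 0).
std::string planNodeIdFromCacheKey(const std::string& cacheKey) {
  const auto pos = cacheKey.find(':');
  if (pos == std::string::npos) {
    return cacheKey;
  }
  return cacheKey.substr(pos + 1);
}
```

The behavior matches the quoted code for well-formed keys; the explicit branch only documents what happens for a key without a separator and gives a natural place to log or reject such input.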

Comment thread cpp/velox/jni/VeloxJniWrapper.cc Outdated
const auto abandonHashBuildDedupMinPct =
queryConf.get<uint32_t>(kAbandonDedupHashMapMinPct, kAbandonDedupHashMapMinPctDefault);
const auto hashTableId = jStringToCString(env, tableId);
std::cout << "the cacheKey in nativeBuild is " << hashTableId << std::endl;
Comment thread ep/build-velox/src/get-velox.sh Outdated
Comment on lines +20 to +22
VELOX_REPO=https://github.com/JkSelf/velox.git
VELOX_BRANCH=dft-2026_06_05-hashtable-cache
VELOX_ENHANCED_BRANCH=ibm-2026_06_05-hashtable-cache
Comment on lines +604 to +605
enableSuite[GlutenTableScanSuite]
.includeByPrefix("Caching")
@github-actions

Run Gluten Clickhouse CI on x86

wq
Use Velox's HashTableCache to cache the BHJ's HashTable
@JkSelf JkSelf force-pushed the gluten-hashtable-cache branch from 96e888a to 67b5711 Compare June 11, 2026 04:35
Copilot AI review requested due to automatic review settings June 11, 2026 04:35
@github-actions

Run Gluten Clickhouse CI on x86

Copilot AI left a comment

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.

Comment on lines +20 to +22
VELOX_REPO=https://github.com/JkSelf/velox.git
VELOX_BRANCH=dft-2026_06_06-hashtable-cache
VELOX_ENHANCED_BRANCH=ibm-2026_06_06-hashtable-cache
Comment on lines +76 to +81
std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) {
if (taskInfo.executionId != -1) {
return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId));
}
return fmt::format("Gluten_Execution_{}", "");
}
context,
operatorId,
buildPlan.id.toString
canonicalBuildHashTableId(buildPlan)
Comment on lines 137 to +141
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
var cacheKey = ""
if (executionId != null) {
GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId)
cacheKey = "Gluten_Execution_" + executionId + ":" + buildHashTableId
GlutenDriverEndpoint.collectResources(executionId, cacheKey)
Comment on lines +451 to +455
const auto& hashTableId = sJoin.hashtableid();
const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId;
// Create HashJoinNode node
return std::make_shared<core::HashJoinNode>(
nextPlanNodeId(),
joinNodeId,
@github-actions

Run Gluten Clickhouse CI on x86

Copilot AI review requested due to automatic review settings June 12, 2026 03:53
@github-actions

Run Gluten Clickhouse CI on x86

Copilot AI left a comment

Pull request overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 9 comments.

Comment on lines +37 to +43
def getOrCreateQueryId(sparkContext: SparkContext): String = synchronized {
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
val existingQueryId = sparkContext.getLocalProperty(QueryIdLocalProperty)
val existingExecutionId = sparkContext.getLocalProperty(QueryExecutionIdLocalProperty)
if (existingQueryId != null && existingExecutionId == executionId) {
return existingQueryId
}
s"Gluten_Query_${sanitize(sparkContext.applicationId)}_$randomId"
}

setQueryId(sparkContext, queryId, executionId)
Comment on lines +59 to +62
def setQueryId(sparkContext: SparkContext, queryId: String, executionId: String): Unit = {
sparkContext.setLocalProperty(QueryIdLocalProperty, queryId)
sparkContext.setLocalProperty(QueryExecutionIdLocalProperty, executionId)
}
Comment on lines +451 to +455
const auto& hashTableId = sJoin.hashtableid();
const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId;
// Create HashJoinNode node
return std::make_shared<core::HashJoinNode>(
nextPlanNodeId(),
joinNodeId,
Comment on lines +1064 to +1069
auto* cache = facebook::velox::exec::HashTableCache::instance();

if (!cache->hasTable(hashTableId)) {
std::cout << "VeloxJniWrapper inject hash table id is " << hashTableId << "\n";
cache->injectTable(hashTableId, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool());
}
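
The `hasTable` check followed by `injectTable` is a check-then-act sequence. Whether Velox's `HashTableCache` serializes these two calls internally is not visible in this thread, so the sketch below only illustrates the general alternative: a single atomic "inject if absent" operation. `TableCacheSketch` is a hypothetical class and does not reflect the Velox API.

```cpp
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical cache, NOT Velox's HashTableCache: the existence check and the
// insert happen under one lock, so two builders cannot both insert a table
// under the same key.
template <typename Table>
class TableCacheSketch {
 public:
  // Returns true if this call inserted the table, false if the key existed.
  bool injectIfAbsent(const std::string& key, std::shared_ptr<Table> table) {
    std::lock_guard<std::mutex> lock(mutex_);
    return tables_.emplace(key, std::move(table)).second;
  }

  // Returns the cached table, or nullptr if the key is not present.
  std::shared_ptr<Table> get(const std::string& key) const {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = tables_.find(key);
    if (it == tables_.end()) {
      return nullptr;
    }
    return it->second;
  }

  // Removes the entry for key; returns true if something was removed.
  bool drop(const std::string& key) {
    std::lock_guard<std::mutex> lock(mutex_);
    return tables_.erase(key) > 0;
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_map<std::string, std::shared_ptr<Table>> tables_;
};
```

A second `injectIfAbsent` with the same key leaves the first table in place and reports `false`, which gives the caller a signal to discard its duplicate build.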
Comment on lines +1188 to +1190
auto cacheKeyStr = jStringToCString(env, cacheKey);
std::cout << "VeloxJniWrapper clear hash table id is " << cacheKeyStr << "\n";
facebook::velox::exec::HashTableCache::instance()->drop(cacheKeyStr);
Comment on lines +137 to +148
private static String getQueryId(TaskContext taskContext) {
final String queryId = taskContext.getLocalProperty(GLUTEN_QUERY_ID_KEY);
if (queryId != null) {
return queryId;
}

final String executionId = taskContext.getLocalProperty(SPARK_EXECUTION_ID_KEY);
if (executionId != null) {
return "Gluten_Execution_" + executionId;
}
return "Gluten_Execution_";
}
Comment on lines +20 to +22
VELOX_REPO=https://github.com/JkSelf/velox.git
VELOX_BRANCH=dft-2026_06_06-hashtable-cache
VELOX_ENHANCED_BRANCH=ibm-2026_06_06-hashtable-cache
Comment on lines 137 to +141
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
val queryId = GlutenQueryContext.getOrCreateQueryId(sparkContext)
val cacheKey = s"$queryId:$buildHashTableId"
if (executionId != null) {
GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId)
GlutenDriverEndpoint.collectResources(executionId, cacheKey)
@github-actions

Run Gluten Clickhouse CI on x86

Copilot AI review requested due to automatic review settings June 12, 2026 06:47
@github-actions

Run Gluten Clickhouse CI on x86

Copilot AI left a comment

Pull request overview

Copilot reviewed 29 out of 29 changed files in this pull request and generated 12 comments.

Comment on lines +20 to +22
VELOX_REPO=https://github.com/JkSelf/velox.git
VELOX_BRANCH=dft-2026_06_06-hashtable-cache
VELOX_ENHANCED_BRANCH=ibm-2026_06_06-hashtable-cache
Comment on lines +1066 to +1069
if (!cache->hasTable(hashTableId)) {
std::cout << "VeloxJniWrapper inject hash table id is " << hashTableId << "\n";
cache->injectTable(hashTableId, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool());
}
Comment on lines +1151 to +1158
if (!cache->hasTable(hashTableId)) {
std::cout << "VeloxJniWrapper inject hash table id is " << hashTableId << "\n";
cache->injectTable(
hashTableId,
hashTableBuilders[0]->hashTable(),
hashTableBuilders[0]->joinHasNullKeys(),
defaultLeafVeloxMemoryPool());
}
Comment on lines +1188 to +1190
auto cacheKeyStr = jStringToCString(env, cacheKey);
std::cout << "VeloxJniWrapper clear hash table id is " << cacheKeyStr << "\n";
facebook::velox::exec::HashTableCache::instance()->drop(cacheKeyStr);
Comment on lines +76 to +84
std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) {
if (!taskInfo.queryId.empty()) {
return taskInfo.queryId;
}
if (taskInfo.executionId != -1) {
return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId));
}
return fmt::format("Gluten_Execution_{}", "");
}
BroadcastExchangeExec.executionContext) {
try {
SparkShimLoader.getSparkShims.setJobDescriptionOrTagForBroadcastExchange(sparkContext, this)
logWarning(
metrics.getOrElse("buildThreads", null),
queryId)
}
logWarning(
sparkContext,
relation.asInstanceOf[Any])
}
logWarning(
offload,
buildThreadsValue)
}
logWarning(
context,
operatorId,
buildPlan.id.toString
canonicalBuildHashTableId(buildPlan)
@github-actions

Run Gluten Clickhouse CI on x86

Copilot AI review requested due to automatic review settings June 12, 2026 08:33
@JkSelf JkSelf force-pushed the gluten-hashtable-cache branch from f145c1c to 666e46b Compare June 12, 2026 08:33
@github-actions

Run Gluten Clickhouse CI on x86

Copilot AI left a comment

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 7 comments.

Comment on lines +20 to +22
VELOX_REPO=https://github.com/JkSelf/velox.git
VELOX_BRANCH=dft-2026_06_06-hashtable-cache
VELOX_ENHANCED_BRANCH=ibm-2026_06_06-hashtable-cache
Comment on lines +115 to +116
case other =>
other.id.toString
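
The overview describes `canonicalBuildHashTableId` as resolving a stable id across `BroadcastQueryStageExec` / `ReusedExchangeExec` wrappers, with the fallback case quoted above. The body of the Scala method is not shown in this thread, so the following is only a sketch of the general idea, assuming each wrapper simply delegates to the plan it wraps. `PlanSketch` and `canonicalIdSketch` are hypothetical names.

```cpp
#include <memory>
#include <string>

// Hypothetical plan node: a wrapper (for example a query stage or a reused
// exchange) points at the plan it wraps; a non-wrapper has wrapped == nullptr.
struct PlanSketch {
  int id;
  std::shared_ptr<PlanSketch> wrapped;
};

// Follows wrapper links down to the underlying plan and returns its id, so
// every wrapper around the same build plan resolves to the same string.
std::string canonicalIdSketch(const PlanSketch& plan) {
  const PlanSketch* current = &plan;
  while (current->wrapped) {
    current = current->wrapped.get();
  }
  return std::to_string(current->id);
}
```

With this shape, a reuse path and the original path produce the same id even though the wrapper nodes themselves carry different ids.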
Comment on lines 137 to +141
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
var cacheKey = ""
if (executionId != null) {
GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId)
cacheKey = "Gluten_Execution_" + executionId + ":" + buildHashTableId
GlutenDriverEndpoint.collectResources(executionId, cacheKey)
Comment on lines +76 to +81
std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) {
if (taskInfo.executionId != -1) {
return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId));
}
return fmt::format("Gluten_Execution_{}", "");
}
auto* cache = facebook::velox::exec::HashTableCache::instance();

if (!cache->hasTable(hashTableId)) {
std::cout << "VeloxJniWrapper inject hash table id is " << hashTableId << "\n";

auto* cache = facebook::velox::exec::HashTableCache::instance();
if (!cache->hasTable(hashTableId)) {
std::cout << "VeloxJniWrapper inject hash table id is " << hashTableId << "\n";
auto hashTableHandler = ObjectStore::retrieve<gluten::HashTableBuilder>(tableHandler);
hashTableHandler->hashTable()->clear(true);
auto cacheKeyStr = jStringToCString(env, cacheKey);
std::cout << "VeloxJniWrapper clear hash table id is " << cacheKeyStr << "\n";
@github-actions

Run Gluten Clickhouse CI on x86
