Skip to content

[GLUTEN-12251][VL] Add config switch to merge broadcast batches for BHJ performance#12259

Open
Xtpacz wants to merge 2 commits into
apache:mainfrom
Xtpacz:fix-bhj-hashbuild-slow
Open

[GLUTEN-12251][VL] Add config switch to merge broadcast batches for BHJ performance#12259
Xtpacz wants to merge 2 commits into
apache:mainfrom
Xtpacz:fix-bhj-hashbuild-slow

Conversation

@Xtpacz

@Xtpacz Xtpacz commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

Related issue: #12251

What changes are proposed in this pull request?

This PR adds a new config spark.gluten.velox.broadcastBuild.mergeBatches(default false) that controls how columnar batches are serialized during broadcast build for BHJ.

When enabled, all batches on the build side are serialized into a single buffer through a new serializeAll JNI entry point, so the executor-side HashBuild operator receives one addInput call instead of N. For broadcast tables that fan out into many small batches (e.g. when spark.gluten.sql.columnar.maxBatchSize is small or the build side is narrow), this materially reduces per-batch overhead.

How was this patch tested?

Verified on an internal Spark cluster running TPC-DS 5TB. On q64, with all other configs equal, the aggregated HashBuild total time dropped from 2.04h to 22.1min when mergeBatches=true (~5.5x reduction). Result correctness verified by comparing query output between mergeBatches=true and =false.

Was this patch authored or co-authored using generative AI tooling?

Co-authored using Claude (claude-opus-4.7)

@github-actions github-actions Bot added the VELOX label Jun 8, 2026
@github-actions github-actions Bot added the DOCS label Jun 8, 2026
@Xtpacz

Xtpacz commented Jun 8, 2026

Copy link
Copy Markdown
Contributor Author

@zhouyuan @philo-he could you have a look at the PR?

@zhouyuan

zhouyuan commented Jun 8, 2026

Copy link
Copy Markdown
Member

Cc: @JkSelf @wForget

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a Velox-specific configuration switch to optionally merge broadcast build-side columnar batches into a single serialized payload, aiming to reduce per-batch overhead in BroadcastHashJoin (BHJ) HashBuild (fewer addInput calls), with JNI support and validation coverage.

Changes:

  • Added a new Velox config spark.gluten.velox.broadcastBuild.mergeBatches (default false) and wired it into broadcast build-side serialization.
  • Introduced a new JNI entry point serializeAll(long[]) to serialize multiple native ColumnarBatch handles into a single JniUnsafeByteBuffer.
  • Added a Velox hash join test to compare results between merged vs per-batch broadcast build serialization, and documented the new config.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchSerializerJniWrapper.java Exposes new JNI method serializeAll(long[]) for multi-batch serialization.
cpp/core/jni/JniWrapper.cc Implements JNI serializeAll by appending multiple batches to the serializer and emitting one buffer.
backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala Adds merge/per-batch switch in serializeStream for broadcast build-side serialization.
backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala Registers the new mergeBatches config and accessor.
backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala Adds coverage to ensure merged and per-batch paths produce equivalent results and still use BHJ.
docs/velox-configuration.md Documents the new spark.gluten.velox.broadcastBuild.mergeBatches option.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

| spark.gluten.sql.enable.enhancedFeatures | 🔄 Dynamic | true | Enable some features including iceberg native write and other features. |
| spark.gluten.sql.rewrite.castArrayToString | 🔄 Dynamic | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. |
| spark.gluten.velox.broadcast.build.targetBytesPerThread | ⚓ Static | 32MB | It is used to calculate the number of hash table build threads. Based on our testing across various thresholds (1MB to 128MB), we recommend a value of 32MB or 64MB, as these consistently provided the most significant performance gains. |
| spark.gluten.velox.broadcastBuild.mergeBatches | 🔄 Dynamic | false | If enabled, all columnar batches in a broadcast build relation will be serialized into a single buffer to reduce the number of addInput calls in HashBuild operator. This can significantly improve BHJ performance when the broadcast table has many small batches. |
Comment on lines +625 to +629
.doc(
"If enabled, all columnar batches in a broadcast build relation will be " +
"serialized into a single buffer to reduce the number of addInput calls in " +
"HashBuild operator. This can significantly improve BHJ performance when " +
"the broadcast table has many small batches.")
Comment on lines +186 to +192
val merged = jniWrapper.serializeAll(handles.toArray)
val useOffheapBroadcastBuildRelation =
VeloxConfig.get.enableBroadcastBuildRelationInOffheap
new ColumnarBatchSerializeResult(
useOffheapBroadcastBuildRelation,
numRows,
java.util.Collections.singletonList(merged))
Comment on lines +1308 to +1310
auto ctx = getRuntime(env, wrapper);
int32_t numBatches = env->GetArrayLength(handles);
GLUTEN_DCHECK(numBatches > 0, "serializeAll requires at least one batch");
@wForget

wForget commented Jun 9, 2026

Copy link
Copy Markdown
Member

Thanks @Xtpacz . The current implementation merges the build side into a single ColumnBatch, which does not seem to be a general solution.

Verified on an internal Spark cluster running TPC-DS 5TB. On q64, with all other configs equal, the aggregated HashBuild total time dropped from 2.04h to 22.1min when mergeBatches=true (~5.5x reduction).

That said, based on the benchmark results you shared, the performance improvement is quite significant. Could you share more details about the previous bottleneck? Was the main issue caused by generating a large number of small ColumnBatch instances, or was there another factor contributing to the overhead?

@Xtpacz

Xtpacz commented Jun 9, 2026

Copy link
Copy Markdown
Contributor Author

Thanks @Xtpacz . The current implementation merges the build side into a single ColumnBatch, which does not seem to be a general solution.

Verified on an internal Spark cluster running TPC-DS 5TB. On q64, with all other configs equal, the aggregated HashBuild total time dropped from 2.04h to 22.1min when mergeBatches=true (~5.5x reduction).

That said, based on the benchmark results you shared, the performance improvement is quite significant. Could you share more details about the previous bottleneck? Was the main issue caused by generating a large number of small ColumnBatch instances, or was there another factor contributing to the overhead?

@wForget Thanks for the review!

Root cause: The per batch's serialize/deserialize overhead will across the full pipeline. In our q64 case (19.6B build rows, maxBatchSize=4096), this produces about 480K independent buffers. Each one goes through PrestoSerializer creation + ArrowBuf allocation on serialize, and PrestoVectorSerde.deserialize() + small-vector HashBuild on executor side. The executor-side cost dominates — small vectors (4096 rows) have poor vectorization efficiency and cache locality for hash table building.

On generality: This is not a new optimization — Gluten 1.2 used this exact merged serialization path. PR #9521 changed to per-batch to reduce native memory peak, which inadvertently caused this regression. Our patch simply restores the 1.2 behavior behind a config switch (default=false), keeping #9521's OOM-safe path as default.

If the team prefers a middle ground, we can do a hybrid — merge in groups of N batches to amortize overhead without holding the full partition in native memory. Happy to implement if preferred.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants