[GLUTEN-12251][VL] Add config switch to merge broadcast batches for BHJ performance#12259
[GLUTEN-12251][VL] Add config switch to merge broadcast batches for BHJ performance#12259Xtpacz wants to merge 2 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
Adds a Velox-specific configuration switch to optionally merge broadcast build-side columnar batches into a single serialized payload, aiming to reduce per-batch overhead in BroadcastHashJoin (BHJ) HashBuild (fewer addInput calls), with JNI support and validation coverage.
Changes:
- Added a new Velox config
spark.gluten.velox.broadcastBuild.mergeBatches(defaultfalse) and wired it into broadcast build-side serialization. - Introduced a new JNI entry point
serializeAll(long[])to serialize multiple nativeColumnarBatchhandles into a singleJniUnsafeByteBuffer. - Added a Velox hash join test to compare results between merged vs per-batch broadcast build serialization, and documented the new config.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchSerializerJniWrapper.java | Exposes new JNI method serializeAll(long[]) for multi-batch serialization. |
| cpp/core/jni/JniWrapper.cc | Implements JNI serializeAll by appending multiple batches to the serializer and emitting one buffer. |
| backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala | Adds merge/per-batch switch in serializeStream for broadcast build-side serialization. |
| backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala | Registers the new mergeBatches config and accessor. |
| backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala | Adds coverage to ensure merged and per-batch paths produce equivalent results and still use BHJ. |
| docs/velox-configuration.md | Documents the new spark.gluten.velox.broadcastBuild.mergeBatches option. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| | spark.gluten.sql.enable.enhancedFeatures | 🔄 Dynamic | true | Enable some features including iceberg native write and other features. | | ||
| | spark.gluten.sql.rewrite.castArrayToString | 🔄 Dynamic | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. | | ||
| | spark.gluten.velox.broadcast.build.targetBytesPerThread | ⚓ Static | 32MB | It is used to calculate the number of hash table build threads. Based on our testing across various thresholds (1MB to 128MB), we recommend a value of 32MB or 64MB, as these consistently provided the most significant performance gains. | | ||
| | spark.gluten.velox.broadcastBuild.mergeBatches | 🔄 Dynamic | false | If enabled, all columnar batches in a broadcast build relation will be serialized into a single buffer to reduce the number of addInput calls in HashBuild operator. This can significantly improve BHJ performance when the broadcast table has many small batches. | |
| .doc( | ||
| "If enabled, all columnar batches in a broadcast build relation will be " + | ||
| "serialized into a single buffer to reduce the number of addInput calls in " + | ||
| "HashBuild operator. This can significantly improve BHJ performance when " + | ||
| "the broadcast table has many small batches.") |
| val merged = jniWrapper.serializeAll(handles.toArray) | ||
| val useOffheapBroadcastBuildRelation = | ||
| VeloxConfig.get.enableBroadcastBuildRelationInOffheap | ||
| new ColumnarBatchSerializeResult( | ||
| useOffheapBroadcastBuildRelation, | ||
| numRows, | ||
| java.util.Collections.singletonList(merged)) |
| auto ctx = getRuntime(env, wrapper); | ||
| int32_t numBatches = env->GetArrayLength(handles); | ||
| GLUTEN_DCHECK(numBatches > 0, "serializeAll requires at least one batch"); |
|
Thanks @Xtpacz . The current implementation merges the build side into a single
That said, based on the benchmark results you shared, the performance improvement is quite significant. Could you share more details about the previous bottleneck? Was the main issue caused by generating a large number of small |
@wForget Thanks for the review! Root cause: The per batch's serialize/deserialize overhead will across the full pipeline. In our q64 case (19.6B build rows, maxBatchSize=4096), this produces about 480K independent buffers. Each one goes through PrestoSerializer creation + ArrowBuf allocation on serialize, and PrestoVectorSerde.deserialize() + small-vector HashBuild on executor side. The executor-side cost dominates — small vectors (4096 rows) have poor vectorization efficiency and cache locality for hash table building. On generality: This is not a new optimization — Gluten 1.2 used this exact merged serialization path. PR #9521 changed to per-batch to reduce native memory peak, which inadvertently caused this regression. Our patch simply restores the 1.2 behavior behind a config switch (default=false), keeping #9521's OOM-safe path as default. If the team prefers a middle ground, we can do a hybrid — merge in groups of N batches to amortize overhead without holding the full partition in native memory. Happy to implement if preferred. |
Related issue: #12251
What changes are proposed in this pull request?
This PR adds a new config
spark.gluten.velox.broadcastBuild.mergeBatches(defaultfalse) that controls how columnar batches are serialized during broadcast build for BHJ.When enabled, all batches on the build side are serialized into a single buffer through a new
serializeAllJNI entry point, so the executor-sideHashBuildoperator receives oneaddInputcall instead of N. For broadcast tables that fan out into many small batches (e.g. whenspark.gluten.sql.columnar.maxBatchSizeis small or the build side is narrow), this materially reduces per-batch overhead.How was this patch tested?
Verified on an internal Spark cluster running TPC-DS 5TB. On q64, with all other configs equal, the aggregated HashBuild total time dropped from 2.04h to 22.1min when
mergeBatches=true(~5.5x reduction). Result correctness verified by comparing query output betweenmergeBatches=trueand=false.Was this patch authored or co-authored using generative AI tooling?
Co-authored using Claude (claude-opus-4.7)