[VL] Intern StructType <-> JSON codec on the ColumnarCachedBatchSerializer hot path#12236
Open
yaooqinn wants to merge 7 commits into
Open
[VL] Intern StructType <-> JSON codec on the ColumnarCachedBatchSerializer hot path#12236yaooqinn wants to merge 7 commits into
yaooqinn wants to merge 7 commits into
Conversation
ColumnarCachedBatchSerializer hot paths call StructType.json on every batch write and DataType.fromJson on every batch read. Both are pure functions of the schema, and a single Spark query typically caches many batches that share one (or a handful of) schemas. Memoize the round-trip with a bounded process-local LRU cache to avoid redundant JSON encode/decode work without changing the wire format. Two Caffeine caches, cap = 256 entries each: - encode side: StructType -> canonical UTF-8 JSON Array[Byte] - decode side: canonical JSON String -> canonical StructType Thread-safety is delegated to Caffeine's at-most-once-compute-per-key contract. Cache misses are indistinguishable from the no-cache baseline (same pure source path). No user-facing surface added: class is private[execution], no SQLConf, no setter, no constructor parameter. Wiring into ColumnarCachedBatchSerializer is a follow-up commit gated on bench numbers (StructTypeJsonCodecBenchmark, follow-up). Six tests pin invariants: - determinism (encode/decode), with canonical-instance guarantee - capacity (cap=256, eviction past cap does not corrupt) - concurrency (8 threads, overlapping keys, no error)
Replace ad-hoc StructType.json.getBytes / DataType.fromJson calls on the CachedColumnarBatch Kryo write/read paths with the singleton intern cache introduced earlier in this series. The cache is held on the serializer companion object so it survives across Kryo's per-stream serializer instances within the same JVM. Encode/decode hot-path microbench (StructTypeJsonCodecBenchmark) shows on-leg ~6 ms regardless of working-set size; off-leg ~10s of ms (small schemas) to ~minutes (1000-field schemas).
…ma-codec intern microbench
Add three microbench sections to the existing partition-stats benchmark
file rather than introducing a standalone bench artifact, to keep the
columnar table cache benchmark surface in a single committed
-results.txt and avoid splitting reviewer attention across files.
Sections compare two distinct method calls in the same JVM as cache
off (raw codec) vs cache on (SchemaJsonInternCache memoized round-trip),
with no toggle on the cache class itself:
off-leg = schema.json.getBytes(UTF_8) /
DataType.fromJson(...).asInstanceOf[StructType]
on-leg = intern.encodeBytes(schema) /
intern.decodeStructType(bytes)
Three sections:
- A: encode round-trip across 6 synthetic widths x name-length
combinations + 1 realistic TPC-DS store_sales 23-col schema
(1M iters/case)
- B: decode round-trip across the same fixture set (100K iters/case)
- C: working-set sweep at three regimes around cap=256
(C1=cap 100% hit, C2=2x cap eviction, C3=4x cap churn)
Sanity gate at results-read time: Section B on-leg >= 2x off-leg
per-call at the realistic schema; sub-2x signals cache machinery
overhead is eating the savings. Working-set sweep gates documented
inline at Section C.
Bench artifact only; no -results.txt update committed in this commit
(pre-GHA local JDK17 smoke at full scale is the immediate next step
before any full-scale invocation that writes results).
16b6e6f to
b768274
Compare
Contributor
There was a problem hiding this comment.
Pull request overview
This PR introduces a process-wide intern/memoization cache for the StructType <-> JSON schema codec used on the ColumnarCachedBatchSerializer hot path, aiming to eliminate repeated StructType.json / DataType.fromJson work across many cached batches with the same few schemas.
Changes:
- Added
SchemaJsonInternCachebacked by Caffeine, plus a new unit test suite covering determinism, capacity pressure, and concurrency. - Wired the cache into
CachedColumnarBatchKryoSerializerfor both schema write (encode) and read (decode) paths. - Extended the existing microbenchmark to measure encode/decode speedups and working-set behavior, and updated the checked-in benchmark results.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| benchmarks/ColumnarTableCachePartitionStatsBenchmark-results.txt | Updates benchmark output to include new schema codec sections and refreshed numbers. |
| backends-velox/src/test/scala/org/apache/spark/sql/execution/SchemaJsonInternCacheSuite.scala | Adds unit tests for schema codec caching invariants (determinism/capacity/concurrency). |
| backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCachePartitionStatsBenchmark.scala | Adds new microbench sections for schema JSON encode/decode and working-set sweep. |
| backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala | Introduces the new Caffeine-backed intern cache implementation. |
| backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala | Uses the new cache in the serializer hot read/write schema paths via a companion-object singleton. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
b9d0a14 to
8ef8fd4
Compare
- scaladoc: clarify "size-bounded W-TinyLFU" (not strict LRU) - scaladoc: document encodeBytes shared-array contract - CAP comment: rewrite to empirical wording, reference bench harness by name - test: try/finally around futures.get to prevent thread-pool leak - bench: rewording "Gates" -> "Manual interpretation guidance"
- suite scaladoc: drop stale "LRU" wording (size-bounded W-TinyLFU) - bench: rewrite working-set sweep prose to describe regimes only, no prediction; point to committed -results.txt for actual numbers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Add an LRU intern cache for the
StructType<-> JSON wire form used byColumnarCachedBatchSerializer, and wire it into the hot write/readpaths. The wire format is unchanged; the cache memoizes pure functions
(
StructType.json/DataType.fromJson) that a single Spark querytypically calls thousands of times against a handful of distinct
schemas.
What
Three commits, logical order:
Add SchemaJsonInternCache --
private[execution]class, twoCaffeine LRU caches (cap=256 each), encode side
StructType -> Array[Byte], decode sideString -> StructType.Thread-safety delegated to Caffeine
get(key, mappingFunction).Six tests pin determinism / capacity / concurrency invariants.
Wire into ColumnarCachedBatchSerializer -- Replace the two
ad-hoc codec calls (write side and read side) with cache lookups.
Cache lives on the serializer companion object so it survives
Kryo's per-stream serializer churn.
Extend microbench -- Three sections appended to
ColumnarTableCachePartitionStatsBenchmark:Why
Schema codec is a pure-function hot path. The bench shows the encode
leg saturates at on-leg ~6 ms regardless of working set (cap=256 is
sufficient for the fixture set), while the off-leg ranges from
seconds at small widths to minutes at 1000-field schemas. Realistic
TPC-DS schema sees a 12x decode speedup.
Bench numbers (verbatim from
benchmarks/ColumnarTableCachePartitionStatsBenchmark-results.txt)Pre-existing partition-stats sections rerun in this commit show no
regression (build 1.0x / 0.9x, high-sel 8.4x, low-sel 1.5x, point
12.0x -- within run-to-run variance of the committed baseline).
Risk
UTF-8 JSON bytes. Existing on-disk caches readable.
re-evaluate; no exception caching).
worst case on pathological 1000-field schemas, KB-MB on realistic
workloads.
Test
SchemaJsonInternCacheSuite(6 tests, determinism /capacity / concurrency)
ColumnarCachedBatchSerializerHelperSuite(4 tests)passes against wired code, confirming Kryo round-trip + frame
parsing + fall-back paths intact.