[VL] Intern StructType <-> JSON codec on the ColumnarCachedBatchSerializer hot path#12236

Open
yaooqinn wants to merge 7 commits into apache:main from yaooqinn:kentyao/fu-d7-schema-intern-cache

yaooqinn commented Jun 4, 2026

Summary

Add a size-bounded intern cache (Caffeine W-TinyLFU eviction, not
strict LRU) for the StructType <-> JSON wire form used by
ColumnarCachedBatchSerializer, and wire it into the hot write/read
paths. The wire format is unchanged; the cache memoizes pure functions
(StructType.json / DataType.fromJson) that a single Spark query
typically calls thousands of times against a handful of distinct
schemas.

What

Three commits, logical order:

  1. Add SchemaJsonInternCache -- private[execution] class, two
    size-bounded Caffeine caches (cap=256 each): encode side
    StructType -> Array[Byte], decode side String -> StructType.
    Thread-safety is delegated to Caffeine get(key, mappingFunction).
    Six tests pin determinism / capacity / concurrency invariants.

  2. Wire into ColumnarCachedBatchSerializer -- Replace the two
    ad-hoc codec calls (write side and read side) with cache lookups.
    Cache lives on the serializer companion object so it survives
    Kryo's per-stream serializer churn.

  3. Extend microbench -- Three sections appended to
    ColumnarTableCachePartitionStatsBenchmark:

    • encode round-trip over synthetic + TPC-DS schemas
    • decode round-trip over the same fixture set
    • working-set sweep at cap, 2xcap, 4xcap
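
The two-sided cache from item 1 can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the PR's implementation: ToySchema, BoundedMemo and ToySchemaInternCache are hypothetical names, an access-ordered LinkedHashMap stands in for Caffeine (so eviction here is strict LRU, which is not Caffeine's policy), and a toy JSON codec stands in for StructType.json / DataType.fromJson. Only the method names encodeBytes and decodeStructType follow the PR.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical stand-in for Spark's StructType: an ordered list of (name, type) pairs.
final case class ToySchema(fields: Vector[(String, String)]) {
  // Toy JSON rendering; the real code path calls StructType.json.
  def json: String =
    fields.map { case (n, t) => s"""{"name":"$n","type":"$t"}""" }.mkString("[", ",", "]")
}

object ToySchema {
  private val Field = """\{"name":"([^"]*)","type":"([^"]*)"\}""".r

  // Toy parser for the rendering above; the real code path calls DataType.fromJson.
  def fromJson(json: String): ToySchema =
    ToySchema(Field.findAllMatchIn(json).map(m => (m.group(1), m.group(2))).toVector)
}

// Size-bounded memoizer. ASSUMPTION: strict LRU via an access-ordered
// LinkedHashMap, used only because it needs no external dependency.
final class BoundedMemo[K <: AnyRef, V <: AnyRef](cap: Int, compute: K => V) {
  private var computes = 0
  private val map = new JLinkedHashMap[K, V](16, 0.75f, true) {
    // Called by put(); evicts the least recently used entry once past cap.
    override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = this.size() > cap
  }

  def get(key: K): V = synchronized {
    val cached = map.get(key)
    if (cached ne null) cached
    else {
      computes += 1
      val value = compute(key) // miss path: same pure function as the uncached code
      map.put(key, value)
      value
    }
  }

  // Number of times the underlying pure function actually ran.
  def computeCount: Int = synchronized(computes)
}

// Encode side: schema -> UTF-8 JSON bytes. Decode side: JSON string -> schema.
final class ToySchemaInternCache(cap: Int = 256) {
  private val encodeSide =
    new BoundedMemo[ToySchema, Array[Byte]](cap, s => s.json.getBytes(UTF_8))
  private val decodeSide =
    new BoundedMemo[String, ToySchema](cap, ToySchema.fromJson)

  // Returns the cached array itself; callers must treat it as read-only.
  def encodeBytes(schema: ToySchema): Array[Byte] = encodeSide.get(schema)

  def decodeStructType(bytes: Array[Byte]): ToySchema =
    decodeSide.get(new String(bytes, UTF_8))

  def encodeComputeCount: Int = encodeSide.computeCount
}
```

Under these assumptions, encoding two equal schemas returns the same cached array, and decoding equal bytes returns the same schema instance, which is the canonical-instance behavior the tests are described as pinning.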

Why

Schema codec is a pure-function hot path. The bench shows the cached
encode leg (on-leg) flattening at ~6 ms regardless of working-set size
(cap=256 is sufficient for the fixture set), while the uncached
off-leg ranges from seconds at small widths to minutes at 1000-field
schemas. The realistic TPC-DS store_sales schema sees an 11.9x decode
speedup (2207 ms -> 185 ms).

Bench numbers (verbatim from benchmarks/ColumnarTableCachePartitionStatsBenchmark-results.txt)

decode tpcds-store_sales-23col:
  off (raw DataType.fromJson per call)     2207 ms     1.0X
  on  (intern.decodeStructType)             185 ms    11.9X

C1 hit (256 schemas == cap):
  off                                       102 ms     1.0X
  on                                          4 ms    24.3X

C3 churn (1024 schemas == 4x cap):
  off                                       407 ms     1.0X
  on                                         17 ms    24.0X

Pre-existing partition-stats sections rerun in this commit show no
regression (build 1.0x / 0.9x, high-sel 8.4x, low-sel 1.5x, point
12.0x -- within run-to-run variance of the committed baseline).

Risk

  • Wire format unchanged -- read/write still emit length-prefixed
    UTF-8 JSON bytes. Existing on-disk caches remain readable.
  • Cache miss path is identical to current behavior (pure functions
    re-evaluate; no exception caching).
  • Heap retention bounded at 256 entries x schema-JSON size; ~MB
    worst case on pathological 1000-field schemas, KB-MB on realistic
    workloads.
  • No new SQLConf, no logger, no metric.
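
As an illustration of the "length-prefixed UTF-8 JSON bytes" framing named in the first bullet, here is a minimal stdlib-only sketch. The 4-byte big-endian length prefix and the LengthPrefixedJson name are assumptions made for this example; the PR does not show how the Kryo path encodes the length, so treat the exact prefix layout as hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets.UTF_8

object LengthPrefixedJson {
  // Frame layout (ASSUMED): 4-byte big-endian byte count, then the UTF-8 payload.
  def encode(json: String): Array[Byte] = {
    val payload = json.getBytes(UTF_8)
    val sink = new ByteArrayOutputStream(4 + payload.length)
    val out = new DataOutputStream(sink)
    out.writeInt(payload.length) // length counts bytes, not characters
    out.write(payload, 0, payload.length)
    out.flush()
    sink.toByteArray
  }

  // Reads the length prefix, then exactly that many payload bytes.
  def decode(frame: Array[Byte]): String = {
    val in = new DataInputStream(new ByteArrayInputStream(frame))
    val length = in.readInt()
    require(length >= 0, s"negative frame length: $length")
    val payload = new Array[Byte](length)
    in.readFully(payload) // throws EOFException if the frame is truncated
    new String(payload, UTF_8)
  }
}
```

Because a cache placed in front of a codec like this only changes where the payload bytes come from, not what they contain, the framed output is the same with or without memoization.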

Test

  • New: SchemaJsonInternCacheSuite (6 tests, determinism /
    capacity / concurrency)
  • Existing: ColumnarCachedBatchSerializerHelperSuite (4 tests)
    passes against wired code, confirming Kryo round-trip + frame
    parsing + fall-back paths intact.

github-actions Bot added the VELOX label Jun 4, 2026
yaooqinn added 3 commits June 4, 2026 23:24
ColumnarCachedBatchSerializer hot paths call StructType.json on every
batch write and DataType.fromJson on every batch read. Both are pure
functions of the schema, and a single Spark query typically caches
many batches that share one (or a handful of) schemas. Memoize the
round-trip with a bounded process-local cache to avoid redundant
JSON encode/decode work without changing the wire format.

Two Caffeine caches, cap = 256 entries each:
- encode side: StructType -> canonical UTF-8 JSON Array[Byte]
- decode side: canonical JSON String -> canonical StructType

Thread-safety is delegated to Caffeine's at-most-once-compute-per-key
contract. Cache misses are indistinguishable from the no-cache
baseline (same pure source path).
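
The at-most-once-compute-per-key behavior relied on above can be demonstrated with a stdlib analogue. This sketch uses java.util.concurrent.ConcurrentHashMap.computeIfAbsent, which documents the same per-key guarantee, as a stand-in for Caffeine's get(key, mappingFunction); it is unbounded and is not the PR's code. AtMostOnceDemo and countComputes are hypothetical names.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

object AtMostOnceDemo {
  // Runs `threads` workers that each look up every key `rounds` times and
  // returns how many times the mapping function was actually invoked.
  def countComputes(threads: Int, distinctKeys: Int, rounds: Int): Int = {
    val computes = new AtomicInteger(0)
    val cache = new ConcurrentHashMap[String, String]()
    val mapping = new JFunction[String, String] {
      override def apply(key: String): String = {
        computes.incrementAndGet() // counts real computations, not lookups
        key.toUpperCase            // stand-in for an expensive pure function
      }
    }
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (0 until threads).map { _ =>
        pool.submit(new Runnable {
          override def run(): Unit =
            for (_ <- 0 until rounds; k <- 0 until distinctKeys)
              cache.computeIfAbsent(s"schema-$k", mapping)
        })
      }
      // Surface any worker failure; the finally block still releases the pool.
      futures.foreach(_.get(30, TimeUnit.SECONDS))
    } finally {
      pool.shutdownNow()
    }
    computes.get()
  }
}
```

With 8 threads hammering 16 overlapping keys, the mapping function runs once per distinct key, so countComputes(8, 16, 100) returns 16 rather than 12800.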

No user-facing surface added: class is private[execution], no SQLConf,
no setter, no constructor parameter. Wiring into
ColumnarCachedBatchSerializer is a follow-up commit gated on bench
numbers (StructTypeJsonCodecBenchmark, follow-up).

Six tests pin invariants:
- determinism (encode/decode), with canonical-instance guarantee
- capacity (cap=256, eviction past cap does not corrupt)
- concurrency (8 threads, overlapping keys, no error)

Replace ad-hoc StructType.json.getBytes / DataType.fromJson calls on the
CachedColumnarBatch Kryo write/read paths with the singleton intern cache
introduced earlier in this series. The cache is held on the serializer
companion object so it survives across Kryo's per-stream serializer
instances within the same JVM.

Encode/decode hot-path microbench (StructTypeJsonCodecBenchmark) shows
the on-leg at ~6 ms regardless of working-set size; the off-leg ranges
from tens of ms (small schemas) to minutes (1000-field schemas).
…ma-codec intern microbench

Add three microbench sections to the existing partition-stats benchmark
file rather than introducing a standalone bench artifact, to keep the
columnar table cache benchmark surface in a single committed
-results.txt and avoid splitting reviewer attention across files.

Sections compare two distinct method calls in the same JVM as cache
off (raw codec) vs cache on (SchemaJsonInternCache memoized round-trip),
with no toggle on the cache class itself:
  off-leg = schema.json.getBytes(UTF_8) /
            DataType.fromJson(...).asInstanceOf[StructType]
  on-leg  = intern.encodeBytes(schema)  /
            intern.decodeStructType(bytes)

Three sections:
- A: encode round-trip across 6 synthetic widths x name-length
     combinations + 1 realistic TPC-DS store_sales 23-col schema
     (1M iters/case)
- B: decode round-trip across the same fixture set (100K iters/case)
- C: working-set sweep at three regimes around cap=256
     (C1=cap 100% hit, C2=2x cap eviction, C3=4x cap churn)

Sanity gate at results-read time: the Section B on-leg should be at
least 2x faster per call than the off-leg at the realistic schema; a
sub-2x speedup signals that cache machinery overhead is eating the
savings. Working-set sweep gates are documented inline at Section C.

Bench artifact only; no -results.txt update committed in this commit
(pre-GHA local JDK17 smoke at full scale is the immediate next step
before any full-scale invocation that writes results).
Copilot AI review requested due to automatic review settings June 4, 2026 16:10
yaooqinn force-pushed the kentyao/fu-d7-schema-intern-cache branch from 16b6e6f to b768274 Compare June 4, 2026 16:10

Copilot AI left a comment

Pull request overview

This PR introduces a process-wide intern/memoization cache for the StructType <-> JSON schema codec used on the ColumnarCachedBatchSerializer hot path, aiming to eliminate repeated StructType.json / DataType.fromJson work across many cached batches with the same few schemas.

Changes:

  • Added SchemaJsonInternCache backed by Caffeine, plus a new unit test suite covering determinism, capacity pressure, and concurrency.
  • Wired the cache into CachedColumnarBatchKryoSerializer for both schema write (encode) and read (decode) paths.
  • Extended the existing microbenchmark to measure encode/decode speedups and working-set behavior, and updated the checked-in benchmark results.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.

Summary per file:

  • benchmarks/ColumnarTableCachePartitionStatsBenchmark-results.txt -- Updates benchmark output to include new schema codec sections and refreshed numbers.
  • backends-velox/src/test/scala/org/apache/spark/sql/execution/SchemaJsonInternCacheSuite.scala -- Adds unit tests for schema codec caching invariants (determinism/capacity/concurrency).
  • backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCachePartitionStatsBenchmark.scala -- Adds new microbench sections for schema JSON encode/decode and working-set sweep.
  • backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala -- Introduces the new Caffeine-backed intern cache implementation.
  • backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala -- Uses the new cache in the serializer hot read/write schema paths via a companion-object singleton.

Comment thread benchmarks/ColumnarTableCachePartitionStatsBenchmark-results.txt
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings June 5, 2026 01:46

Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Copilot AI review requested due to automatic review settings June 5, 2026 02:20

Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
yaooqinn force-pushed the kentyao/fu-d7-schema-intern-cache branch from b9d0a14 to 8ef8fd4 Compare June 5, 2026 02:40
- scaladoc: clarify "size-bounded W-TinyLFU" (not strict LRU)
- scaladoc: document encodeBytes shared-array contract
- CAP comment: rewrite to empirical wording, reference bench harness by name
- test: try/finally around futures.get to prevent thread-pool leak
- bench: reword "Gates" -> "Manual interpretation guidance"
Copilot AI review requested due to automatic review settings June 5, 2026 05:23

Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

- suite scaladoc: drop stale "LRU" wording (size-bounded W-TinyLFU)
- bench: rewrite working-set sweep prose to describe regimes only,
  no prediction; point to committed -results.txt for actual numbers