
Example: spillable PageStore for the Parquet ArrowWriter (on top of #10020)#10058

Closed
alamb wants to merge 13 commits into apache:main from alamb:parquet-page-spill-example

alamb commented Jun 3, 2026

Contributor

What is this?

A draft PR that bundles #10020 ("Pluggable page spilling API for the Parquet ArrowWriter") plus a runnable example that exercises its new public PageStore API from the outside, as an end user would. It is intended to help review #10020 — see that PR for the API itself.

This branch contains #10020's commits and adds one file on top, parquet/examples/spill_page_store.rs.

The example

Writes a wide, skewed Parquet file into a single row group:

  • a few Int64 columns (--int-columns, default 3)
  • some small ~20-byte string columns (--small-string-columns, default 5)
  • a configurable pile of fat ~8 KiB string columns (--large-string-columns, default 10)

and reports peak ArrowWriter::memory_size() with and without a spilling page store.

The spilling backend is a ~30-line TempFilePageStore backed by one unlinked temp file per column chunk. put appends a page blob and returns an opaque PageKey; take seeks to that blob and reads it back. memory_size() keeps its default of 0 because the bytes now live in the file, not on the heap.
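
As a rough illustration of the put/take shape described above, here is a minimal sketch. The `Key` type and `BlobStore` trait are hypothetical stand-ins for `PageKey` and `PageStore` (the real signatures live in #10020 and may differ), and the store is generic over any `Read + Write + Seek` so it can be exercised without touching the filesystem.

```rust
use std::io::{self, Read, Seek, SeekFrom, Write};

/// Hypothetical stand-in for the PR's opaque `PageKey`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Key(u64);

/// Hypothetical stand-in for the PR's `PageStore` trait; real signatures may differ.
pub trait BlobStore {
    fn put(&mut self, blob: &[u8]) -> io::Result<Key>;
    fn take(&mut self, key: Key) -> io::Result<Vec<u8>>;
    /// Bytes held on the heap. Defaults to 0 for stores that keep blobs elsewhere.
    fn memory_size(&self) -> usize {
        0
    }
}

/// Appends each blob to `file` and remembers only its (offset, length).
pub struct SpillStore<F> {
    file: F,
    /// Indexed by key; `None` once the blob has been taken.
    extents: Vec<Option<(u64, usize)>>,
    /// Current append position in `file`.
    end: u64,
}

impl<F: Read + Write + Seek> SpillStore<F> {
    pub fn new(file: F) -> Self {
        Self { file, extents: Vec::new(), end: 0 }
    }
}

impl<F: Read + Write + Seek> BlobStore for SpillStore<F> {
    fn put(&mut self, blob: &[u8]) -> io::Result<Key> {
        // A previous `take` may have moved the cursor, so seek to the end first.
        self.file.seek(SeekFrom::Start(self.end))?;
        self.file.write_all(blob)?;
        let key = Key(self.extents.len() as u64);
        self.extents.push(Some((self.end, blob.len())));
        self.end += blob.len() as u64;
        Ok(key)
    }

    fn take(&mut self, key: Key) -> io::Result<Vec<u8>> {
        let (offset, len) = self
            .extents
            .get_mut(key.0 as usize)
            .and_then(|slot| slot.take())
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "unknown or already-taken key"))?;
        self.file.seek(SeekFrom::Start(offset))?;
        let mut buf = vec![0u8; len];
        self.file.read_exact(&mut buf)?;
        Ok(buf)
    }
}
```

In the example the role of `F` would be played by a temp file; an `std::io::Cursor<Vec<u8>>` works for a quick test of the contract.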

Flags

  • --large-string-columns N — number of fat ~8 KiB string columns (default 10)
  • --spill — use the spilling TempFilePageStore instead of the default in-memory buffering
  • also: --small-string-columns, --int-columns, --rows, --batch-size, --output <path>

Running

# Baseline: default in-memory page buffering
cargo run --release --features cli --example spill_page_store

# Spill completed pages to temp files
cargo run --release --features cli --example spill_page_store -- --spill

What it shows

On the defaults (10 fat columns, ~160 MiB row group):

| page buffering | peak ArrowWriter::memory_size() |
| --- | --- |
| in-memory (default) | ~161 MiB |
| TempFilePageStore (--spill) | ~21 MiB |

i.e. spilling bounds peak writer memory by the in-flight encoder buffers rather than the row group size. Widen the skew with --large-string-columns 30 to make the gap bigger. Writing the same data to a real file with --output produces a byte-identical Parquet file in both modes, confirming the spilling path is a transparent drop-in.

Notes for review

  • tempfile and sysinfo are already dev-dependencies of the parquet crate, so the example needs no new deps; it is gated on required-features = ["arrow", "cli"].
  • The TempFilePageStore here is intentionally the same shape as the one in parquet/tests/arrow_writer.rs, so the example and the test corroborate each other.

🤖 Generated with Claude Code

adriangb and others added 8 commits June 2, 2026 15:18
Introduce a "dumb" key/value page store that the ArrowWriter uses to
buffer completed, serialized pages while a row group is being written.
The store maps an opaque, store-allocated PageKey to a blob of bytes and
knows nothing about pages, dictionaries, ordering, or offsets — the
caller keeps the handles and decides what they mean.

The default InMemoryPageStore keeps blobs in a Vec<Bytes>, byte-for-byte
equivalent to the previous buffering with zero overhead. A PageStoreFactory
is threaded through ArrowWriterOptions -> ArrowRowGroupWriterFactory ->
ArrowColumnWriterFactory so users can plug in a backend (temp file, object
storage) to bound peak write memory independently of row group size.

ArrowColumnChunkData now holds (store, keys) and materializes blobs in
write order at splice time, preserving the existing append_column path.

Tests:
- column::page_store unit tests for the in-memory backend contract.
- A byte-identical round-trip test using a custom HashMap-backed store
  with sparse, non-contiguous handles, proving the writer relies only on
  the opaque-handle contract.
- An always-on dhat integration test capturing the in-memory peak-heap
  baseline (memory grows with the row group), against which a spilling
  backend will be measured.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
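
The opaque-handle contract from the commit message above can be sketched as follows. This is not the test from the PR: `Key`, `SparseStore` and `round_trip` are hypothetical names, and the store hands out deliberately sparse keys so that a caller cannot treat them as indices.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the opaque `PageKey`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Key(u64);

/// A store that deliberately hands out sparse, non-contiguous keys.
#[derive(Default)]
pub struct SparseStore {
    blobs: HashMap<Key, Vec<u8>>,
    next: u64,
}

impl SparseStore {
    pub fn put(&mut self, blob: Vec<u8>) -> Key {
        let key = Key(self.next);
        // Large stride: keys are unique but carry no ordering or density guarantee.
        self.next += 1_000_003;
        self.blobs.insert(key, blob);
        key
    }

    /// Removes and returns the blob, or `None` for an unknown or already-taken key.
    pub fn take(&mut self, key: Key) -> Option<Vec<u8>> {
        self.blobs.remove(&key)
    }

    /// Bytes currently resident in the store.
    pub fn memory_size(&self) -> usize {
        self.blobs.values().map(|b| b.len()).sum()
    }
}

/// The caller keeps the keys in write order and decides what they mean;
/// the store only maps key -> bytes.
pub fn round_trip(store: &mut SparseStore, pages: &[&[u8]]) -> Vec<u8> {
    let keys: Vec<Key> = pages.iter().map(|p| store.put(p.to_vec())).collect();
    let mut out = Vec::new();
    for key in keys {
        out.extend(store.take(key).expect("key was issued by this store"));
    }
    out
}
```

If the bytes come back in write order with keys like these, the caller is relying only on "the key I was given returns the blob I stored".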
Previously the splice materialized an entire column chunk back into a
Vec<Bytes> before copying it into the output file, so peak memory during
the splice phase was bounded by the largest column chunk — defeating a
spilling backend for skewed schemas.

Replace the materialize-then-copy path with StreamingColumnChunkReader, a
Read that takes each page blob back out of the store in write order as it
is consumed and releases it immediately, so the splice holds at most one
page in memory at a time. SerializedRowGroupWriter::append_column is
refactored to delegate to a new append_column_from_read that consumes an
owned Read (append_column itself is unchanged for external ChunkReader
callers).

For the default in-memory store this is behavior-preserving (it already
holds the bytes); for a spilling store it keeps the splice within the
memory bound.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
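
A minimal sketch of the streaming idea described above, assuming nothing about the real StreamingColumnChunkReader beyond "a Read that takes one page at a time". `StreamingReader` and its `take` closure are hypothetical; the closure stands in for removing a blob from the store.

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

/// Reads a column chunk page by page. `take` is a hypothetical hook that
/// removes one blob from the backing store and returns its bytes.
pub struct StreamingReader<T> {
    /// Keys in write order, not yet fetched.
    keys: VecDeque<u64>,
    take: T,
    /// The single page currently held in memory.
    current: Vec<u8>,
    pos: usize,
}

impl<T: FnMut(u64) -> io::Result<Vec<u8>>> StreamingReader<T> {
    pub fn new(keys: impl IntoIterator<Item = u64>, take: T) -> Self {
        Self { keys: keys.into_iter().collect(), take, current: Vec::new(), pos: 0 }
    }
}

impl<T: FnMut(u64) -> io::Result<Vec<u8>>> Read for StreamingReader<T> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Refill only when the current page is fully consumed; the loop also
        // skips over empty pages.
        while self.pos == self.current.len() {
            match self.keys.pop_front() {
                Some(key) => {
                    // Assigning drops the previous page, so at most one page is resident.
                    self.current = (self.take)(key)?;
                    self.pos = 0;
                }
                None => return Ok(0), // all pages consumed: end of stream
            }
        }
        let n = buf.len().min(self.current.len() - self.pos);
        buf[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}
```

Because it is an ordinary `Read`, a reader like this can be handed to `std::io::copy` or any other consumer that pulls bytes incrementally.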
…pilling backend

PageKey's field was private, so an external PageStore implementor could
not mint the handle it must return from put() — the trait was unusable
outside the crate. Add public PageKey::new/get so any backend can allocate
its own opaque, dense handles.

Extend the dhat integration test with a temp-file PageStore backend (one
unlinked temp file per column chunk; put appends, take seeks+reads) and
assert the headline invariant: when writing a skewed ~16 MiB single row
group, peak heap drops from ~18 MiB with the in-memory store to ~3 MiB
with the spilling store, bounded by the in-flight encoder/dictionary
buffers rather than the row group size.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…oint #2)

Dictionary-encoded columns buffered every completed data page in
GenericColumnWriter.data_pages until close(), because the dictionary page
must be written first but isn't final until all values are seen. Those
pages never reached the PageStore, so spilling couldn't bound them — a
low-cardinality 4.2M-row column peaked ~2.5 MiB regardless of backend.

Add PageWriter::defers_dictionary_ordering(): a writer that buffers the
whole chunk and splices it later (the Arrow path) can accept data pages
before the dictionary page and order them itself. When set, the column
writer streams dictionary-column data pages straight through instead of
buffering them. ArrowPageWriter returns true, holds the (bounded)
dictionary page in memory since it now arrives last, and at splice emits
it first; the buffer-relative page offsets recorded in production order
are rewritten to the dictionary-first layout there. The column-at-a-time
SerializedFileWriter path is unchanged (defaults to false).

Also fix memory_size() accounting: instead of counting bytes written
(which over-reports once pages are spilled off-heap), ask the page writer
how much it actually holds resident via PageWriter::buffered_memory_size()
and PageStore::memory_size(). For the in-memory store this is unchanged;
for a spilling store it drops to ~0 plus the retained dictionary page.

Result: the dict-column case drops from ~2.69 MiB to ~0.48 MiB peak heap
with a spilling backend. Adds an offset-index-disabled dictionary
round-trip test and store memory-size unit tests; extends the dhat test
with the dictionary-column scenario.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
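
One way to picture the offset rewrite described above: data pages are produced first and recorded at buffer-relative offsets, the dictionary page arrives last, and at splice time the dictionary goes first so every data page shifts by the dictionary's length. The sketch below only does that arithmetic; `Recorded`, `DictFirstLayout` and `dictionary_first` are hypothetical names and do not reflect how ArrowPageWriter is actually structured.

```rust
/// What the writer saw in production order: data pages first, dictionary last.
pub struct Recorded {
    /// Serialized length of each data page, in the order produced.
    pub data_page_lens: Vec<u64>,
    /// Serialized length of the dictionary page, known only at the end.
    pub dict_page_len: u64,
}

/// Chunk-relative offsets once the dictionary page is emitted first.
#[derive(Debug, PartialEq)]
pub struct DictFirstLayout {
    pub dict_offset: u64,
    pub data_page_offsets: Vec<u64>,
}

/// Rewrites production-order offsets to the dictionary-first layout:
/// the dictionary sits at offset 0 and each data page moves back by its length.
pub fn dictionary_first(recorded: &Recorded) -> DictFirstLayout {
    let mut next = recorded.dict_page_len;
    let data_page_offsets = recorded
        .data_page_lens
        .iter()
        .map(|len| {
            let offset = next;
            next += len;
            offset
        })
        .collect();
    DictFirstLayout { dict_offset: 0, data_page_offsets }
}
```

For data pages of 100, 50 and 25 bytes and a 10-byte dictionary page, the data pages land at offsets 10, 110 and 160.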
The test uses parquet::arrow, so without a required-features entry it was
auto-discovered and compiled under --all-targets --no-default-features,
breaking that CI compilation check. Mirror the other arrow integration
tests with required-features = ["arrow"].

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the experimental, single-maintainer dhat crate with the in-tree
thread-local tracking-allocator pattern already used by
parquet/benches/arrow_reader_peak_memory.rs, and fold the test into the
existing arrow_writer test binary instead of a dedicated one (saving a
compile/link). The measurement still observes real process peak heap and
the assertions are unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
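
For readers unfamiliar with the tracking-allocator pattern mentioned above, here is a minimal sketch of the general idea. It is not the code from parquet/benches/arrow_reader_peak_memory.rs; `TrackingAllocator` and `peak_during` are hypothetical names, and the counters are per-thread so allocations on other threads do not affect the measurement.

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::cell::Cell;

thread_local! {
    // Const-initialized so that touching them never allocates.
    static CURRENT: Cell<usize> = const { Cell::new(0) };
    static PEAK: Cell<usize> = const { Cell::new(0) };
}

/// Wraps the system allocator and tracks live and peak bytes per thread.
pub struct TrackingAllocator;

fn record_alloc(size: usize) {
    // `try_with` avoids panicking if the thread-locals are being torn down.
    let _ = CURRENT.try_with(|cur| {
        let now = cur.get() + size;
        cur.set(now);
        let _ = PEAK.try_with(|peak| peak.set(peak.get().max(now)));
    });
}

fn record_dealloc(size: usize) {
    // Saturating: memory freed here may have been allocated on another thread.
    let _ = CURRENT.try_with(|cur| cur.set(cur.get().saturating_sub(size)));
}

unsafe impl GlobalAlloc for TrackingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { System.alloc(layout) };
        if !ptr.is_null() {
            record_alloc(layout.size());
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { System.dealloc(ptr, layout) };
        record_dealloc(layout.size());
    }
}

#[global_allocator]
static ALLOC: TrackingAllocator = TrackingAllocator;

/// Runs `f` and returns its result plus the peak number of bytes this thread
/// held above its starting level while `f` ran.
pub fn peak_during<R>(f: impl FnOnce() -> R) -> (R, usize) {
    let base = CURRENT.with(|c| c.get());
    PEAK.with(|p| p.set(base));
    let out = f();
    let peak = PEAK.with(|p| p.get());
    (out, peak - base)
}
```

A test can then wrap a write in `peak_during` and assert on the returned peak, which is the kind of in-memory versus spilling comparison the commit messages describe.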
Adds `parquet/examples/spill_page_store.rs`, a runnable demonstration of the
pluggable `PageStore` API. It implements a spilling `TempFilePageStore` (one
temp file per column chunk) and writes a wide, skewed Parquet file — a few
Int64 columns, some small (~20 byte) string columns, and a configurable number
of large (~8 KiB) string columns — into a single row group, reporting peak
`ArrowWriter::memory_size()` with and without spilling.

  --large-string-columns N   number of fat ~8 KiB string columns (default 10)
  --spill                    use the spilling TempFilePageStore vs the default

On the default 10 fat columns the spilling store cuts peak writer memory from
~161 MiB (whole row group buffered) to ~21 MiB (in-flight encoder buffers
only), and produces a byte-identical file to the in-memory path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
alamb and others added 5 commits June 3, 2026 13:11
The spill_page_store example now threads a shared SpillStats (atomics) through
the TempFilePageStoreFactory into each per-column store, and prints the number
of temp files created (one per column chunk) plus total bytes spilled — written
on `put` and read back on `take`. Also bumps the default `--batch-size` to 8192.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
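
The shared-counters arrangement described above might look roughly like this. Only the name SpillStats comes from the commit message; its fields, `CountingStoreFactory` and `CountingStore` are hypothetical, and blobs are kept in a Vec here purely as a stand-in for the temp file.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Counters shared by every per-column store; the field names are assumptions.
#[derive(Debug, Default)]
pub struct SpillStats {
    pub files_created: AtomicU64,
    pub bytes_written: AtomicU64,
    pub bytes_read: AtomicU64,
}

/// Hands each new store a clone of the shared stats handle.
pub struct CountingStoreFactory {
    stats: Arc<SpillStats>,
}

impl CountingStoreFactory {
    pub fn new(stats: Arc<SpillStats>) -> Self {
        Self { stats }
    }

    /// One store (and, in the real example, one temp file) per column chunk.
    pub fn create(&self) -> CountingStore {
        self.stats.files_created.fetch_add(1, Ordering::Relaxed);
        CountingStore { stats: Arc::clone(&self.stats), blobs: Vec::new() }
    }
}

pub struct CountingStore {
    stats: Arc<SpillStats>,
    /// In-memory stand-in for the spill file; `None` once a blob is taken.
    blobs: Vec<Option<Vec<u8>>>,
}

impl CountingStore {
    pub fn put(&mut self, blob: Vec<u8>) -> usize {
        self.stats.bytes_written.fetch_add(blob.len() as u64, Ordering::Relaxed);
        self.blobs.push(Some(blob));
        self.blobs.len() - 1
    }

    pub fn take(&mut self, key: usize) -> Option<Vec<u8>> {
        let blob = self.blobs.get_mut(key)?.take()?;
        self.stats.bytes_read.fetch_add(blob.len() as u64, Ordering::Relaxed);
        Some(blob)
    }
}
```

Relaxed ordering is enough for plain counters like these, provided the totals are read after the writer threads have been joined.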
Switches the example's spilling backend from anonymous tempfile() handles to
NamedTempFile so each per-column temp file has a reportable path, records
(column index, path, bytes) for every file when its store is dropped, and prints
one line per spill file alongside the existing totals.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…mn type

Rewrites the spill_page_store example to use the low-level ArrowColumnWriter API
so columns are encoded on multiple threads (one worker thread per column, the
documented parallel pattern). Each worker owns its own ArrowColumnWriter backed
by its own PageStore, receives ArrowLeafColumns over a bounded channel, tracks
its writer's peak resident bytes, and returns the finished ArrowColumnChunk; the
main thread splices each chunk into the row group, streaming pages back out of
the store. The spilling benefit is preserved (append_to_row_group reads one page
at a time from the store), and the output stays byte-identical to the in-memory
path.

Also adds the column data type to the per-spill-file listing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Adds a wall-clock timer (std::time::Instant) over the whole run and reports
"Total elapsed time" in the final summary. Also includes the faster data
generation (a single batch is built once and reused across iterations).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Restructures the example to keep all cores busy: N/2 generator threads build
record batches and N/2 encoder threads encode them (N = available cores). The
main thread re-orders generated batches by index — keeping the output file
deterministic regardless of thread timing — and broadcasts each to the encoders,
which each own a disjoint subset of columns (and their PageStores). Finished
chunks are sorted into schema order and spliced in.

Also removes the XorShift RNG in favor of deterministic counter-derived values,
so a run is fully reproducible (verified byte-identical between the in-memory and
spilling backends, and run-to-run). Channel depths are kept shallow so in-flight
input batches don't dominate RSS on wide/huge schemas.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
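
The re-ordering step described above, where generator threads finish in any order and the main thread emits batches strictly by index, can be sketched with the standard library alone. `Reorder` and `generate_in_order` are hypothetical names, and plain `u64` values stand in for record batches.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;

/// Emits items in index order 0, 1, 2, ... regardless of arrival order.
pub struct Reorder<T> {
    next: usize,
    pending: BTreeMap<usize, T>,
}

impl<T> Reorder<T> {
    pub fn new() -> Self {
        Self { next: 0, pending: BTreeMap::new() }
    }

    /// Accepts one item and returns every item that is now ready, in order.
    pub fn push(&mut self, index: usize, item: T) -> Vec<T> {
        self.pending.insert(index, item);
        let mut ready = Vec::new();
        while let Some(item) = self.pending.remove(&self.next) {
            ready.push(item);
            self.next += 1;
        }
        ready
    }
}

/// Generates `total` counter-derived values on `workers` threads and returns
/// them in index order, independent of thread timing.
pub fn generate_in_order(total: usize, workers: usize) -> Vec<u64> {
    let workers = workers.max(1);
    // Shallow bounded channel: generators block rather than queueing many items.
    let (tx, rx) = mpsc::sync_channel::<(usize, u64)>(2);
    let handles: Vec<_> = (0..workers)
        .map(|w| {
            let tx = tx.clone();
            thread::spawn(move || {
                // Worker `w` produces indices w, w + workers, w + 2 * workers, ...
                for index in (w..total).step_by(workers) {
                    // Deterministic value derived from the index, no RNG state.
                    let value = (index as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
                    tx.send((index, value)).expect("receiver alive");
                }
            })
        })
        .collect();
    drop(tx); // the receive loop ends once every worker's sender is dropped

    let mut reorder = Reorder::new();
    let mut out = Vec::with_capacity(total);
    for (index, value) in rx {
        out.extend(reorder.push(index, value));
    }
    for handle in handles {
        handle.join().expect("generator thread panicked");
    }
    out
}
```

Because the values depend only on the index and the output order depends only on the index, two runs with different thread counts produce the same sequence.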

alamb commented Jun 4, 2026

Contributor Author

I actually think the example in the docs in the following PR is adequate, so there is no need for a separate PR.

alamb closed this Jun 4, 2026

Labels

parquet Changes to the parquet crate
