Example: spillable PageStore for the Parquet ArrowWriter (on top of #10020)#10058
Closed
alamb wants to merge 13 commits into
Closed
Example: spillable PageStore for the Parquet ArrowWriter (on top of #10020)#10058alamb wants to merge 13 commits into
alamb wants to merge 13 commits into
Conversation
Introduce a "dumb" key/value page store that the ArrowWriter uses to buffer completed, serialized pages while a row group is being written. The store maps an opaque, store-allocated PageKey to a blob of bytes and knows nothing about pages, dictionaries, ordering, or offsets — the caller keeps the handles and decides what they mean. The default InMemoryPageStore keeps blobs in a Vec<Bytes>, byte-for-byte equivalent to the previous buffering with zero overhead. A PageStoreFactory is threaded through ArrowWriterOptions -> ArrowRowGroupWriterFactory -> ArrowColumnWriterFactory so users can plug in a backend (temp file, object storage) to bound peak write memory independently of row group size. ArrowColumnChunkData now holds (store, keys) and materializes blobs in write order at splice time, preserving the existing append_column path. Tests: - column::page_store unit tests for the in-memory backend contract. - A byte-identical round-trip test using a custom HashMap-backed store with sparse, non-contiguous handles, proving the writer relies only on the opaque-handle contract. - An always-on dhat integration test capturing the in-memory peak-heap baseline (memory grows with the row group), against which a spilling backend will be measured. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously the splice materialized an entire column chunk back into a Vec<Bytes> before copying it into the output file, so peak memory during the splice phase was bounded by the largest column chunk — defeating a spilling backend for skewed schemas. Replace the materialize-then-copy path with StreamingColumnChunkReader, a Read that takes each page blob back out of the store in write order as it is consumed and releases it immediately, so the splice holds at most one page in memory at a time. SerializedRowGroupWriter::append_column is refactored to delegate to a new append_column_from_read that consumes an owned Read (append_column itself is unchanged for external ChunkReader callers). For the default in-memory store this is behavior-preserving (it already holds the bytes); for a spilling store it keeps the splice within the memory bound. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…pilling backend PageKey's field was private, so an external PageStore implementor could not mint the handle it must return from put() — the trait was unusable outside the crate. Add public PageKey::new/get so any backend can allocate its own opaque, dense handles. Extend the dhat integration test with a temp-file PageStore backend (one unlinked temp file per column chunk; put appends, take seeks+reads) and assert the headline invariant: writing a skewed ~16 MiB single row group, peak heap drops from ~18 MiB with the in-memory store to ~3 MiB with the spilling store — bounded by the in-flight encoder/dictionary buffers rather than the row group size. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…oint #2) Dictionary-encoded columns buffered every completed data page in GenericColumnWriter.data_pages until close(), because the dictionary page must be written first but isn't final until all values are seen. Those pages never reached the PageStore, so spilling couldn't bound them — a low-cardinality 4.2M-row column peaked ~2.5 MiB regardless of backend. Add PageWriter::defers_dictionary_ordering(): a writer that buffers the whole chunk and splices it later (the Arrow path) can accept data pages before the dictionary page and order them itself. When set, the column writer streams dictionary-column data pages straight through instead of buffering them. ArrowPageWriter returns true, holds the (bounded) dictionary page in memory since it now arrives last, and at splice emits it first; the buffer-relative page offsets recorded in production order are rewritten to the dictionary-first layout there. The column-at-a-time SerializedFileWriter path is unchanged (defaults to false). Also fix memory_size() accounting: instead of counting bytes written (which over-reports once pages are spilled off-heap), ask the page writer how much it actually holds resident via PageWriter::buffered_memory_size() and PageStore::memory_size(). For the in-memory store this is unchanged; for a spilling store it drops to ~0 plus the retained dictionary page. Result: the dict-column case drops from ~2.69 MiB to ~0.48 MiB peak heap with a spilling backend. Adds an offset-index-disabled dictionary round-trip test and store memory-size unit tests; extends the dhat test with the dictionary-column scenario. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The test uses parquet::arrow, so without a required-features entry it was auto-discovered and compiled under --all-targets --no-default-features, breaking that CI compilation check. Mirror the other arrow integration tests with required-features = ["arrow"]. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the experimental, single-maintainer dhat crate with the in-tree thread-local tracking-allocator pattern already used by parquet/benches/arrow_reader_peak_memory.rs, and fold the test into the existing arrow_writer test binary instead of a dedicated one (saving a compile/link). The measurement still observes real process peak heap and the assertions are unchanged. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Adds `parquet/examples/spill_page_store.rs`, a runnable demonstration of the pluggable `PageStore` API. It implements a spilling `TempFilePageStore` (one temp file per column chunk) and writes a wide, skewed Parquet file — a few Int64 columns, some small (~20 byte) string columns, and a configurable number of large (~8 KiB) string columns — into a single row group, reporting peak `ArrowWriter::memory_size()` with and without spilling. --large-string-columns N number of fat ~8 KiB string columns (default 10) --spill use the spilling TempFilePageStore vs the default On the default 10 fat columns the spilling store cuts peak writer memory from ~161 MiB (whole row group buffered) to ~21 MiB (in-flight encoder buffers only), and produces a byte-identical file to the in-memory path. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The spill_page_store example now threads a shared SpillStats (atomics) through the TempFilePageStoreFactory into each per-column store, and prints the number of temp files created (one per column chunk) plus total bytes spilled — written on `put` and read back on `take`. Also bumps the default `--batch-size` to 8192. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Switches the example's spilling backend from anonymous tempfile() handles to NamedTempFile so each per-column temp file has a reportable path, records (column index, path, bytes) for every file when its store is dropped, and prints one line per spill file alongside the existing totals. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…mn type Rewrites the spill_page_store example to use the low-level ArrowColumnWriter API so columns are encoded on multiple threads (one worker thread per column, the documented parallel pattern). Each worker owns its own ArrowColumnWriter backed by its own PageStore, receives ArrowLeafColumns over a bounded channel, tracks its writer's peak resident bytes, and returns the finished ArrowColumnChunk; the main thread splices each chunk into the row group, streaming pages back out of the store. The spilling benefit is preserved (append_to_row_group reads one page at a time from the store), and the output stays byte-identical to the in-memory path. Also adds the column data type to the per-spill-file listing. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Adds a wall-clock timer (std::time::Instant) over the whole run and reports "Total elapsed time" in the final summary. Also includes the faster data generation (a single batch is built once and reused across iterations). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Restructures the example to keep all cores busy: N/2 generator threads build record batches and N/2 encoder threads encode them (N = available cores). The main thread re-orders generated batches by index — keeping the output file deterministic regardless of thread timing — and broadcasts each to the encoders, which each own a disjoint subset of columns (and their PageStores). Finished chunks are sorted into schema order and spliced in. Also removes the XorShift RNG in favor of deterministic counter-derived values, so a run is fully reproducible (verified byte-identical between the in-memory and spilling backends, and run-to-run). Channel depths are kept shallow so in-flight input batches don't dominate RSS on wide/huge schemas. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Contributor
Author
|
I actually think the example in the docs in the following PR is adequate and there is no need for a separate PR |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is this?
A draft PR that bundles #10020 ("Pluggable page spilling API for the Parquet ArrowWriter") plus a runnable example that exercises its new public
PageStoreAPI from the outside, as an end user would. It is intended to help review #10020 — see that PR for the API itself.This branch contains #10020's commits and adds one file on top,
parquet/examples/spill_page_store.rs.The example
Writes a wide, skewed Parquet file into a single row group:
Int64columns (--int-columns, default 3)--small-string-columns, default 5)--large-string-columns, default 10)and reports peak
ArrowWriter::memory_size()with and without a spilling page store.The spilling backend is a ~30-line
TempFilePageStore(one unlinked temp file per column chunk):putappends a page blob and returns an opaquePageKey,takeseeks and reads it back, andmemory_size()keeps its default of0because the bytes now live in the file, not on the heap.Flags
--large-string-columns N— number of fat ~8 KiB string columns (default 10)--spill— use the spillingTempFilePageStoreinstead of the default in-memory buffering--small-string-columns,--int-columns,--rows,--batch-size,--output <path>Running
What it shows
On the defaults (10 fat columns, ~160 MiB row group):
ArrowWriter::memory_size()TempFilePageStore(--spill)i.e. spilling bounds peak writer memory by the in-flight encoder buffers rather than the row group size. Widen the skew with
--large-string-columns 30to make the gap bigger. Writing the same data to a real file with--outputproduces a byte-identical Parquet file in both modes, confirming the spilling path is a transparent drop-in.Notes for review
tempfileandsysinfoare alreadydev-dependenciesof theparquetcrate, so the example needs no new deps; it is gated onrequired-features = ["arrow", "cli"].TempFilePageStorehere is intentionally the same shape as the one inparquet/tests/arrow_writer.rs, so the example and the test corroborate each other.🤖 Generated with Claude Code