diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 79542caed9b7..6db914733927 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -21,7 +21,6 @@ use crate::column::chunker::ContentDefinedChunker; use bytes::Bytes; use std::io::{Read, Write}; -use std::iter::Peekable; use std::slice::Iter; use std::sync::{Arc, Mutex}; use std::vec::IntoIter; @@ -37,6 +36,7 @@ use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_pr use crate::arrow::ArrowSchemaConverter; use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder; +use crate::basic::PageType; use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter}; use crate::column::page_encryption::PageEncryptor; use crate::column::writer::encoder::ColumnValueEncoder; @@ -49,7 +49,6 @@ use crate::encryption::encrypt::FileEncryptor; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData}; use crate::file::properties::{WriterProperties, WriterPropertiesPtr}; -use crate::file::reader::{ChunkReader, Length}; use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift}; use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor}; @@ -58,6 +57,12 @@ use levels::{ArrayLevels, calculate_array_levels}; mod byte_array; mod levels; +#[doc(inline)] +pub use crate::column::page_store::{ + InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreArgs, + PageStoreFactory, +}; + /// Encodes [`RecordBatch`] to parquet /// /// Writes Arrow `RecordBatch`es to a Parquet writer. 
Multiple [`RecordBatch`] will be encoded @@ -263,8 +268,12 @@ impl ArrowWriter { let file_writer = SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?; - let row_group_writer_factory = + let mut row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone()); + if let Some(page_store_factory) = options.page_store_factory { + row_group_writer_factory = + row_group_writer_factory.with_page_store_factory(page_store_factory); + } let cdc_chunkers = props_ptr .content_defined_chunking() @@ -556,6 +565,7 @@ pub struct ArrowWriterOptions { skip_arrow_metadata: bool, schema_root: Option, schema_descr: Option, + page_store_factory: Option>, } impl ArrowWriterOptions { @@ -569,6 +579,90 @@ impl ArrowWriterOptions { Self { properties, ..self } } + /// Sets the [`PageStoreFactory`] used to buffer completed pages while a row + /// group is being written. + /// + /// By default (an [`InMemoryPageStore`] per column chunk) completed pages + /// are buffered on the heap until the row group is flushed, so peak memory + /// grows with the row group size. Supplying a factory that spills to a temp + /// file or object storage instead bounds peak write memory, decoupling it + /// from the row group size while keeping large, read-optimal row groups. + /// + /// # Example: a custom [`PageStore`] + /// + /// A store only has to map an opaque, store-allocated [`PageKey`] to a blob + /// and hand the blob back once. The keys need not be dense or sequential — + /// here a `HashMap`-backed store mints sparse handles, proving the writer + /// relies only on the opaque-handle contract. A real spilling backend would + /// write the bytes to a temp file in `put` and read them back in `take`. 
+ /// + /// ``` + /// # use std::collections::HashMap; + /// # use std::sync::Arc; + /// # use bytes::Bytes; + /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch}; + /// # use parquet::arrow::arrow_writer::{ + /// # ArrowWriter, ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory, + /// # }; + /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader; + /// # use parquet::errors::{ParquetError, Result}; + /// #[derive(Default)] + /// struct MapPageStore { + /// blobs: HashMap, + /// next: u64, + /// } + /// + /// impl PageStore for MapPageStore { + /// fn put(&mut self, value: Bytes) -> Result { + /// // Mint a sparse handle (every other integer) to show the writer + /// // never assumes anything about the key's value. + /// let key = PageKey::new(self.next); + /// self.next += 2; + /// self.blobs.insert(key.get(), value); + /// Ok(key) + /// } + /// + /// fn take(&mut self, key: PageKey) -> Result { + /// self.blobs + /// .remove(&key.get()) + /// .ok_or_else(|| ParquetError::General(format!("invalid key {}", key.get()))) + /// } + /// } + /// + /// #[derive(Debug)] + /// struct MapPageStoreFactory; + /// + /// impl PageStoreFactory for MapPageStoreFactory { + /// fn create(&self, args: &PageStoreArgs<'_>) -> Result> { + /// // `args` exposes the column index and descriptor (physical/logical + /// // type, path), so a real backend could spill only large columns. 
+ /// let _ = (args.column_index(), args.column_descriptor()); + /// Ok(Box::new(MapPageStore::default())) + /// } + /// } + /// + /// let col = Arc::new(Int64Array::from_iter_values(0..1000)) as ArrayRef; + /// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); + /// + /// let options = + /// ArrowWriterOptions::new().with_page_store_factory(Arc::new(MapPageStoreFactory)); + /// let mut buffer = Vec::new(); + /// let mut writer = + /// ArrowWriter::try_new_with_options(&mut buffer, to_write.schema(), options).unwrap(); + /// writer.write(&to_write).unwrap(); + /// writer.close().unwrap(); + /// + /// // The file is byte-identical to one written with the default store. + /// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap(); + /// assert_eq!(to_write, reader.next().unwrap().unwrap()); + /// ``` + pub fn with_page_store_factory(self, page_store_factory: Arc) -> Self { + Self { + page_store_factory: Some(page_store_factory), + ..self + } + } + /// Skip encoding the embedded arrow metadata (defaults to `false`) /// /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema @@ -603,52 +697,108 @@ impl ArrowWriterOptions { } } -/// A single column chunk produced by [`ArrowColumnWriter`] -#[derive(Default)] +/// A single column chunk produced by [`ArrowColumnWriter`]. +/// +/// Holds the serialized page blobs (each page's header ‖ compressed data, in +/// write order) in a [`PageStore`], plus the handles needed to read them back, +/// in order, when the chunk is spliced into the output file. struct ArrowColumnChunkData { length: usize, - data: Vec, + store: Box, + keys: Vec, + /// The dictionary page's serialized blobs (header ‖ data), held in memory + /// rather than the store. 
+ /// + /// A dictionary page is produced at most once and bounded by + /// `dict_page_size_limit`, but it must be written *first* in the chunk even + /// though the data pages reach the writer before it (see + /// [`PageWriter::defers_dictionary_ordering`]). Spilling it would only + /// round-trip ~1 page to the backend and straight back, so it is kept here + /// and emitted ahead of the data pages at splice. Empty for non-dictionary + /// columns. + dictionary: Vec, } -impl Length for ArrowColumnChunkData { - fn len(&self) -> u64 { - self.length as _ +impl ArrowColumnChunkData { + fn new(store: Box) -> Self { + Self { + length: 0, + store, + keys: Vec::new(), + dictionary: Vec::new(), + } } -} -impl ChunkReader for ArrowColumnChunkData { - type T = ArrowColumnChunkReader; + /// Append a data-page blob to the store, recording its handle in write + /// order. + fn push(&mut self, value: Bytes) -> Result<()> { + let key = self.store.put(value)?; + self.keys.push(key); + Ok(()) + } - fn get_read(&self, start: u64) -> Result { - assert_eq!(start, 0); // Assume append_column writes all data in one-shot - Ok(ArrowColumnChunkReader( - self.data.clone().into_iter().peekable(), - )) + /// Retain a dictionary-page blob in memory (emitted first at splice). + fn push_dictionary(&mut self, value: Bytes) { + self.dictionary.push(value); } - fn get_bytes(&self, _start: u64, _length: usize) -> Result { - unimplemented!() + /// Total serialized size of the in-memory dictionary page, in bytes. + fn dictionary_len(&self) -> usize { + self.dictionary.iter().map(Bytes::len).sum() } + + /// Bytes this chunk currently holds on the heap: whatever the store keeps + /// resident (zero for a spilling backend) plus the in-memory dictionary + /// page. 
+ fn memory_size(&self) -> usize { + self.store.memory_size() + self.dictionary_len() + } +} + +/// A streaming [`Read`] over one column chunk's buffered pages, in final file +/// order: the in-memory dictionary page (if any) first, then the data pages. +/// +/// Each data-page blob is taken back out of the [`PageStore`] *as it is +/// consumed* and released immediately afterwards, so splicing a chunk into the +/// output file never materializes more than a single page in memory at a time. +/// This is what keeps the splice phase within the memory bound for a spilling +/// backend (an in-memory store already holds the bytes, so it is unaffected). +struct StreamingColumnChunkReader { + /// Dictionary-page blobs, emitted before any data page. + dictionary: IntoIter, + store: Box, + keys: IntoIter, + /// The blob currently being drained into the output; emptied as it is read. + current: Bytes, } -/// A [`Read`] for [`ArrowColumnChunkData`] -struct ArrowColumnChunkReader(Peekable>); +impl StreamingColumnChunkReader { + fn new(data: ArrowColumnChunkData) -> Self { + Self { + dictionary: data.dictionary.into_iter(), + store: data.store, + keys: data.keys.into_iter(), + current: Bytes::new(), + } + } +} -impl Read for ArrowColumnChunkReader { +impl Read for StreamingColumnChunkReader { fn read(&mut self, out: &mut [u8]) -> std::io::Result { - let buffer = loop { - match self.0.peek_mut() { - Some(b) if b.is_empty() => { - self.0.next(); - continue; - } - Some(b) => break b, - None => return Ok(0), + // Refill from the next blob whenever the current one is drained: the + // dictionary page first, then each data page from the store. 
+ while self.current.is_empty() { + if let Some(blob) = self.dictionary.next() { + self.current = blob; + } else if let Some(key) = self.keys.next() { + self.current = self.store.take(key).map_err(std::io::Error::other)?; + } else { + return Ok(0); } - }; + } - let len = buffer.len().min(out.len()); - let b = buffer.split_to(len); + let len = self.current.len().min(out.len()); + let b = self.current.split_to(len); out[..len].copy_from_slice(&b); Ok(len) } @@ -660,7 +810,6 @@ impl Read for ArrowColumnChunkReader { /// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows type SharedColumnChunk = Arc>; -#[derive(Default)] struct ArrowPageWriter { buffer: SharedColumnChunk, #[cfg(feature = "encryption")] @@ -668,6 +817,15 @@ struct ArrowPageWriter { } impl ArrowPageWriter { + /// Create a page writer that buffers completed pages in `store`. + fn new(store: Box) -> Self { + Self { + buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))), + #[cfg(feature = "encryption")] + page_encryptor: None, + } + } + #[cfg(feature = "encryption")] pub fn with_encryptor(mut self, page_encryptor: Option) -> Self { self.page_encryptor = page_encryptor; @@ -726,12 +884,33 @@ impl PageWriter for ArrowPageWriter { spec.bytes_written = compressed_size as u64; buf.length += compressed_size; - buf.data.push(header); - buf.data.push(data); + if spec.page_type == PageType::DICTIONARY_PAGE { + // Held in memory and emitted first at splice — see + // `ArrowColumnChunkData::dictionary`. + buf.push_dictionary(header); + buf.push_dictionary(data); + } else { + buf.push(header)?; + buf.push(data)?; + } Ok(spec) } + fn defers_dictionary_ordering(&self) -> bool { + // The Arrow chunk is buffered in full and spliced at row-group flush, so + // data pages may be accepted before the dictionary page and reordered + // then. This lets `GenericColumnWriter` stream dictionary-column data + // pages straight through instead of buffering them in memory. 
+ true + } + + fn buffered_memory_size(&self) -> usize { + // Only what is actually resident: a spilling store reports ~0 here even + // though the chunk's bytes have all passed through it. + self.buffer.try_lock().unwrap().memory_size() + } + fn close(&mut self) -> Result<()> { Ok(()) } @@ -785,12 +964,21 @@ impl ArrowColumnChunk { &mut self.close } - /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data + /// Splices this column's buffered pages into the row group, streaming them + /// back out of the [`PageStore`] one page at a time. pub fn append_to_row_group( self, writer: &mut SerializedRowGroupWriter<'_, W>, ) -> Result<()> { - writer.append_column(&self.data, self.close) + let ArrowColumnChunk { data, close } = self; + + // The dictionary page is produced *after* the data pages on this path (so + // they can stream straight through) but must be written *first*, so move + // it ahead of the data pages in the recorded offsets before the splice. + let close = close.update_dictionary_location(data.dictionary_len())?; + + let reader = StreamingColumnChunkReader::new(data); + writer.append_column_from_read(reader, close) } } @@ -1082,6 +1270,7 @@ pub struct ArrowRowGroupWriterFactory { schema: SchemaDescPtr, arrow_schema: SchemaRef, props: WriterPropertiesPtr, + page_store_factory: Arc, #[cfg(feature = "encryption")] file_encryptor: Option>, } @@ -1098,11 +1287,23 @@ impl ArrowRowGroupWriterFactory { schema, arrow_schema, props, + page_store_factory: Arc::new(InMemoryPageStoreFactory), #[cfg(feature = "encryption")] file_encryptor: file_writer.file_encryptor(), } } + /// Set the [`PageStoreFactory`] used to allocate the buffer for each column + /// chunk, e.g. to spill completed pages to a temp file or object storage + /// instead of the heap. Defaults to [`InMemoryPageStoreFactory`]. 
+ pub fn with_page_store_factory( + mut self, + page_store_factory: Arc, + ) -> Self { + self.page_store_factory = page_store_factory; + self + } + fn create_row_group_writer(&self, row_group_index: usize) -> Result { let writers = self.create_column_writers(row_group_index)?; Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema)) @@ -1127,12 +1328,13 @@ impl ArrowRowGroupWriterFactory { #[cfg(feature = "encryption")] fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory { ArrowColumnWriterFactory::new() + .with_page_store_factory(self.page_store_factory.clone()) .with_file_encryptor(row_group_idx, self.file_encryptor.clone()) } #[cfg(not(feature = "encryption"))] fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory { - ArrowColumnWriterFactory::new() + ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone()) } } @@ -1159,6 +1361,8 @@ pub fn get_column_writers( /// Creates [`ArrowColumnWriter`] instances struct ArrowColumnWriterFactory { + /// Allocates the per-column-chunk [`PageStore`] backing each page writer. + page_store_factory: Arc, #[cfg(feature = "encryption")] row_group_index: usize, #[cfg(feature = "encryption")] @@ -1168,6 +1372,7 @@ struct ArrowColumnWriterFactory { impl ArrowColumnWriterFactory { pub fn new() -> Self { Self { + page_store_factory: Arc::new(InMemoryPageStoreFactory), #[cfg(feature = "encryption")] row_group_index: 0, #[cfg(feature = "encryption")] @@ -1175,6 +1380,15 @@ impl ArrowColumnWriterFactory { } } + /// Use `page_store_factory` to allocate the buffer for each column chunk. 
+ pub fn with_page_store_factory( + mut self, + page_store_factory: Arc, + ) -> Self { + self.page_store_factory = page_store_factory; + self + } + #[cfg(feature = "encryption")] pub fn with_file_encryptor( mut self, @@ -1199,18 +1413,22 @@ impl ArrowColumnWriterFactory { column_index, &column_path, )?; + let args = PageStoreArgs::new(column_index, column_descriptor); + let store = self.page_store_factory.create(&args)?; Ok(Box::new( - ArrowPageWriter::default().with_encryptor(page_encryptor), + ArrowPageWriter::new(store).with_encryptor(page_encryptor), )) } #[cfg(not(feature = "encryption"))] fn create_page_writer( &self, - _column_descriptor: &ColumnDescPtr, - _column_index: usize, + column_descriptor: &ColumnDescPtr, + column_index: usize, ) -> Result> { - Ok(Box::::default()) + let args = PageStoreArgs::new(column_index, column_descriptor); + let store = self.page_store_factory.create(&args)?; + Ok(Box::new(ArrowPageWriter::new(store))) } /// Gets an [`ArrowColumnWriter`] for the given `data_type`, appending the @@ -1738,6 +1956,141 @@ mod tests { statistics::Statistics, }; + /// A [`PageStore`] that allocates *sparse, non-contiguous* handles and keeps + /// blobs in a `HashMap` — nothing like the default `Vec`. Used to + /// prove the writer relies only on the opaque-handle contract and never on + /// handles being dense `Vec` indices. Records how many blobs were stored. + #[derive(Debug, Default)] + struct RecordingPageStore { + next: u64, + blobs: HashMap, + puts: Arc, + } + + impl PageStore for RecordingPageStore { + fn put(&mut self, value: Bytes) -> Result { + // Deliberately non-sequential, never-zero handles. 
+ let id = 100 + self.next * 7; + self.next += 1; + self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.blobs.insert(id, value); + Ok(PageKey::new(id)) + } + + fn take(&mut self, key: PageKey) -> Result { + self.blobs + .remove(&key.get()) + .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get()))) + } + } + + #[derive(Debug)] + struct RecordingPageStoreFactory { + puts: Arc, + } + + impl PageStoreFactory for RecordingPageStoreFactory { + fn create(&self, _args: &PageStoreArgs<'_>) -> Result> { + Ok(Box::new(RecordingPageStore { + puts: self.puts.clone(), + ..Default::default() + })) + } + } + + /// A custom [`PageStore`] must produce byte-identical files to the in-memory + /// default, across dictionary and non-dictionary columns and multiple row + /// groups (so multiple store instances are exercised). + #[test] + fn custom_page_store_is_byte_identical_to_default() { + let schema = Arc::new(Schema::new(vec![ + Field::new("i", DataType::Int32, true), + // A low-cardinality string column to exercise the dictionary path. + Field::new("s", DataType::Utf8, true), + ])); + let i = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5), Some(6)]); + let s = StringArray::from(vec![ + Some("a"), + Some("bb"), + Some("a"), + None, + Some("bb"), + Some("ccc"), + ]); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(i), Arc::new(s)]).unwrap(); + + // Small row groups so multiple column chunks (hence multiple store + // instances) are produced. 
+ let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(3)) + .build(); + + let write = |factory: Option>| { + let mut buffer = Vec::new(); + let mut opts = ArrowWriterOptions::new().with_properties(props.clone()); + if let Some(factory) = factory { + opts = opts.with_page_store_factory(factory); + } + let mut writer = + ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + buffer + }; + + let default_bytes = write(None); + + let puts = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let custom_bytes = write(Some(Arc::new(RecordingPageStoreFactory { + puts: puts.clone(), + }))); + + assert!( + puts.load(std::sync::atomic::Ordering::Relaxed) > 0, + "custom PageStore was never written to" + ); + assert_eq!( + default_bytes, custom_bytes, + "a custom PageStore must produce byte-identical output to the default" + ); + } + + /// A dictionary-encoded column written through the deferred-ordering Arrow + /// path must round-trip correctly even with the offset index disabled, when + /// only the chunk-level dictionary/data page offsets are rewritten (there is + /// no offset index to rebuild). Spans multiple data pages so the + /// dictionary-first reordering is exercised. + #[test] + fn dictionary_column_round_trips_with_offset_index_disabled() { + let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)])); + + // Low cardinality so the column stays dictionary-encoded; enough rows to + // span several data pages within a single row group. 
+ let values: Vec> = (0..50_000).map(|i| Some(i % 8)).collect(); + let array = Int32Array::from(values.clone()); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap(); + + let props = WriterProperties::builder() + .set_offset_index_disabled(true) + .set_data_page_row_count_limit(4096) + .build(); + let opts = ArrowWriterOptions::new().with_properties(props); + + let mut buffer = Vec::new(); + let mut writer = + ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), values.len()).unwrap(); + let read: Vec = reader.collect::>().unwrap(); + let read_values: Vec> = read + .iter() + .flat_map(|b| b.column(0).as_primitive::().iter()) + .collect(); + assert_eq!(read_values, values); + } + #[test] fn arrow_writer() { // define schema diff --git a/parquet/src/column/mod.rs b/parquet/src/column/mod.rs index 115c8dd01b80..9c7e77d29cba 100644 --- a/parquet/src/column/mod.rs +++ b/parquet/src/column/mod.rs @@ -125,5 +125,6 @@ pub(crate) mod page_encryption; #[cfg(not(feature = "encryption"))] #[path = "page_encryption_disabled.rs"] pub(crate) mod page_encryption; +pub mod page_store; pub mod reader; pub mod writer; diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index 4cfc07a02883..ed80e279a03a 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -197,6 +197,15 @@ impl CompressedPage { self.compressed_page.buffer() } + /// Returns the number of heap bytes this page currently holds. + /// + /// This is the page's compressed buffer (the embedded [`Bytes`]); use it to + /// account for a buffered page's memory footprint rather than reaching for + /// `data().len()` at each call site. 
+ pub fn memory_usage(&self) -> usize { + self.compressed_page.buffer().len() + } + /// Returns the thrift page header pub(crate) fn to_thrift_header(&self) -> Result { let uncompressed_size = self.uncompressed_size(); @@ -430,6 +439,55 @@ pub trait PageWriter: Send { /// either data page or dictionary page. fn write_page(&mut self, page: CompressedPage) -> Result; + /// **Unstable, not public API.** This is an internal protocol between + /// [`GenericColumnWriter`] and its in-crate page writers; it is hidden from + /// the rendered docs and may change or be removed without a major version + /// bump. External `PageWriter` implementations should not override it. See + /// the page-spilling cleanup tracked in + /// . + /// + /// Whether this writer resolves the final page layout itself (at flush) + /// rather than committing bytes to their final position as pages arrive. + /// + /// The dictionary page of a column chunk must be written *first*, but it is + /// not finalized until every value has been seen. A writer that commits + /// bytes live (e.g. straight to a file) therefore relies on the column + /// writer buffering the dictionary-encoded data pages in memory until the + /// dictionary page is ready — see [`GenericColumnWriter`]'s `data_pages`. + /// + /// A writer that instead buffers the whole chunk and splices it later (the + /// [`ArrowWriter`] path) can accept data pages *before* the dictionary page + /// and order them itself at flush. Returning `true` tells the column writer + /// to skip that in-memory buffering and stream dictionary-column data pages + /// straight through, bounding the column writer's memory. 
+ /// + /// [`GenericColumnWriter`]: crate::column::writer::GenericColumnWriter + /// [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter + #[doc(hidden)] + fn defers_dictionary_ordering(&self) -> bool { + false + } + + /// **Unstable, not public API.** Companion to + /// [`defers_dictionary_ordering`](Self::defers_dictionary_ordering): an + /// internal hook for the column writer's memory accounting, hidden from the + /// rendered docs and subject to change or removal without a major version + /// bump. External `PageWriter` implementations should not override it. + /// + /// The number of bytes this writer is currently holding **in memory** for + /// pages it has been handed (i.e. completed pages not yet committed to their + /// final destination). + /// + /// Used by the column writer to report its memory footprint. The default is + /// `0`: a writer that streams pages straight to their destination retains + /// nothing. A writer that buffers pages should report what it actually holds + /// on the heap — which, when it spills to a backing store, can be far less + /// than the bytes written. + #[doc(hidden)] + fn buffered_memory_size(&self) -> usize { + 0 + } + /// Closes resources and flushes underlying sink. /// Page writer should not be used after this method is called. fn close(&mut self) -> Result<()>; diff --git a/parquet/src/column/page_store.rs b/parquet/src/column/page_store.rs new file mode 100644 index 000000000000..4f821c0e5cad --- /dev/null +++ b/parquet/src/column/page_store.rs @@ -0,0 +1,257 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Pluggable storage for completed, serialized page blobs. +//! +//! While a row group is being written the [`ArrowWriter`] must buffer every +//! column's encoded pages, because Parquet requires each column chunk to be +//! contiguous in the file while record batches arrive with all columns interleaved. +//! By default that buffer lives on the heap, so the writer's peak memory grows +//! with the row group size. A [`PageStore`] lets the buffer live somewhere else +//! — a local temp file, object storage, etc. — bounding peak write memory +//! independently of the row group size. +//! +//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter + +use std::fmt::Debug; + +use bytes::Bytes; + +use crate::errors::{ParquetError, Result}; +use crate::schema::types::ColumnDescriptor; + +/// An opaque, store-allocated handle to a blob held by a [`PageStore`]. +/// +/// Handles are allocated by the store and are only meaningful to the store that +/// produced them; they need not be dense or sequential. The caller treats them +/// as opaque tokens. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct PageKey(u64); + +impl PageKey { + /// Create a handle wrapping `raw`. + /// + /// A [`PageStore`] implementation calls this to mint the handle it returns + /// from [`put`](PageStore::put). The value is opaque to the caller, so a + /// store is free to use a dense counter, a packed locator, or anything else + /// it can later resolve in [`take`](PageStore::take). 
+ pub const fn new(raw: u64) -> Self { + Self(raw) + } + + /// The raw value passed to [`new`](Self::new). + pub const fn get(self) -> u64 { + self.0 + } +} + +/// A pluggable store for completed, serialized page blobs. +/// +/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a +/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or +/// offsets. The caller keeps the handles it gets back from [`put`](Self::put) +/// and decides what they mean. +/// +/// Each store instance is owned by a single column writer and mutated by one +/// thread at a time (both methods take `&mut self`), so it needs no internal +/// synchronization — hence only `Send`, not `Sync`. +/// +/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a +/// different backend via +/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory). +pub trait PageStore: Send { + /// Store `value`, returning a handle that can later be passed to + /// [`take`](Self::take). + fn put(&mut self, value: Bytes) -> Result; + + /// Take back the blob previously stored under `key`. + /// + /// The caller takes ownership of the returned bytes and will **not** request + /// `key` again, so the store may release any resources backing it — eagerly + /// here, or when the store is dropped. + fn take(&mut self, key: PageKey) -> Result; + + /// The number of bytes this store currently holds **in memory** (resident + /// on the heap), used to report the writer's memory footprint. + /// + /// The default is `0`, which is exactly right for a backend that moves + /// every blob off-heap (a temp file, object storage): the bytes it has been + /// handed no longer occupy heap. The in-memory backend overrides this to + /// report its resident blobs. A backend that keeps a partial in-memory + /// buffer should report that buffer's size. 
+ fn memory_size(&self) -> usize { + 0 + } +} + +/// Context for a single [`PageStoreFactory::create`] call. +/// +/// Describes the leaf column chunk the store will buffer. It is held by +/// reference for the duration of the call; a backend reads only what it needs. +/// More fields may be added in future releases without breaking existing +/// implementations — the type is constructed only by the writer, so an +/// implementer only ever receives one and calls its accessors. +pub struct PageStoreArgs<'a> { + column_index: usize, + column_descriptor: &'a ColumnDescriptor, +} + +impl<'a> PageStoreArgs<'a> { + // Constructed only by the Arrow writer; without that feature there is no caller. + #[cfg(feature = "arrow")] + pub(crate) fn new(column_index: usize, column_descriptor: &'a ColumnDescriptor) -> Self { + Self { + column_index, + column_descriptor, + } + } + + /// Index of the leaf column within the row group. + /// + /// A backend may use this to e.g. name spill files or shard across a bounded + /// pool; it carries no ordering or coordination requirement. + pub fn column_index(&self) -> usize { + self.column_index + } + + /// Descriptor for the leaf column: physical/logical type, path, and max + /// definition/repetition levels. + /// + /// Lets a backend tailor buffering to the column — for example spilling only + /// large `BYTE_ARRAY` columns while keeping small fixed-width ones on the + /// heap. + pub fn column_descriptor(&self) -> &ColumnDescriptor { + self.column_descriptor + } +} + +/// Creates a fresh [`PageStore`] for each column chunk. +/// +/// See +/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory). +pub trait PageStoreFactory: Send + Sync + Debug { + /// Create a new, empty [`PageStore`] for the leaf column described by `args`. + fn create(&self, args: &PageStoreArgs<'_>) -> Result>; +} + +/// The default [`PageStore`], holding blobs on the heap in a `Vec`. 
+/// +/// Peak memory grows with the row group size; use a spilling backend to bound +/// it. +#[derive(Debug, Default)] +pub struct InMemoryPageStore { + blobs: Vec, + /// Running total of resident blob bytes, kept in step with `put`/`take`. + resident: usize, +} + +impl PageStore for InMemoryPageStore { + fn put(&mut self, value: Bytes) -> Result { + let key = PageKey(self.blobs.len() as u64); + self.resident += value.len(); + self.blobs.push(value); + Ok(key) + } + + fn take(&mut self, key: PageKey) -> Result { + // Replace the slot with an empty `Bytes` so the stored blob is released + // as soon as it is taken, keeping memory bounded while the chunk is + // streamed into the output file. + let blob = self + .blobs + .get_mut(key.0 as usize) + .map(std::mem::take) + .ok_or_else(|| ParquetError::General(format!("invalid page key {}", key.0)))?; + self.resident -= blob.len(); + Ok(blob) + } + + fn memory_size(&self) -> usize { + self.resident + } +} + +/// Factory for [`InMemoryPageStore`] — the default used by +/// [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter). 
+#[derive(Debug, Default)] +pub struct InMemoryPageStoreFactory; + +impl PageStoreFactory for InMemoryPageStoreFactory { + fn create(&self, _args: &PageStoreArgs<'_>) -> Result> { + Ok(Box::new(InMemoryPageStore::default())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn in_memory_round_trips_blobs_in_handle_order() { + let mut store = InMemoryPageStore::default(); + let k0 = store.put(Bytes::from_static(b"hello")).unwrap(); + let k1 = store.put(Bytes::from_static(b"world")).unwrap(); + assert_ne!(k0, k1); + assert_eq!(&store.take(k0).unwrap()[..], b"hello"); + assert_eq!(&store.take(k1).unwrap()[..], b"world"); + } + + #[test] + fn in_memory_take_releases_the_slot() { + let mut store = InMemoryPageStore::default(); + let k = store.put(Bytes::from_static(b"abc")).unwrap(); + assert_eq!(&store.take(k).unwrap()[..], b"abc"); + // A second take yields the emptied placeholder rather than the blob, + // confirming the bytes were released on the first take. + assert!(store.take(k).unwrap().is_empty()); + } + + #[test] + fn in_memory_invalid_key_errors() { + let mut store = InMemoryPageStore::default(); + assert!(store.take(PageKey(99)).is_err()); + } + + #[test] + fn in_memory_reports_resident_bytes() { + let mut store = InMemoryPageStore::default(); + assert_eq!(store.memory_size(), 0); + let k0 = store.put(Bytes::from_static(b"hello")).unwrap(); + let k1 = store.put(Bytes::from_static(b"!")).unwrap(); + assert_eq!(store.memory_size(), 6); + store.take(k0).unwrap(); + assert_eq!(store.memory_size(), 1); + store.take(k1).unwrap(); + assert_eq!(store.memory_size(), 0); + } + + #[test] + fn default_store_memory_size_is_zero() { + // A spilling backend that does not override `memory_size` reports 0, + // reflecting that its blobs no longer occupy the heap. 
+ struct OffHeap; + impl PageStore for OffHeap { + fn put(&mut self, _value: Bytes) -> Result { + Ok(PageKey::new(0)) + } + fn take(&mut self, _key: PageKey) -> Result { + Ok(Bytes::new()) + } + } + assert_eq!(OffHeap.memory_size(), 0); + } +} diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 4e53230bbf89..bd9bd2a587c6 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -206,6 +206,37 @@ pub struct ColumnCloseResult { pub offset_index: Option, } +impl ColumnCloseResult { + /// Rewrite the page offsets for a dictionary-first on-disk layout. + /// + /// A writer that buffers the whole column chunk and splices it later (the + /// Arrow path) may accept the data pages *before* the dictionary page so the + /// data pages can stream straight through, then emit the dictionary page + /// first at splice. The offsets recorded during encoding therefore assume a + /// data-pages-first layout; call this with the serialized length of the + /// dictionary page to move it to offset 0 and shift every data page after + /// it. A `dictionary_len` of 0 (no dictionary page) leaves the result + /// unchanged. + pub fn update_dictionary_location(mut self, dictionary_len: usize) -> Result { + if dictionary_len > 0 { + self.metadata = self + .metadata + .into_builder() + .set_dictionary_page_offset(Some(0)) + .set_data_page_offset(dictionary_len as i64) + .build()?; + if let Some(offset_index) = self.offset_index.as_mut() { + let mut offset = dictionary_len as i64; + for location in offset_index.page_locations.iter_mut() { + location.offset = offset; + offset += location.compressed_page_size as i64; + } + } + } + Ok(self) + } +} + // Metrics per page #[derive(Default)] struct PageMetrics { @@ -632,7 +663,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// of the current memory usage and not the final anticipated encoded size. 
#[cfg(feature = "arrow")] pub(crate) fn memory_size(&self) -> usize { - self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size() + // In-flight encoder buffers, plus any completed pages still held on the + // heap: the dictionary-column data pages buffered here (column-at-a-time + // path), plus whatever the page writer keeps resident. A page writer + // that spills completed pages off-heap reports far less than the bytes + // it was handed, so this tracks real memory rather than bytes written. + self.encoder.estimated_memory_size() + + self + .data_pages + .iter() + .map(|page| page.memory_usage()) + .sum::() + + self.page_writer.buffered_memory_size() } /// Returns total number of bytes written by this column writer so far. @@ -1271,7 +1313,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { }; // Check if we need to buffer data page or flush it to the sink directly. - if self.encoder.has_dictionary() { + // + // For dictionary-encoded columns the dictionary page must be written + // first, but it is not final until all values are seen, so completed + // data pages are normally buffered here until `close`. A page writer + // that defers final layout (the Arrow path) instead orders pages itself + // at flush, so we stream the data pages straight through and never let + // them accumulate in memory. 
+ if self.encoder.has_dictionary() && !self.page_writer.defers_dictionary_ordering() { self.data_pages.push_back(compressed_page); } else { self.write_data_page(compressed_page)?; diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 942013ea6238..8ec16ba36739 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -684,6 +684,30 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { pub fn append_column( &mut self, reader: &R, + close: ColumnCloseResult, + ) -> Result<()> { + // Position a reader at the start of the buffered chunk, then splice the + // bytes through the shared streaming path. + let metadata = &close.metadata; + let src_offset = metadata + .dictionary_page_offset() + .unwrap_or_else(|| metadata.data_page_offset()); + let read = reader.get_read(src_offset as _)?; + self.append_column_from_read(read, close) + } + + /// Splice an already-encoded column chunk into the row group, reading its + /// bytes sequentially from `read`. + /// + /// `read` must be positioned at the start of the chunk (the dictionary page + /// if present, otherwise the first data page — i.e. `src_offset` below) and + /// yield exactly the chunk's compressed bytes. Unlike [`Self::append_column`] + /// this consumes an owned [`Read`], which lets the caller stream the bytes + /// back from a [`PageStore`](crate::column::page_store::PageStore) one page + /// at a time without materializing the whole chunk in memory. 
+ pub(crate) fn append_column_from_read( + &mut self, + read: R, mut close: ColumnCloseResult, ) -> Result<()> { self.assert_previous_writer_closed()?; @@ -707,7 +731,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { let src_length = metadata.compressed_size(); let write_offset = self.buf.bytes_written(); - let mut read = reader.get_read(src_offset as _)?.take(src_length as _); + let mut read = read.take(src_length as _); let write_length = std::io::copy(&mut read, &mut self.buf)?; if src_length as u64 != write_length { diff --git a/parquet/tests/arrow_writer.rs b/parquet/tests/arrow_writer.rs index 020b4c6267e0..e4cc10100035 100644 --- a/parquet/tests/arrow_writer.rs +++ b/parquet/tests/arrow_writer.rs @@ -17,13 +17,22 @@ //! Tests for [`ArrowWriter`] -use arrow::array::Float64Array; -use arrow::datatypes::{DataType, Field, Schema}; -use arrow::record_batch::RecordBatch; +use std::alloc::{GlobalAlloc, Layout, System}; +use std::cell::Cell; +use std::fs::File; +use std::io::{Read as _, Seek, SeekFrom, Write as _}; +use std::sync::Arc; + +use arrow::array::{ArrayRef, BinaryArray, Float64Array, Int32Array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use bytes::Bytes; use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_writer::{ + ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory, +}; use parquet::basic::Encoding; +use parquet::errors::Result; use parquet::file::properties::WriterProperties; -use std::sync::Arc; #[test] #[should_panic( @@ -48,3 +57,329 @@ fn test_delta_bit_pack_type() { let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), Some(props)).unwrap(); let _ = writer.write(&record_batch); } + +// --------------------------------------------------------------------------- +// Heap-memory regression test for the writer's page buffering. 
+// +// This proves the headline invariant of the pluggable [`PageStore`]: while a +// row group is being written, the heap used to buffer completed pages grows +// with the row group size for the default in-memory store, but stays bounded +// (≈ a few pages per leaf column) once a spilling backend is plugged in. +// +// Peak heap is measured with a thread-local tracking allocator (the same +// pattern used by `parquet/benches/arrow_reader_peak_memory.rs`), so the test +// needs no external profiling dependency. Tracking is thread-local, so the +// measured peak reflects only allocations made on the measuring thread; the +// default `ArrowWriter` is single-threaded, so the writer's buffering all lands +// there. Each measurement resets the peak to the current live baseline and +// reports the delta, so the threads of unrelated tests in this binary do not +// perturb it. +// +// [`PageStore`]: parquet::arrow::arrow_writer::PageStore +// --------------------------------------------------------------------------- + +thread_local! 
{ + static LIVE_BYTES: Cell = const { Cell::new(0) }; + static PEAK_BYTES: Cell = const { Cell::new(0) }; +} + +struct TrackingAllocator { + inner: System, +} + +#[global_allocator] +static GLOBAL: TrackingAllocator = TrackingAllocator { inner: System }; + +fn add_live_bytes(size: usize) { + LIVE_BYTES.with(|live| { + let new = live.get().saturating_add(size); + live.set(new); + PEAK_BYTES.with(|peak| { + if new > peak.get() { + peak.set(new); + } + }); + }); +} + +fn subtract_live_bytes(size: usize) { + LIVE_BYTES.with(|live| { + live.set(live.get().saturating_sub(size)); + }); +} + +#[allow(unsafe_code)] +unsafe impl GlobalAlloc for TrackingAllocator { + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { + let ptr = unsafe { self.inner.alloc(layout) }; + if !ptr.is_null() { + add_live_bytes(layout.size()); + } + ptr + } + + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { + subtract_live_bytes(layout.size()); + unsafe { self.inner.dealloc(ptr, layout) }; + } + + unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { + let new_ptr = unsafe { self.inner.realloc(ptr, layout, new_size) }; + if !new_ptr.is_null() { + let old_size = layout.size(); + if new_size > old_size { + add_live_bytes(new_size - old_size); + } else { + subtract_live_bytes(old_size - new_size); + } + } + new_ptr + } +} + +/// Run `f` and return the peak *additional* live heap (bytes) observed on this +/// thread during it — the delta from the live heap when `f` began. +fn peak_heap_bytes(f: impl FnOnce()) -> usize { + let start = LIVE_BYTES.with(Cell::get); + // Reset the peak to the window's baseline so prior allocations don't count. + PEAK_BYTES.with(|peak| peak.set(start)); + f(); + PEAK_BYTES.with(Cell::get).saturating_sub(start) +} + +/// Width of each value in the one "fat" column, in bytes. +const FAT_VALUE_LEN: usize = 4096; +/// Rows per input batch fed to the writer. 
Kept small so each batch is dropped +/// promptly — only the writer's *buffering* should accumulate, not the input. +const ROWS_PER_BATCH: usize = 64; +/// Number of batches, all funnelled into a single large row group. +const NUM_BATCHES: usize = 64; +/// Total bytes of fat-column payload written (≈ 16 MiB). +const TOTAL_FAT_BYTES: usize = FAT_VALUE_LEN * ROWS_PER_BATCH * NUM_BATCHES; + +/// A wide schema: one fat, high-cardinality binary column (the spill target) +/// plus several tiny integer columns. +fn skewed_schema() -> SchemaRef { + let mut fields = vec![Field::new("fat", DataType::Binary, false)]; + for i in 0..8 { + fields.push(Field::new(format!("small_{i}"), DataType::Int32, false)); + } + Arc::new(Schema::new(fields)) +} + +/// Build one batch of `ROWS_PER_BATCH` rows. The fat column holds unique, +/// high-entropy values (so they neither dictionary-encode nor compress away), +/// derived deterministically from `batch_index`. +fn make_batch(schema: &SchemaRef, batch_index: usize) -> RecordBatch { + let mut fat: Vec = vec![0u8; FAT_VALUE_LEN * ROWS_PER_BATCH]; + // A cheap xorshift fill keyed by the batch index → distinct, incompressible. 
+ let mut state = (batch_index as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15) | 1; + for byte in fat.iter_mut() { + state ^= state << 13; + state ^= state >> 7; + state ^= state << 17; + *byte = (state >> 24) as u8; + } + let offsets: Vec = (0..=ROWS_PER_BATCH) + .map(|i| (i * FAT_VALUE_LEN) as i32) + .collect(); + let fat_array = BinaryArray::try_new( + arrow::buffer::OffsetBuffer::new(offsets.into()), + arrow::buffer::Buffer::from_vec(fat), + None, + ) + .unwrap(); + + let mut columns: Vec = vec![Arc::new(fat_array)]; + for c in 0..8 { + let vals: Vec = (0..ROWS_PER_BATCH) + .map(|r| (batch_index * ROWS_PER_BATCH + r + c) as i32) + .collect(); + columns.push(Arc::new(Int32Array::from(vals))); + } + RecordBatch::try_new(schema.clone(), columns).unwrap() +} + +/// Writer properties forcing the whole dataset into a single, uncompressed row +/// group (so the page buffer is the only thing that grows). +fn single_row_group_props() -> WriterProperties { + WriterProperties::builder() + .set_compression(parquet::basic::Compression::UNCOMPRESSED) + // One row group for everything: never auto-flush on row count. + .set_max_row_group_row_count(Some(ROWS_PER_BATCH * NUM_BATCHES * 2)) + .build() +} + +/// Write the full skewed dataset with the given writer options, feeding small +/// batches (each dropped immediately) into one row group. +/// +/// The output is sent to [`io::sink`] so the produced file bytes never live on +/// the heap — the measured peak then reflects only the writer's internal page +/// *buffering*, which is exactly what a [`PageStore`] governs. +fn write_skewed_dataset(options: ArrowWriterOptions) { + let schema = skewed_schema(); + let mut writer = + ArrowWriter::try_new_with_options(std::io::sink(), schema.clone(), options).unwrap(); + for b in 0..NUM_BATCHES { + let batch = make_batch(&schema, b); + writer.write(&batch).unwrap(); + // `batch` dropped here — only the writer's internal buffering persists. 
+ } + writer.close().unwrap(); +} + +/// A spilling [`PageStore`]: one temp file per column chunk. `put` appends the +/// blob and records its `(offset, len)`; `take` seeks and reads it back. The +/// file is unlinked on creation (via [`tempfile::tempfile`]) so it is cleaned up +/// when the store is dropped. This is the canonical "spill completed pages off +/// the heap" backend the design targets. +struct TempFilePageStore { + file: File, + end: u64, + locs: Vec<(u64, usize)>, +} + +impl TempFilePageStore { + fn new() -> Result { + Ok(Self { + file: tempfile::tempfile()?, + end: 0, + locs: Vec::new(), + }) + } +} + +impl PageStore for TempFilePageStore { + fn put(&mut self, value: Bytes) -> Result { + // Always append at the logical end (a prior `take` may have moved the + // OS file cursor). + self.file.seek(SeekFrom::Start(self.end))?; + self.file.write_all(&value)?; + let key = PageKey::new(self.locs.len() as u64); + self.locs.push((self.end, value.len())); + self.end += value.len() as u64; + Ok(key) + } + + fn take(&mut self, key: PageKey) -> Result { + let (offset, len) = self.locs[key.get() as usize]; + let mut buf = vec![0u8; len]; + self.file.seek(SeekFrom::Start(offset))?; + self.file.read_exact(&mut buf)?; + Ok(Bytes::from(buf)) + } +} + +#[derive(Debug, Default)] +struct TempFilePageStoreFactory; + +impl PageStoreFactory for TempFilePageStoreFactory { + fn create(&self, _args: &PageStoreArgs<'_>) -> Result> { + Ok(Box::new(TempFilePageStore::new()?)) + } +} + +/// Rows per batch / batches for the dictionary-column scenario (~4.2M rows). +const DICT_ROWS_PER_BATCH: usize = 8192; +const DICT_NUM_BATCHES: usize = 512; + +/// Write a single, low-cardinality (16 distinct values), high-row-count column +/// as one row group. Such a column stays dictionary-encoded, so its completed +/// data pages would historically pile up in `GenericColumnWriter` until close — +/// the second accumulation point that plain page-buffer spilling does not reach. 
+fn write_dict_dataset(options: ArrowWriterOptions) { + let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, false)])); + let props = WriterProperties::builder() + .set_compression(parquet::basic::Compression::UNCOMPRESSED) + .set_max_row_group_row_count(Some(DICT_ROWS_PER_BATCH * DICT_NUM_BATCHES * 2)) + .build(); + let options = options.with_properties(props); + let mut writer = + ArrowWriter::try_new_with_options(std::io::sink(), schema.clone(), options).unwrap(); + for b in 0..DICT_NUM_BATCHES { + let vals: Vec = (0..DICT_ROWS_PER_BATCH) + .map(|r| ((b + r) % 16) as i32) + .collect(); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vals))]).unwrap(); + writer.write(&batch).unwrap(); + } + writer.close().unwrap(); +} + +/// All measurements run in one function so they execute sequentially on a single +/// thread — the tracking allocator is thread-local, so running them as separate +/// parallel tests would each see only their own thread's allocations (which is +/// fine), but keeping them together also keeps the in-memory/spill comparison on +/// one consistent baseline. +#[test] +fn page_store_bounds_write_memory() { + let props = single_row_group_props(); + + // Baseline: the default in-memory store buffers the whole row group, so peak + // heap is at least the size of the buffered column data. + let in_memory_peak = peak_heap_bytes(|| { + let opts = ArrowWriterOptions::new().with_properties(props.clone()); + write_skewed_dataset(opts); + }); + + // Spilling: the temp-file store keeps completed pages off the heap, so peak + // heap stays bounded by the in-flight encoder/dictionary buffers plus a page + // or two in flight — independent of the row group size. 
+ let spill_peak = peak_heap_bytes(|| { + let opts = ArrowWriterOptions::new() + .with_properties(props.clone()) + .with_page_store_factory(Arc::new(TempFilePageStoreFactory)); + write_skewed_dataset(opts); + }); + + eprintln!( + "peak heap — in-memory: {:.1} MiB, temp-file spill: {:.1} MiB (total fat payload {:.1} MiB)", + in_memory_peak as f64 / (1024.0 * 1024.0), + spill_peak as f64 / (1024.0 * 1024.0), + TOTAL_FAT_BYTES as f64 / (1024.0 * 1024.0), + ); + + // The in-memory store must hold most of the ~16 MiB of buffered data. + let in_memory_floor = TOTAL_FAT_BYTES * 3 / 4; + assert!( + in_memory_peak >= in_memory_floor, + "expected in-memory peak >= {in_memory_floor} bytes, got {in_memory_peak}" + ); + + // The spilling store must stay near the per-column bound — roughly + // (data_page_size + dict_page_size) per leaf column, ~2 MiB × 9 columns — + // and far below the in-memory baseline. We assert a generous 8 MiB ceiling + // (well under the ~16 MiB row group) to stay robust across platforms. + const SPILL_CEILING: usize = 8 * 1024 * 1024; + assert!( + spill_peak < SPILL_CEILING, + "expected spilling peak < {SPILL_CEILING} bytes (bounded by page/dict size × columns), \ + got {spill_peak}" + ); + assert!( + spill_peak * 2 < in_memory_peak, + "expected spilling peak ({spill_peak}) to be far below the in-memory baseline \ + ({in_memory_peak})" + ); + + // Dictionary-encoded column: completed data pages reach the page writer (and + // thus the store) as they are produced, so spilling bounds them too. 
+ let dict_in_memory = peak_heap_bytes(|| write_dict_dataset(ArrowWriterOptions::new())); + let dict_spill = peak_heap_bytes(|| { + write_dict_dataset( + ArrowWriterOptions::new().with_page_store_factory(Arc::new(TempFilePageStoreFactory)), + ) + }); + eprintln!( + "dict column ({} rows) peak heap — in-memory: {:.2} MiB, temp-file spill: {:.2} MiB", + DICT_ROWS_PER_BATCH * DICT_NUM_BATCHES, + dict_in_memory as f64 / (1024.0 * 1024.0), + dict_spill as f64 / (1024.0 * 1024.0), + ); + assert!( + dict_spill * 2 < dict_in_memory, + "expected dict-column spilling peak ({dict_spill}) to be far below the in-memory \ + baseline ({dict_in_memory}) — dictionary data pages should spill, not accumulate" + ); +}