diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 46e2dd7739e0..a881d41c7d8d 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -70,6 +70,55 @@ pub struct IpcWriteOptions { dictionary_handling: DictionaryHandling, } +/// A single buffer segment ready to be written to the output stream. +/// +/// For the uncompressed path the original Arc-backed [`Buffer`] is stored +/// directly (zero copy). For the compressed path the compressed bytes are +/// owned by a scratch `Vec`. +enum EncodedBuffer { + /// Uncompressed: Arc-backed reference to the original array buffer. + Raw(Buffer), + /// Compressed: owned scratch bytes produced by the codec. + Compressed(Vec), +} + +impl EncodedBuffer { + fn as_slice(&self) -> &[u8] { + match self { + EncodedBuffer::Raw(b) => b.as_slice(), + EncodedBuffer::Compressed(v) => v.as_slice(), + } + } + + fn len(&self) -> usize { + match self { + EncodedBuffer::Raw(b) => b.len(), + EncodedBuffer::Compressed(v) => v.len(), + } + } +} +/// Destination for per-buffer encoded output produced by [`write_array_data`]. +enum BufferSink<'a> { + /// Serialize buffer bytes (with padding) into a contiguous byte vec. + Write(&'a mut Vec), + /// Accumulate pre-encoded buffer segments for deferred zero-copy streaming. + Collect(&'a mut Vec), +} + +/// Per-message sizes produced by [`IpcDataGenerator::write_direct`]. +/// +/// [`FileWriter`] uses these to build the Block index entries required by the IPC footer for +/// random-access reads. +struct IpcWriteMetadata { + /// Per-dictionary `(padded_header_len, body_len)` for each dictionary batch written + /// before the record batch. + dictionary_block_sizes: Vec<(usize, usize)>, + /// Flatbuffer header size including continuation prefix and alignment padding. + padded_header_len: usize, + /// Total length of the record-batch body including trailing alignment padding. + body_len: usize, +} + impl IpcWriteOptions { /// Configures compression when writing IPC files. /// @@ -474,16 +523,44 @@ impl IpcDataGenerator { write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, ) -> Result<(Vec, EncodedData), ArrowError> { + let encoded_dictionaries = self.encode_all_dicts( + batch, + dictionary_tracker, + write_options, + compression_context, + )?; + let mut arrow_data = Vec::new(); + let (ipc_message, _, tail_pad) = self.record_batch_to_bytes( + batch, + write_options, + compression_context, + &mut BufferSink::Write(&mut arrow_data), + )?; + arrow_data.extend_from_slice(&PADDING[..tail_pad]); + Ok(( + encoded_dictionaries, + EncodedData { + ipc_message, + arrow_data, + }, + )) + } + + /// Encode dictionary batches for all columns in `batch`. + fn encode_all_dicts( + &self, + batch: &RecordBatch, + dictionary_tracker: &mut DictionaryTracker, + write_options: &IpcWriteOptions, + compression_context: &mut CompressionContext, + ) -> Result, ArrowError> { let schema = batch.schema(); let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len()); - let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter(); - for (i, field) in schema.fields().iter().enumerate() { - let column = batch.column(i); self.encode_dictionaries( field, - column, + batch.column(i), &mut encoded_dictionaries, dictionary_tracker, write_options, @@ -491,10 +568,71 @@ impl IpcDataGenerator { compression_context, )?; } + Ok(encoded_dictionaries) + } - let encoded_message = - self.record_batch_to_bytes(batch, write_options, compression_context)?; - Ok((encoded_dictionaries, encoded_message)) + /// Write dictionary batches and the record batch directly to `writer`, skipping the + /// intermediate body `Vec` allocations + /// Returns [`IpcWriteMetadata`] with the sizes needed to build footer blocks. + fn write_direct( + &self, + batch: &RecordBatch, + dictionary_tracker: &mut DictionaryTracker, + write_options: &IpcWriteOptions, + compression_context: &mut CompressionContext, + writer: &mut W, + ) -> Result { + let encoded_dictionaries = self.encode_all_dicts( + batch, + dictionary_tracker, + write_options, + compression_context, + )?; + + let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len()); + for dict in encoded_dictionaries { + dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?); + } + + let capacity = batch + .columns() + .iter() + .map(|a| estimate_encoded_buffer_count(a.data_type())) + .sum(); + let mut encoded_buffers: Vec = Vec::with_capacity(capacity); + let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes( + batch, + write_options, + compression_context, + &mut BufferSink::Collect(&mut encoded_buffers), + )?; + + let alignment = write_options.alignment; + let a = usize::from(alignment - 1); + let prefix_size = if write_options.write_legacy_ipc_format { + 4 + } else { + 8 + }; + let aligned_size = (ipc_message.len() + prefix_size + a) & !a; + write_continuation( + &mut *writer, + write_options, + (aligned_size - prefix_size) as i32, + )?; + writer.write_all(&ipc_message)?; + writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?; + for enc in &encoded_buffers { + writer.write_all(enc.as_slice())?; + writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?; + } + writer.write_all(&PADDING[..tail_pad])?; + + Ok(IpcWriteMetadata { + dictionary_block_sizes, + padded_header_len: aligned_size, + body_len, + }) } /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch). @@ -515,22 +653,20 @@ impl IpcDataGenerator { ) } - /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the - /// other for the batch's data + /// Encodes a `RecordBatch` into a flatbuffer IPC message and fills `sink` with the + /// serialised buffer data. fn record_batch_to_bytes( &self, batch: &RecordBatch, write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, - ) -> Result { + sink: &mut BufferSink<'_>, + ) -> Result<(Vec, usize, usize), ArrowError> { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; let mut buffers: Vec = vec![]; - let mut arrow_data: Vec = vec![]; - let mut offset = 0; - // get the type of compression let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { @@ -543,14 +679,16 @@ impl IpcDataGenerator { let compression_codec: Option = batch_compression_type.map(TryInto::try_into).transpose()?; + let alignment = write_options.alignment; let mut variadic_buffer_counts = vec![]; + let mut offset = 0i64; for array in batch.columns() { let array_data = array.to_data(); offset = write_array_data( &array_data, &mut buffers, - &mut arrow_data, + sink, &mut nodes, offset, array.len(), @@ -559,15 +697,12 @@ impl IpcDataGenerator { compression_context, write_options, )?; - append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); } - // pad the tail of body data - let len = arrow_data.len(); - let pad_len = pad_to_alignment(write_options.alignment, len); - arrow_data.extend_from_slice(&PADDING[..pad_len]); - // write data + let tail_pad = pad_to_alignment(alignment, offset as usize); + let body_len = offset as usize + tail_pad; + let buffers = fbb.create_vector(&buffers); let nodes = fbb.create_vector(&nodes); let variadic_buffer = if variadic_buffer_counts.is_empty() { @@ -584,27 +719,21 @@ impl IpcDataGenerator { if let Some(c) = compression { batch_builder.add_compression(c); } - if let Some(v) = variadic_buffer { batch_builder.add_variadicBufferCounts(v); } - let b = batch_builder.finish(); - b.as_union_value() + batch_builder.finish().as_union_value() }; // create an crate::Message let mut message = crate::MessageBuilder::new(&mut fbb); message.add_version(write_options.metadata_version); message.add_header_type(crate::MessageHeader::RecordBatch); - message.add_bodyLength(arrow_data.len() as i64); + message.add_bodyLength(body_len as i64); message.add_header(root); let root = message.finish(); fbb.finish(root, None); - let finished_data = fbb.finished_data(); - Ok(EncodedData { - ipc_message: finished_data.to_vec(), - arrow_data, - }) + Ok((fbb.finished_data().to_vec(), body_len, tail_pad)) } /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the @@ -637,10 +766,12 @@ impl IpcDataGenerator { .map(|batch_compression_type| batch_compression_type.try_into()) .transpose()?; - write_array_data( + let alignment = write_options.alignment; + let mut sink = BufferSink::Write(&mut arrow_data); + let offset = write_array_data( array_data, &mut buffers, - &mut arrow_data, + &mut sink, &mut nodes, 0, array_data.len(), @@ -654,9 +785,9 @@ impl IpcDataGenerator { append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data); // pad the tail of body data - let len = arrow_data.len(); - let pad_len = pad_to_alignment(write_options.alignment, len); - arrow_data.extend_from_slice(&PADDING[..pad_len]); + let tail_pad = pad_to_alignment(alignment, offset as usize); + let body_len = offset as usize + tail_pad; + arrow_data.extend_from_slice(&PADDING[..tail_pad]); // write data let buffers = fbb.create_vector(&buffers); @@ -693,7 +824,7 @@ impl IpcDataGenerator { let mut message_builder = crate::MessageBuilder::new(&mut fbb); message_builder.add_version(write_options.metadata_version); message_builder.add_header_type(crate::MessageHeader::DictionaryBatch); - message_builder.add_bodyLength(arrow_data.len() as i64); + message_builder.add_bodyLength(body_len as i64); message_builder.add_header(root); message_builder.finish() }; @@ -1165,32 +1296,32 @@ impl FileWriter { )); } - let (encoded_dictionaries, encoded_message) = self.data_gen.encode( + let meta = self.data_gen.write_direct( batch, &mut self.dictionary_tracker, &self.write_options, &mut self.compression_context, + &mut self.writer, )?; - for encoded_dictionary in encoded_dictionaries { - let (meta, data) = - write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; - - let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64); + for (header_len, body_len) in meta.dictionary_block_sizes { + let block = crate::Block::new( + self.block_offsets as i64, + header_len as i32, + body_len as i64, + ); self.dictionary_blocks.push(block); - self.block_offsets += meta + data; + self.block_offsets += header_len + body_len; } - let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?; - // add a record block for the footer let block = crate::Block::new( self.block_offsets as i64, - meta as i32, // TODO: is this still applicable? - data as i64, + meta.padded_header_len as i32, + meta.body_len as i64, ); self.record_blocks.push(block); - self.block_offsets += meta + data; + self.block_offsets += meta.padded_header_len + meta.body_len; Ok(()) } @@ -1440,21 +1571,13 @@ impl StreamWriter { )); } - let (encoded_dictionaries, encoded_message) = self - .data_gen - .encode( - batch, - &mut self.dictionary_tracker, - &self.write_options, - &mut self.compression_context, - ) - .expect("StreamWriter is configured to not error on dictionary replacement"); - - for encoded_dictionary in encoded_dictionaries { - write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; - } - - write_message(&mut self.writer, encoded_message, &self.write_options)?; + self.data_gen.write_direct( + batch, + &mut self.dictionary_tracker, + &self.write_options, + &mut self.compression_context, + &mut self.writer, + )?; Ok(()) } @@ -1786,7 +1909,7 @@ fn get_list_view_array_buffers( /// the array's offset and length. This helps reduce the encoded size of sliced /// arrays /// -fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] { +fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer { let buffer = &array_data.buffers()[0]; let layout = layout(array_data.data_type()); let spec = &layout.buffers[0]; @@ -1796,9 +1919,9 @@ fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] { if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) { let byte_offset = array_data.offset() * byte_width; let buffer_length = min(min_length, buffer.len() - byte_offset); - &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)] + buffer.slice_with_length(byte_offset, buffer_length) } else { - buffer.as_slice() + buffer.clone() } } @@ -1807,7 +1930,7 @@ fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] { fn write_array_data( array_data: &ArrayData, buffers: &mut Vec, - arrow_data: &mut Vec, + sink: &mut BufferSink<'_>, nodes: &mut Vec, offset: i64, num_rows: usize, @@ -1837,10 +1960,10 @@ fn write_array_data( Some(buffer) => buffer.inner().sliced(), }; - offset = write_buffer( - null_buffer.as_slice(), + offset = encode_sink_buffer( + null_buffer, buffers, - arrow_data, + sink, offset, compression_codec, compression_context, @@ -1852,10 +1975,10 @@ fn write_array_data( if matches!(data_type, DataType::Binary | DataType::Utf8) { let (offsets, values) = get_byte_array_buffers::(array_data); for buffer in [offsets, values] { - offset = write_buffer( - buffer.as_slice(), + offset = encode_sink_buffer( + buffer, buffers, - arrow_data, + sink, offset, compression_codec, compression_context, @@ -1870,10 +1993,10 @@ fn write_array_data( // If users wants to "compact" the arrays prior to sending them over IPC, // they should consider the gc API suggested in #5513 let views = get_or_truncate_buffer(array_data); - offset = write_buffer( + offset = encode_sink_buffer( views, buffers, - arrow_data, + sink, offset, compression_codec, compression_context, @@ -1881,10 +2004,10 @@ fn write_array_data( )?; for buffer in array_data.buffers().iter().skip(1) { - offset = write_buffer( - buffer.as_slice(), + offset = encode_sink_buffer( + buffer.clone(), buffers, - arrow_data, + sink, offset, compression_codec, compression_context, @@ -1894,10 +2017,10 @@ fn write_array_data( } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) { let (offsets, values) = get_byte_array_buffers::(array_data); for buffer in [offsets, values] { - offset = write_buffer( - buffer.as_slice(), + offset = encode_sink_buffer( + buffer, buffers, - arrow_data, + sink, offset, compression_codec, compression_context, @@ -1915,10 +2038,10 @@ fn write_array_data( assert_eq!(array_data.buffers().len(), 1); let buffer = get_or_truncate_buffer(array_data); - offset = write_buffer( + offset = encode_sink_buffer( buffer, buffers, - arrow_data, + sink, offset, compression_codec, compression_context, @@ -1931,10 +2054,10 @@ fn write_array_data( let buffer = &array_data.buffers()[0]; let buffer = buffer.bit_slice(array_data.offset(), array_data.len()); - offset = write_buffer( - &buffer, + offset = encode_sink_buffer( + buffer, buffers, - arrow_data, + sink, offset, compression_codec, compression_context, @@ -1954,10 +2077,10 @@ fn write_array_data( DataType::LargeList(_) => get_list_array_buffers::(array_data), _ => unreachable!(), }; - offset = write_buffer( - offsets.as_slice(), + offset = encode_sink_buffer( + offsets, buffers, - arrow_data, + sink, offset, compression_codec, compression_context, @@ -1966,7 +2089,7 @@ fn write_array_data( offset = write_array_data( &sliced_child_data, buffers, - arrow_data, + sink, nodes, offset, sliced_child_data.len(), @@ -1989,20 +2112,19 @@ fn write_array_data( _ => unreachable!(), }; - offset = write_buffer( - offsets.as_slice(), + offset = encode_sink_buffer( + offsets, buffers, - arrow_data, + sink, offset, compression_codec, compression_context, write_options.alignment, )?; - - offset = write_buffer( - sizes.as_slice(), + offset = encode_sink_buffer( + sizes, buffers, - arrow_data, + sink, offset, compression_codec, compression_context, @@ -2012,7 +2134,7 @@ fn write_array_data( offset = write_array_data( &child_data, buffers, - arrow_data, + sink, nodes, offset, child_data.len(), @@ -2033,7 +2155,7 @@ fn write_array_data( offset = write_array_data( &child_data, buffers, - arrow_data, + sink, nodes, offset, child_data.len(), @@ -2045,10 +2167,10 @@ fn write_array_data( return Ok(offset); } else { for buffer in array_data.buffers() { - offset = write_buffer( - buffer, + offset = encode_sink_buffer( + buffer.clone(), buffers, - arrow_data, + sink, offset, compression_codec, compression_context, @@ -2068,7 +2190,7 @@ fn write_array_data( offset = write_array_data( data_ref, buffers, - arrow_data, + sink, nodes, offset, data_ref.len(), @@ -2086,7 +2208,7 @@ fn write_array_data( offset = write_array_data( data_ref, buffers, - arrow_data, + sink, nodes, offset, data_ref.len(), @@ -2101,50 +2223,104 @@ fn write_array_data( Ok(offset) } -/// Write a buffer into `arrow_data`, a vector of bytes, and adds its -/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data` -/// -/// -/// From -/// Each constituent buffer is first compressed with the indicated -/// compressor, and then written with the uncompressed length in the first 8 -/// bytes as a 64-bit little-endian signed integer followed by the compressed -/// buffer bytes (and then padding as required by the protocol). The -/// uncompressed length may be set to -1 to indicate that the data that -/// follows is not compressed, which can be useful for cases where -/// compression does not yield appreciable savings. -fn write_buffer( - buffer: &[u8], // input - buffers: &mut Vec, // output buffer descriptors - arrow_data: &mut Vec, // output stream - offset: i64, // current output stream offset +fn encode_sink_buffer( + buffer: Buffer, + buffers: &mut Vec, + sink: &mut BufferSink<'_>, + offset: i64, compression_codec: Option, compression_context: &mut CompressionContext, alignment: u8, ) -> Result { - let len: i64 = match compression_codec { - Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?, + let (encoded, len) = match compression_codec { None => { - arrow_data.extend_from_slice(buffer); - buffer.len() + let len = buffer.len() as i64; + (EncodedBuffer::Raw(buffer), len) } - } - .try_into() - .map_err(|e| { - ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}")) - })?; + Some(codec) => { + let mut scratch = Vec::new(); + let written = + codec.compress_to_vec(buffer.as_slice(), &mut scratch, compression_context)?; + let len = i64::try_from(written) + .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?; + (EncodedBuffer::Compressed(scratch), len) + } + }; - // make new index entry - buffers.push(crate::Buffer::new(offset, len)); - // padding and make offset aligned let pad_len = pad_to_alignment(alignment, len as usize); - arrow_data.extend_from_slice(&PADDING[..pad_len]); + match sink { + BufferSink::Write(arrow_data) => { + match &encoded { + EncodedBuffer::Raw(b) => arrow_data.extend_from_slice(b.as_slice()), + EncodedBuffer::Compressed(v) => arrow_data.extend_from_slice(v), + } + arrow_data.extend_from_slice(&PADDING[..pad_len]); + } + BufferSink::Collect(encoded_buffers) => encoded_buffers.push(encoded), + } - Ok(offset + len + (pad_len as i64)) + buffers.push(crate::Buffer::new(offset, len)); + Ok(offset + len + pad_len as i64) } const PADDING: [u8; 64] = [0; 64]; +/// Estimates the number of [`EncodedBuffer`] segments that [`write_array_data`] +/// will produce for a column of the given type. +/// +/// Based on the Arrow IPC buffer layout +/// (): +#[inline] +fn estimate_encoded_buffer_count(dt: &DataType) -> usize { + match dt { + DataType::Null => 0, + + DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3, + + DataType::BinaryView | DataType::Utf8View => 3, + + DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => { + 2 + estimate_encoded_buffer_count(f.data_type()) + } + + DataType::ListView(f) | DataType::LargeListView(f) => { + 3 + estimate_encoded_buffer_count(f.data_type()) + } + + DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()), + + DataType::Struct(fields) => { + 1 + fields + .iter() + .map(|f| estimate_encoded_buffer_count(f.data_type())) + .sum::() + } + + // Dictionary indices only; dictionary body is a separate IPC message. + DataType::Dictionary(_, _) => 2, + + DataType::Union(fields, UnionMode::Sparse) => { + 1 + fields + .iter() + .map(|(_, f)| estimate_encoded_buffer_count(f.data_type())) + .sum::() + } + DataType::Union(fields, UnionMode::Dense) => { + 2 + fields + .iter() + .map(|(_, f)| estimate_encoded_buffer_count(f.data_type())) + .sum::() + } + + DataType::RunEndEncoded(run_ends, values) => { + estimate_encoded_buffer_count(run_ends.data_type()) + + estimate_encoded_buffer_count(values.data_type()) + } + // Primitive, Bool, temporal, Decimal*, FixedSizeBinary: validity + values. + _ => 2, + } +} + /// Calculate an alignment boundary and return the number of bytes needed to pad to the alignment boundary #[inline] fn pad_to_alignment(alignment: u8, len: usize) -> usize {