From c3c87a8774d8c22cba2508da8dcf2c5d51b0cab3 Mon Sep 17 00:00:00 2001 From: Richard Date: Mon, 1 Jun 2026 14:51:07 -0400 Subject: [PATCH 01/15] remove repeated buffer copies in encoding pipeline --- arrow-ipc/src/writer.rs | 382 +++++++++++++++++++++++++++++----------- 1 file changed, 282 insertions(+), 100 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 46e2dd7739e0..ed55c60145e9 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -70,6 +70,33 @@ pub struct IpcWriteOptions { dictionary_handling: DictionaryHandling, } +/// A single buffer segment ready to be written to the output stream. +/// +/// Used by [`IpcDataGenerator::write_batch_direct`] to avoid staging all buffer +/// data through a flat `arrow_data: Vec` accumulator. +enum EncodedBuffer { + /// Uncompressed : an Arc-backed reference to the original array buffer. + Raw(Buffer), + /// Compressed : owned scratch bytes produced by the codec. + Compressed(Vec), +} + +impl EncodedBuffer { + fn as_slice(&self) -> &[u8] { + match self { + // Cloning is a reference-count bump + EncodedBuffer::Raw(b) => b.as_slice(), + EncodedBuffer::Compressed(v) => v.as_slice(), + } + } + + fn len(&self) -> usize { + match self { + EncodedBuffer::Raw(b) => b.len(), + EncodedBuffer::Compressed(v) => v.len(), + } + } +} impl IpcWriteOptions { /// Configures compression when writing IPC files. /// @@ -527,8 +554,8 @@ impl IpcDataGenerator { let mut nodes: Vec = vec![]; let mut buffers: Vec = vec![]; - let mut arrow_data: Vec = vec![]; - let mut offset = 0; + let mut encoded_buffers: Vec = vec![]; + let mut offset = 0i64; // get the type of compression let batch_compression_type = write_options.batch_compression_type; @@ -550,7 +577,7 @@ impl IpcDataGenerator { offset = write_array_data( &array_data, &mut buffers, - &mut arrow_data, + &mut encoded_buffers, &mut nodes, offset, array.len(), @@ -563,9 +590,9 @@ impl IpcDataGenerator { append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); } // pad the tail of body data - let len = arrow_data.len(); - let pad_len = pad_to_alignment(write_options.alignment, len); - arrow_data.extend_from_slice(&PADDING[..pad_len]); + + let tail_pad = pad_to_alignment(write_options.alignment, offset as usize); + let body_len = offset as usize + tail_pad; // write data let buffers = fbb.create_vector(&buffers); @@ -595,12 +622,23 @@ impl IpcDataGenerator { let mut message = crate::MessageBuilder::new(&mut fbb); message.add_version(write_options.metadata_version); message.add_header_type(crate::MessageHeader::RecordBatch); - message.add_bodyLength(arrow_data.len() as i64); + message.add_bodyLength(body_len as i64); message.add_header(root); let root = message.finish(); fbb.finish(root, None); let finished_data = fbb.finished_data(); + // EncodedData.arrow_data is Vec, so we must flatten here. + // write_batch_direct skips this by streaming EncodedBuffer segments directly to the writer. + let mut arrow_data: Vec = Vec::with_capacity(body_len); + for encoded in &encoded_buffers { + arrow_data.extend_from_slice(encoded.as_slice()); + arrow_data.extend_from_slice( + &PADDING[..pad_to_alignment(write_options.alignment, encoded.len())], + ); + } + arrow_data.extend_from_slice(&PADDING[..tail_pad]); + Ok(EncodedData { ipc_message: finished_data.to_vec(), arrow_data, @@ -621,9 +659,8 @@ impl IpcDataGenerator { let mut nodes: Vec = vec![]; let mut buffers: Vec = vec![]; - let mut arrow_data: Vec = vec![]; + let mut encoded_buffers: Vec = vec![]; - // get the type of compression let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { @@ -637,10 +674,10 @@ impl IpcDataGenerator { .map(|batch_compression_type| batch_compression_type.try_into()) .transpose()?; - write_array_data( + let offset = write_array_data( array_data, &mut buffers, - &mut arrow_data, + &mut encoded_buffers, &mut nodes, 0, array_data.len(), @@ -654,9 +691,8 @@ impl IpcDataGenerator { append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data); // pad the tail of body data - let len = arrow_data.len(); - let pad_len = pad_to_alignment(write_options.alignment, len); - arrow_data.extend_from_slice(&PADDING[..pad_len]); + let tail_pad = pad_to_alignment(write_options.alignment, offset as usize); + let body_len = offset as usize + tail_pad; // write data let buffers = fbb.create_vector(&buffers); @@ -693,7 +729,7 @@ impl IpcDataGenerator { let mut message_builder = crate::MessageBuilder::new(&mut fbb); message_builder.add_version(write_options.metadata_version); message_builder.add_header_type(crate::MessageHeader::DictionaryBatch); - message_builder.add_bodyLength(arrow_data.len() as i64); + message_builder.add_bodyLength(body_len as i64); message_builder.add_header(root); message_builder.finish() }; @@ -701,11 +737,173 @@ impl IpcDataGenerator { fbb.finish(root, None); let finished_data = fbb.finished_data(); + //same as [Self::record_batch_to_bytes] + let mut arrow_data: Vec = Vec::with_capacity(body_len); + for encoded in &encoded_buffers { + arrow_data.extend_from_slice(encoded.as_slice()); + arrow_data.extend_from_slice( + &PADDING[..pad_to_alignment(write_options.alignment, encoded.len())], + ); + } + arrow_data.extend_from_slice(&PADDING[..tail_pad]); + Ok(EncodedData { ipc_message: finished_data.to_vec(), arrow_data, }) } + + /// Write dictionaries and a record batch directly to `writer`, skipping the + /// intermediate `arrow_data: Vec` accumulator used by [`Self::record_batch_to_bytes`]. + /// + /// For the uncompressed path each array buffer is held as an Arc-backed slice and + /// written straight to `writer` — one copy instead of two. For the compressed path + /// each buffer is compressed into a per-buffer scratch `Vec` and written from + /// there, eliminating the extra copy that `write_buffer` -> `arrow_data` -> + /// `write_body_buffers` would otherwise incur. + /// + /// Returns `(dict_sizes, batch_sizes)` where each element is + /// `(ipc_metadata_bytes, body_bytes)` + fn write_batch_direct( + &self, + batch: &RecordBatch, + dictionary_tracker: &mut DictionaryTracker, + write_options: &IpcWriteOptions, + compression_context: &mut CompressionContext, + writer: &mut W, + ) -> Result<(Vec<(usize, usize)>, (usize, usize)), ArrowError> { + let schema = batch.schema(); + let mut encoded_dictionaries = Vec::new(); + let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter(); + for (i, field) in schema.fields().iter().enumerate() { + self.encode_dictionaries( + field, + batch.column(i), + &mut encoded_dictionaries, + dictionary_tracker, + write_options, + &mut dict_id, + compression_context, + )?; + } + let mut dict_sizes = Vec::with_capacity(encoded_dictionaries.len()); + for dict in encoded_dictionaries { + dict_sizes.push(write_message(&mut *writer, dict, write_options)?); + } + + // Collect batch buffers into segments without copying into a flat Vec. + let mut fbb = FlatBufferBuilder::new(); + let mut nodes: Vec = vec![]; + let mut buffer_metas: Vec = vec![]; + let mut encoded_buffers: Vec = vec![]; + let mut offset = 0i64; + + let batch_compression_type = write_options.batch_compression_type; + let compression = batch_compression_type.map(|t| { + let mut c = crate::BodyCompressionBuilder::new(&mut fbb); + c.add_method(crate::BodyCompressionMethod::BUFFER); + c.add_codec(t); + c.finish() + }); + let compression_codec: Option = + batch_compression_type.map(TryInto::try_into).transpose()?; + + let mut variadic_buffer_counts = vec![]; + for array in batch.columns() { + let array_data = array.to_data(); + offset = write_array_data( + &array_data, + &mut buffer_metas, + &mut encoded_buffers, + &mut nodes, + offset, + array.len(), + array.null_count(), + compression_codec, + compression_context, + write_options, + )?; + append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); + } + + let tail_pad = pad_to_alignment(write_options.alignment, offset as usize); + let body_len = offset as usize + tail_pad; + + // All buffer offsets and sizes are known at this point. build the FlatBuffer message now. + let buffers_fb = fbb.create_vector(&buffer_metas); + let nodes_fb = fbb.create_vector(&nodes); + let variadic_fb = (!variadic_buffer_counts.is_empty()) + .then(|| fbb.create_vector(&variadic_buffer_counts)); + + let root = { + let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb); + batch_builder.add_length(batch.num_rows() as i64); + batch_builder.add_nodes(nodes_fb); + batch_builder.add_buffers(buffers_fb); + if let Some(c) = compression { + batch_builder.add_compression(c); + } + if let Some(v) = variadic_fb { + batch_builder.add_variadicBufferCounts(v); + } + batch_builder.finish().as_union_value() + }; + + let mut message = crate::MessageBuilder::new(&mut fbb); + message.add_version(write_options.metadata_version); + message.add_header_type(crate::MessageHeader::RecordBatch); + message.add_bodyLength(body_len as i64); + message.add_header(root); + let message_offset = message.finish(); + fbb.finish(message_offset, None); + let ipc_message = fbb.finished_data(); + + // Write IPC message header (continuation marker + flatbuf + alignment padding). + // without the flush() calls. flushing is deferred to finish(). + let a = usize::from(write_options.alignment - 1); + let flatbuf_size = ipc_message.len(); + let prefix_size = if write_options.write_legacy_ipc_format { + 4 + } else { + 8 + }; + let aligned_size = (flatbuf_size + prefix_size + a) & !a; + let padding_bytes = aligned_size - flatbuf_size - prefix_size; + + match write_options.metadata_version { + crate::MetadataVersion::V1 + | crate::MetadataVersion::V2 + | crate::MetadataVersion::V3 => { + unreachable!("Options with this metadata version cannot be created") + } + crate::MetadataVersion::V4 => { + if !write_options.write_legacy_ipc_format { + writer.write_all(&CONTINUATION_MARKER)?; + } + writer.write_all(&((aligned_size - prefix_size) as i32).to_le_bytes())?; + } + crate::MetadataVersion::V5 => { + writer.write_all(&CONTINUATION_MARKER)?; + writer.write_all(&((aligned_size - prefix_size) as i32).to_le_bytes())?; + } + z => panic!("Unsupported MetadataVersion {z:?}"), + } + writer.write_all(ipc_message)?; + writer.write_all(&PADDING[..padding_bytes])?; + + // Stream body segments directly to the output || no intermediate arrow_data copy. + if body_len > 0 { + for encoded in &encoded_buffers { + let data = encoded.as_slice(); + let pad = pad_to_alignment(write_options.alignment, data.len()); + writer.write_all(data)?; + writer.write_all(&PADDING[..pad])?; + } + writer.write_all(&PADDING[..tail_pad])?; + } + + Ok((dict_sizes, (aligned_size, body_len))) + } } fn append_variadic_buffer_counts(counts: &mut Vec, array: &ArrayData) { @@ -1165,24 +1363,24 @@ impl FileWriter { )); } - let (encoded_dictionaries, encoded_message) = self.data_gen.encode( + let (dict_sizes, (meta, data)) = self.data_gen.write_batch_direct( batch, &mut self.dictionary_tracker, &self.write_options, &mut self.compression_context, + &mut self.writer, )?; - for encoded_dictionary in encoded_dictionaries { - let (meta, data) = - write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; - - let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64); + for (dict_meta, dict_data) in dict_sizes { + let block = crate::Block::new( + self.block_offsets as i64, + dict_meta as i32, + dict_data as i64, + ); self.dictionary_blocks.push(block); - self.block_offsets += meta + data; + self.block_offsets += dict_meta + dict_data; } - let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?; - // add a record block for the footer let block = crate::Block::new( self.block_offsets as i64, @@ -1440,21 +1638,14 @@ impl StreamWriter { )); } - let (encoded_dictionaries, encoded_message) = self - .data_gen - .encode( - batch, - &mut self.dictionary_tracker, - &self.write_options, - &mut self.compression_context, - ) - .expect("StreamWriter is configured to not error on dictionary replacement"); - - for encoded_dictionary in encoded_dictionaries { - write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; - } + self.data_gen.write_batch_direct( + batch, + &mut self.dictionary_tracker, + &self.write_options, + &mut self.compression_context, + &mut self.writer, + )?; - write_message(&mut self.writer, encoded_message, &self.write_options)?; Ok(()) } @@ -1786,7 +1977,7 @@ fn get_list_view_array_buffers( /// the array's offset and length. This helps reduce the encoded size of sliced /// arrays /// -fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] { +fn get_buffer(array_data: &ArrayData) -> Buffer { let buffer = &array_data.buffers()[0]; let layout = layout(array_data.data_type()); let spec = &layout.buffers[0]; @@ -1796,9 +1987,9 @@ fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] { if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) { let byte_offset = array_data.offset() * byte_width; let buffer_length = min(min_length, buffer.len() - byte_offset); - &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)] + buffer.slice_with_length(byte_offset, buffer_length) } else { - buffer.as_slice() + buffer.clone() } } @@ -1807,7 +1998,7 @@ fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] { fn write_array_data( array_data: &ArrayData, buffers: &mut Vec, - arrow_data: &mut Vec, + encoded_buffers: &mut Vec, nodes: &mut Vec, offset: i64, num_rows: usize, @@ -1838,9 +2029,9 @@ fn write_array_data( }; offset = write_buffer( - null_buffer.as_slice(), + null_buffer, buffers, - arrow_data, + encoded_buffers, offset, compression_codec, compression_context, @@ -1853,9 +2044,9 @@ fn write_array_data( let (offsets, values) = get_byte_array_buffers::(array_data); for buffer in [offsets, values] { offset = write_buffer( - buffer.as_slice(), + buffer, buffers, - arrow_data, + encoded_buffers, offset, compression_codec, compression_context, @@ -1869,11 +2060,10 @@ fn write_array_data( // Current implementation just serialize the raw arrays as given and not try to optimize anything. // If users wants to "compact" the arrays prior to sending them over IPC, // they should consider the gc API suggested in #5513 - let views = get_or_truncate_buffer(array_data); offset = write_buffer( - views, + get_buffer(array_data), buffers, - arrow_data, + encoded_buffers, offset, compression_codec, compression_context, @@ -1882,9 +2072,9 @@ fn write_array_data( for buffer in array_data.buffers().iter().skip(1) { offset = write_buffer( - buffer.as_slice(), + buffer.clone(), buffers, - arrow_data, + encoded_buffers, offset, compression_codec, compression_context, @@ -1895,9 +2085,9 @@ fn write_array_data( let (offsets, values) = get_byte_array_buffers::(array_data); for buffer in [offsets, values] { offset = write_buffer( - buffer.as_slice(), + buffer, buffers, - arrow_data, + encoded_buffers, offset, compression_codec, compression_context, @@ -1914,11 +2104,10 @@ fn write_array_data( // Truncate values assert_eq!(array_data.buffers().len(), 1); - let buffer = get_or_truncate_buffer(array_data); offset = write_buffer( - buffer, + get_buffer(array_data), buffers, - arrow_data, + encoded_buffers, offset, compression_codec, compression_context, @@ -1929,12 +2118,11 @@ fn write_array_data( // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around. assert_eq!(array_data.buffers().len(), 1); - let buffer = &array_data.buffers()[0]; - let buffer = buffer.bit_slice(array_data.offset(), array_data.len()); + let buffer = array_data.buffers()[0].bit_slice(array_data.offset(), array_data.len()); offset = write_buffer( - &buffer, + buffer, buffers, - arrow_data, + encoded_buffers, offset, compression_codec, compression_context, @@ -1955,9 +2143,9 @@ fn write_array_data( _ => unreachable!(), }; offset = write_buffer( - offsets.as_slice(), + offsets, buffers, - arrow_data, + encoded_buffers, offset, compression_codec, compression_context, @@ -1966,7 +2154,7 @@ fn write_array_data( offset = write_array_data( &sliced_child_data, buffers, - arrow_data, + encoded_buffers, nodes, offset, sliced_child_data.len(), @@ -1990,9 +2178,9 @@ fn write_array_data( }; offset = write_buffer( - offsets.as_slice(), + offsets, buffers, - arrow_data, + encoded_buffers, offset, compression_codec, compression_context, @@ -2000,9 +2188,9 @@ fn write_array_data( )?; offset = write_buffer( - sizes.as_slice(), + sizes, buffers, - arrow_data, + encoded_buffers, offset, compression_codec, compression_context, @@ -2012,7 +2200,7 @@ fn write_array_data( offset = write_array_data( &child_data, buffers, - arrow_data, + encoded_buffers, nodes, offset, child_data.len(), @@ -2033,7 +2221,7 @@ fn write_array_data( offset = write_array_data( &child_data, buffers, - arrow_data, + encoded_buffers, nodes, offset, child_data.len(), @@ -2046,9 +2234,9 @@ fn write_array_data( } else { for buffer in array_data.buffers() { offset = write_buffer( - buffer, + buffer.clone(), buffers, - arrow_data, + encoded_buffers, offset, compression_codec, compression_context, @@ -2062,13 +2250,11 @@ fn write_array_data( DataType::RunEndEncoded(_, _) => { // unslice the run encoded array. let arr = unslice_run_array(array_data.clone())?; - // recursively write out nested structures for data_ref in arr.child_data() { - // write the nested data (e.g list data) offset = write_array_data( data_ref, buffers, - arrow_data, + encoded_buffers, nodes, offset, data_ref.len(), @@ -2080,13 +2266,11 @@ fn write_array_data( } } _ => { - // recursively write out nested structures for data_ref in array_data.child_data() { - // write the nested data (e.g list data) offset = write_array_data( data_ref, buffers, - arrow_data, + encoded_buffers, nodes, offset, data_ref.len(), @@ -2103,9 +2287,7 @@ fn write_array_data( /// Write a buffer into `arrow_data`, a vector of bytes, and adds its /// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data` -/// -/// -/// From +/// /// From /// Each constituent buffer is first compressed with the indicated /// compressor, and then written with the uncompressed length in the first 8 /// bytes as a 64-bit little-endian signed integer followed by the compressed @@ -2114,33 +2296,33 @@ fn write_array_data( /// follows is not compressed, which can be useful for cases where /// compression does not yield appreciable savings. fn write_buffer( - buffer: &[u8], // input - buffers: &mut Vec, // output buffer descriptors - arrow_data: &mut Vec, // output stream - offset: i64, // current output stream offset + buffer: Buffer, // input array buffer to encode + buffers: &mut Vec, // IPC buffer metadata (offset + length) for the FlatBuffer message + encoded_buffers: &mut Vec, // accumulated encoded segments, written to output after the message header + offset: i64, // current output stream offset compression_codec: Option, compression_context: &mut CompressionContext, alignment: u8, ) -> Result { - let len: i64 = match compression_codec { - Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?, + let (encoded, len) = match compression_codec { None => { - arrow_data.extend_from_slice(buffer); - buffer.len() + let len = buffer.len() as i64; + (EncodedBuffer::Raw(buffer), len) } - } - .try_into() - .map_err(|e| { - ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}")) - })?; - - // make new index entry + Some(codec) => { + let mut scratch = Vec::new(); + let written = + codec.compress_to_vec(buffer.as_slice(), &mut scratch, compression_context)?; + let len = i64::try_from(written) + .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?; + (EncodedBuffer::Compressed(scratch), len) + } + }; buffers.push(crate::Buffer::new(offset, len)); - // padding and make offset aligned - let pad_len = pad_to_alignment(alignment, len as usize); - arrow_data.extend_from_slice(&PADDING[..pad_len]); - - Ok(offset + len + (pad_len as i64)) + // Defer the actual write. segments are streamed to the output after the FlatBuffer header is built. + encoded_buffers.push(encoded); + let pad_len = pad_to_alignment(alignment, len as usize) as i64; + Ok(offset + len + pad_len) } const PADDING: [u8; 64] = [0; 64]; From fd3a4750fb3dfeb51a8ff5fae08b881ca63dcb34 Mon Sep 17 00:00:00 2001 From: Richard Date: Mon, 1 Jun 2026 15:01:17 -0400 Subject: [PATCH 02/15] fixed some linting issues --- arrow-ipc/src/writer.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index ed55c60145e9..72faf25dd48e 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -69,7 +69,12 @@ pub struct IpcWriteOptions { /// How to handle updating dictionaries in IPC messages dictionary_handling: DictionaryHandling, } - +/// Return type for [`IpcDataGenerator::write_batch_direct`]: `(dict_sizes, batch_sizes)` where +/// each element is `(ipc_metadata_bytes, body_bytes)`. +/// +/// [`FileWriter`] uses these sizes to build the [`Block`] index entries required by the IPC +/// footer for random-access reads. +type IPCMetadata = Result<(Vec<(usize, usize)>, (usize, usize)), ArrowError>; /// A single buffer segment ready to be written to the output stream. /// /// Used by [`IpcDataGenerator::write_batch_direct`] to avoid staging all buffer @@ -753,7 +758,7 @@ impl IpcDataGenerator { }) } - /// Write dictionaries and a record batch directly to `writer`, skipping the + /// Write dictionaries and record batch's directly to `writer`, skipping the /// intermediate `arrow_data: Vec` accumulator used by [`Self::record_batch_to_bytes`]. /// /// For the uncompressed path each array buffer is held as an Arc-backed slice and @@ -761,9 +766,6 @@ impl IpcDataGenerator { /// each buffer is compressed into a per-buffer scratch `Vec` and written from /// there, eliminating the extra copy that `write_buffer` -> `arrow_data` -> /// `write_body_buffers` would otherwise incur. - /// - /// Returns `(dict_sizes, batch_sizes)` where each element is - /// `(ipc_metadata_bytes, body_bytes)` fn write_batch_direct( &self, batch: &RecordBatch, @@ -771,7 +773,7 @@ impl IpcDataGenerator { write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, writer: &mut W, - ) -> Result<(Vec<(usize, usize)>, (usize, usize)), ArrowError> { + ) -> IPCMetadata { let schema = batch.schema(); let mut encoded_dictionaries = Vec::new(); let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter(); @@ -2287,7 +2289,7 @@ fn write_array_data( /// Write a buffer into `arrow_data`, a vector of bytes, and adds its /// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data` -/// /// From +/// From /// Each constituent buffer is first compressed with the indicated /// compressor, and then written with the uncompressed length in the first 8 /// bytes as a 64-bit little-endian signed integer followed by the compressed From 213c8e75a762d9d487c6579c0a5813c5f5dc5c3a Mon Sep 17 00:00:00 2001 From: Richard Date: Mon, 1 Jun 2026 15:26:37 -0400 Subject: [PATCH 03/15] minor clean up. good to push --- arrow-ipc/src/writer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 72faf25dd48e..a9ea44296c76 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -742,7 +742,6 @@ impl IpcDataGenerator { fbb.finish(root, None); let finished_data = fbb.finished_data(); - //same as [Self::record_batch_to_bytes] let mut arrow_data: Vec = Vec::with_capacity(body_len); for encoded in &encoded_buffers { arrow_data.extend_from_slice(encoded.as_slice()); @@ -762,7 +761,7 @@ impl IpcDataGenerator { /// intermediate `arrow_data: Vec` accumulator used by [`Self::record_batch_to_bytes`]. /// /// For the uncompressed path each array buffer is held as an Arc-backed slice and - /// written straight to `writer` — one copy instead of two. For the compressed path + /// written straight to `writer`. one copy instead of two. For the compressed path /// each buffer is compressed into a per-buffer scratch `Vec` and written from /// there, eliminating the extra copy that `write_buffer` -> `arrow_data` -> /// `write_body_buffers` would otherwise incur. From 6df331c09beb5af6edbf588f5e0c017471c71415 Mon Sep 17 00:00:00 2001 From: Richard Date: Tue, 2 Jun 2026 13:28:56 -0400 Subject: [PATCH 04/15] fix CI --- arrow-ipc/src/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index a9ea44296c76..7f8b0e76f7e1 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -72,7 +72,7 @@ pub struct IpcWriteOptions { /// Return type for [`IpcDataGenerator::write_batch_direct`]: `(dict_sizes, batch_sizes)` where /// each element is `(ipc_metadata_bytes, body_bytes)`. /// -/// [`FileWriter`] uses these sizes to build the [`Block`] index entries required by the IPC +/// [`FileWriter`] uses these sizes to build the Block index entries required by the IPC /// footer for random-access reads. type IPCMetadata = Result<(Vec<(usize, usize)>, (usize, usize)), ArrowError>; /// A single buffer segment ready to be written to the output stream. From 781c75ccb2677bdeeb4c2de0ceda061d7c571b18 Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 3 Jun 2026 14:45:32 -0400 Subject: [PATCH 05/15] refactored PR to be easier to read/ dedupe logic --- arrow-ipc/src/writer.rs | 324 ++++++++++++++++++---------------------- 1 file changed, 146 insertions(+), 178 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 7f8b0e76f7e1..0deefc78aa87 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -69,12 +69,33 @@ pub struct IpcWriteOptions { /// How to handle updating dictionaries in IPC messages dictionary_handling: DictionaryHandling, } -/// Return type for [`IpcDataGenerator::write_batch_direct`]: `(dict_sizes, batch_sizes)` where -/// each element is `(ipc_metadata_bytes, body_bytes)`. +/// Return value for [`IpcDataGenerator::write_batch_direct`]. /// -/// [`FileWriter`] uses these sizes to build the Block index entries required by the IPC -/// footer for random-access reads. -type IPCMetadata = Result<(Vec<(usize, usize)>, (usize, usize)), ArrowError>; +/// `IPCMetadata` contains per-dictionary sizes written before the record +/// batch (`dictionary_block_sizes`) and the padded header length and body +/// length for the record batch itself. [`FileWriter`] uses these sizes to +/// build the Block index entries required by the IPC footer for +/// random-access reads. +struct IPCMetadata { + /// Per-dictionary (meta_bytes, body_bytes) sizes for dictionaries written + /// before the record batch. + dictionary_block_sizes: Vec<(usize, usize)>, + /// Flatbuffer header size including continuation prefix and padding. + padded_header_len: usize, + /// Total length of the record-batch body (after padding). + body_len: usize, +} +/// Return type of [`IpcDataGenerator::encode_record_batch`]. +/// +/// Exactly one variant is produced per call depending on whether a writer was supplied: +/// - `Encoded` — body was flattened into `EncodedData`; used by `record_batch_to_bytes`. +/// - `Written` — body was streamed to the writer; used by `write_batch_direct` to build +/// the `IPCMetadata` block sizes that `FileWriter` needs for its footer index. +enum BatchEncoding { + Encoded(EncodedData), + Written { padded_header_len: usize, body_len: usize }, +} + /// A single buffer segment ready to be written to the output stream. /// /// Used by [`IpcDataGenerator::write_batch_direct`] to avoid staging all buffer @@ -547,14 +568,19 @@ impl IpcDataGenerator { ) } - /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the - /// other for the batch's data - fn record_batch_to_bytes( + /// Shared core used by both [`Self::record_batch_to_bytes`] and [`Self::write_batch_direct`]. + /// + /// Builds the FlatBuffer header and encodes all array buffers. When `writer` is `Some` + /// the header and body are streamed directly to the writer and [`BatchEncoding::Written`] + /// is returned; when `None` the body is flattened into an owned `Vec` and + /// [`BatchEncoding::Encoded`] is returned. + fn encode_record_batch( &self, batch: &RecordBatch, write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, - ) -> Result { + writer: Option<&mut dyn Write>, + ) -> Result { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; @@ -562,7 +588,6 @@ impl IpcDataGenerator { let mut encoded_buffers: Vec = vec![]; let mut offset = 0i64; - // get the type of compression let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { @@ -591,18 +616,15 @@ impl IpcDataGenerator { compression_context, write_options, )?; - append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); } - // pad the tail of body data let tail_pad = pad_to_alignment(write_options.alignment, offset as usize); let body_len = offset as usize + tail_pad; - // write data - let buffers = fbb.create_vector(&buffers); - let nodes = fbb.create_vector(&nodes); - let variadic_buffer = if variadic_buffer_counts.is_empty() { + let buffers_fb = fbb.create_vector(&buffers); + let nodes_fb = fbb.create_vector(&nodes); + let variadic_fb = if variadic_buffer_counts.is_empty() { None } else { Some(fbb.create_vector(&variadic_buffer_counts)) @@ -611,19 +633,17 @@ impl IpcDataGenerator { let root = { let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb); batch_builder.add_length(batch.num_rows() as i64); - batch_builder.add_nodes(nodes); - batch_builder.add_buffers(buffers); + batch_builder.add_nodes(nodes_fb); + batch_builder.add_buffers(buffers_fb); if let Some(c) = compression { batch_builder.add_compression(c); } - - if let Some(v) = variadic_buffer { + if let Some(v) = variadic_fb { batch_builder.add_variadicBufferCounts(v); } - let b = batch_builder.finish(); - b.as_union_value() + batch_builder.finish().as_union_value() }; - // create an crate::Message + let mut message = crate::MessageBuilder::new(&mut fbb); message.add_version(write_options.metadata_version); message.add_header_type(crate::MessageHeader::RecordBatch); @@ -631,23 +651,73 @@ impl IpcDataGenerator { message.add_header(root); let root = message.finish(); fbb.finish(root, None); - let finished_data = fbb.finished_data(); + let ipc_message = fbb.finished_data().to_vec(); - // EncodedData.arrow_data is Vec, so we must flatten here. - // write_batch_direct skips this by streaming EncodedBuffer segments directly to the writer. - let mut arrow_data: Vec = Vec::with_capacity(body_len); - for encoded in &encoded_buffers { - arrow_data.extend_from_slice(encoded.as_slice()); - arrow_data.extend_from_slice( - &PADDING[..pad_to_alignment(write_options.alignment, encoded.len())], - ); + let a = usize::from(write_options.alignment - 1); + let prefix_size = if write_options.write_legacy_ipc_format { 4 } else { 8 }; + let padded_header_len = (ipc_message.len() + prefix_size + a) & !a; + + if let Some(w) = writer { + // Stream header + body directly — no intermediate Vec. + let padding_bytes = padded_header_len - ipc_message.len() - prefix_size; + match write_options.metadata_version { + crate::MetadataVersion::V1 + | crate::MetadataVersion::V2 + | crate::MetadataVersion::V3 => { + unreachable!("Options with this metadata version cannot be created") + } + crate::MetadataVersion::V4 => { + if !write_options.write_legacy_ipc_format { + w.write_all(&CONTINUATION_MARKER)?; + } + w.write_all(&((padded_header_len - prefix_size) as i32).to_le_bytes())?; + } + crate::MetadataVersion::V5 => { + w.write_all(&CONTINUATION_MARKER)?; + w.write_all(&((padded_header_len - prefix_size) as i32).to_le_bytes())?; + } + z => panic!("Unsupported MetadataVersion {z:?}"), + } + w.write_all(&ipc_message)?; + w.write_all(&PADDING[..padding_bytes])?; + if body_len > 0 { + for encoded in &encoded_buffers { + let data = encoded.as_slice(); + w.write_all(data)?; + w.write_all(&PADDING[..pad_to_alignment(write_options.alignment, data.len())])?; + } + w.write_all(&PADDING[..tail_pad])?; + } + Ok(BatchEncoding::Written { padded_header_len, body_len }) + } else { + // Flatten body segments into a contiguous Vec. + let mut arrow_data: Vec = Vec::with_capacity(body_len); + for encoded in &encoded_buffers { + arrow_data.extend_from_slice(encoded.as_slice()); + arrow_data.extend_from_slice( + &PADDING[..pad_to_alignment(write_options.alignment, encoded.len())], + ); + } + arrow_data.extend_from_slice(&PADDING[..tail_pad]); + Ok(BatchEncoding::Encoded(EncodedData { ipc_message, arrow_data })) } - arrow_data.extend_from_slice(&PADDING[..tail_pad]); + } - Ok(EncodedData { - ipc_message: finished_data.to_vec(), - arrow_data, - }) + /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the + /// other for the batch's data + fn record_batch_to_bytes( + &self, + batch: &RecordBatch, + write_options: &IpcWriteOptions, + compression_context: &mut CompressionContext, + ) -> Result { + if let BatchEncoding::Encoded(data) = + self.encode_record_batch(batch, write_options, compression_context, None)? + { + Ok(data) + } else { + unreachable!() + } } /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the @@ -772,7 +842,7 @@ impl IpcDataGenerator { write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, writer: &mut W, - ) -> IPCMetadata { + ) -> Result { let schema = batch.schema(); let mut encoded_dictionaries = Vec::new(); let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter(); @@ -792,118 +862,17 @@ impl IpcDataGenerator { dict_sizes.push(write_message(&mut *writer, dict, write_options)?); } - // Collect batch buffers into segments without copying into a flat Vec. - let mut fbb = FlatBufferBuilder::new(); - let mut nodes: Vec = vec![]; - let mut buffer_metas: Vec = vec![]; - let mut encoded_buffers: Vec = vec![]; - let mut offset = 0i64; - - let batch_compression_type = write_options.batch_compression_type; - let compression = batch_compression_type.map(|t| { - let mut c = crate::BodyCompressionBuilder::new(&mut fbb); - c.add_method(crate::BodyCompressionMethod::BUFFER); - c.add_codec(t); - c.finish() - }); - let compression_codec: Option = - batch_compression_type.map(TryInto::try_into).transpose()?; - - let mut variadic_buffer_counts = vec![]; - for array in batch.columns() { - let array_data = array.to_data(); - offset = write_array_data( - &array_data, - &mut buffer_metas, - &mut encoded_buffers, - &mut nodes, - offset, - array.len(), - array.null_count(), - compression_codec, - compression_context, - write_options, - )?; - append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); - } - - let tail_pad = pad_to_alignment(write_options.alignment, offset as usize); - let body_len = offset as usize + tail_pad; - - // All buffer offsets and sizes are known at this point. build the FlatBuffer message now. - let buffers_fb = fbb.create_vector(&buffer_metas); - let nodes_fb = fbb.create_vector(&nodes); - let variadic_fb = (!variadic_buffer_counts.is_empty()) - .then(|| fbb.create_vector(&variadic_buffer_counts)); - - let root = { - let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb); - batch_builder.add_length(batch.num_rows() as i64); - batch_builder.add_nodes(nodes_fb); - batch_builder.add_buffers(buffers_fb); - if let Some(c) = compression { - batch_builder.add_compression(c); - } - if let Some(v) = variadic_fb { - batch_builder.add_variadicBufferCounts(v); - } - batch_builder.finish().as_union_value() - }; - - let mut message = crate::MessageBuilder::new(&mut fbb); - message.add_version(write_options.metadata_version); - message.add_header_type(crate::MessageHeader::RecordBatch); - message.add_bodyLength(body_len as i64); - message.add_header(root); - let message_offset = message.finish(); - fbb.finish(message_offset, None); - let ipc_message = fbb.finished_data(); - - // Write IPC message header (continuation marker + flatbuf + alignment padding). - // without the flush() calls. flushing is deferred to finish(). - let a = usize::from(write_options.alignment - 1); - let flatbuf_size = ipc_message.len(); - let prefix_size = if write_options.write_legacy_ipc_format { - 4 + if let BatchEncoding::Written { padded_header_len, body_len } = + self.encode_record_batch(batch, write_options, compression_context, Some(writer))? + { + Ok(IPCMetadata { + dictionary_block_sizes: dict_sizes, + padded_header_len, + body_len, + }) } else { - 8 - }; - let aligned_size = (flatbuf_size + prefix_size + a) & !a; - let padding_bytes = aligned_size - flatbuf_size - prefix_size; - - match write_options.metadata_version { - crate::MetadataVersion::V1 - | crate::MetadataVersion::V2 - | crate::MetadataVersion::V3 => { - unreachable!("Options with this metadata version cannot be created") - } - crate::MetadataVersion::V4 => { - if !write_options.write_legacy_ipc_format { - writer.write_all(&CONTINUATION_MARKER)?; - } - writer.write_all(&((aligned_size - prefix_size) as i32).to_le_bytes())?; - } - crate::MetadataVersion::V5 => { - writer.write_all(&CONTINUATION_MARKER)?; - writer.write_all(&((aligned_size - prefix_size) as i32).to_le_bytes())?; - } - z => panic!("Unsupported MetadataVersion {z:?}"), - } - writer.write_all(ipc_message)?; - writer.write_all(&PADDING[..padding_bytes])?; - - // Stream body segments directly to the output || no intermediate arrow_data copy. - if body_len > 0 { - for encoded in &encoded_buffers { - let data = encoded.as_slice(); - let pad = pad_to_alignment(write_options.alignment, data.len()); - writer.write_all(data)?; - writer.write_all(&PADDING[..pad])?; - } - writer.write_all(&PADDING[..tail_pad])?; + unreachable!() } - - Ok((dict_sizes, (aligned_size, body_len))) } } @@ -1364,7 +1333,7 @@ impl FileWriter { )); } - let (dict_sizes, (meta, data)) = self.data_gen.write_batch_direct( + let ipc_meta = self.data_gen.write_batch_direct( batch, &mut self.dictionary_tracker, &self.write_options, @@ -1372,24 +1341,24 @@ impl FileWriter { &mut self.writer, )?; - for (dict_meta, dict_data) in dict_sizes { + for (dict_meta_bytes, dict_body_bytes) in ipc_meta.dictionary_block_sizes { let block = crate::Block::new( self.block_offsets as i64, - dict_meta as i32, - dict_data as i64, + dict_meta_bytes as i32, + dict_body_bytes as i64, ); self.dictionary_blocks.push(block); - self.block_offsets += dict_meta + dict_data; + self.block_offsets += dict_meta_bytes + dict_body_bytes; } // add a record block for the footer let block = crate::Block::new( self.block_offsets as i64, - meta as i32, // TODO: is this still applicable? - data as i64, + ipc_meta.padded_header_len as i32, // header size (padded) + ipc_meta.body_len as i64, ); self.record_blocks.push(block); - self.block_offsets += meta + data; + self.block_offsets += ipc_meta.padded_header_len + ipc_meta.body_len; Ok(()) } @@ -2029,7 +1998,7 @@ fn write_array_data( Some(buffer) => buffer.inner().sliced(), }; - offset = write_buffer( + offset = collect_encoded_buffers( null_buffer, buffers, encoded_buffers, @@ -2044,7 +2013,7 @@ fn write_array_data( if matches!(data_type, DataType::Binary | DataType::Utf8) { let (offsets, values) = get_byte_array_buffers::(array_data); for buffer in [offsets, values] { - offset = write_buffer( + offset = collect_encoded_buffers( buffer, buffers, encoded_buffers, @@ -2061,7 +2030,7 @@ fn write_array_data( // Current implementation just serialize the raw arrays as given and not try to optimize anything. // If users wants to "compact" the arrays prior to sending them over IPC, // they should consider the gc API suggested in #5513 - offset = write_buffer( + offset = collect_encoded_buffers( get_buffer(array_data), buffers, encoded_buffers, @@ -2072,7 +2041,7 @@ fn write_array_data( )?; for buffer in array_data.buffers().iter().skip(1) { - offset = write_buffer( + offset = collect_encoded_buffers( buffer.clone(), buffers, encoded_buffers, @@ -2085,7 +2054,7 @@ fn write_array_data( } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) { let (offsets, values) = get_byte_array_buffers::(array_data); for buffer in [offsets, values] { - offset = write_buffer( + offset = collect_encoded_buffers( buffer, buffers, encoded_buffers, @@ -2105,7 +2074,7 @@ fn write_array_data( // Truncate values assert_eq!(array_data.buffers().len(), 1); - offset = write_buffer( + offset = collect_encoded_buffers( get_buffer(array_data), buffers, encoded_buffers, @@ -2120,7 +2089,7 @@ fn write_array_data( assert_eq!(array_data.buffers().len(), 1); let buffer = array_data.buffers()[0].bit_slice(array_data.offset(), array_data.len()); - offset = write_buffer( + offset = collect_encoded_buffers( buffer, buffers, encoded_buffers, @@ -2143,7 +2112,7 @@ fn write_array_data( DataType::LargeList(_) => get_list_array_buffers::(array_data), _ => unreachable!(), }; - offset = write_buffer( + offset = collect_encoded_buffers( offsets, buffers, encoded_buffers, @@ -2178,7 +2147,7 @@ fn write_array_data( _ => unreachable!(), }; - offset = write_buffer( + offset = collect_encoded_buffers( offsets, buffers, encoded_buffers, @@ -2188,7 +2157,7 @@ fn write_array_data( write_options.alignment, )?; - offset = write_buffer( + offset = collect_encoded_buffers( sizes, buffers, encoded_buffers, @@ -2234,7 +2203,7 @@ fn write_array_data( return Ok(offset); } else { for buffer in array_data.buffers() { - offset = write_buffer( + offset = collect_encoded_buffers( buffer.clone(), buffers, encoded_buffers, @@ -2286,17 +2255,16 @@ fn write_array_data( Ok(offset) } -/// Write a buffer into `arrow_data`, a vector of bytes, and adds its -/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data` -/// From -/// Each constituent buffer is first compressed with the indicated -/// compressor, and then written with the uncompressed length in the first 8 -/// bytes as a 64-bit little-endian signed integer followed by the compressed -/// buffer bytes (and then padding as required by the protocol). The -/// uncompressed length may be set to -1 to indicate that the data that -/// follows is not compressed, which can be useful for cases where -/// compression does not yield appreciable savings. -fn write_buffer( +/// Append a single array buffer to the outgoing IPC body and record its +/// corresponding `crate::Buffer` metadata into `buffers`. Returns the +/// updated byte `offset` after the buffer (including any padding). +/// +/// See +/// for the on-wire layout: each buffer is optionally compressed and then +/// written as the 8-byte little-endian uncompressed length (or `-1` to +/// indicate uncompressed data) followed by the (compressed) bytes and +/// protocol-required padding. +fn collect_encoded_buffers( buffer: Buffer, // input array buffer to encode buffers: &mut Vec, // IPC buffer metadata (offset + length) for the FlatBuffer message encoded_buffers: &mut Vec, // accumulated encoded segments, written to output after the message header From 7acd810e743a95b3b30d7e11b9cb2ba3f6ac1a39 Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 3 Jun 2026 15:15:30 -0400 Subject: [PATCH 06/15] linting fix --- arrow-ipc/src/writer.rs | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 0deefc78aa87..ae6e075dfb21 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -93,7 +93,10 @@ struct IPCMetadata { /// the `IPCMetadata` block sizes that `FileWriter` needs for its footer index. enum BatchEncoding { Encoded(EncodedData), - Written { padded_header_len: usize, body_len: usize }, + Written { + padded_header_len: usize, + body_len: usize, + }, } /// A single buffer segment ready to be written to the output stream. @@ -654,7 +657,11 @@ impl IpcDataGenerator { let ipc_message = fbb.finished_data().to_vec(); let a = usize::from(write_options.alignment - 1); - let prefix_size = if write_options.write_legacy_ipc_format { 4 } else { 8 }; + let prefix_size = if write_options.write_legacy_ipc_format { + 4 + } else { + 8 + }; let padded_header_len = (ipc_message.len() + prefix_size + a) & !a; if let Some(w) = writer { @@ -688,7 +695,10 @@ impl IpcDataGenerator { } w.write_all(&PADDING[..tail_pad])?; } - Ok(BatchEncoding::Written { padded_header_len, body_len }) + Ok(BatchEncoding::Written { + padded_header_len, + body_len, + }) } else { // Flatten body segments into a contiguous Vec. let mut arrow_data: Vec = Vec::with_capacity(body_len); @@ -699,7 +709,10 @@ impl IpcDataGenerator { ); } arrow_data.extend_from_slice(&PADDING[..tail_pad]); - Ok(BatchEncoding::Encoded(EncodedData { ipc_message, arrow_data })) + Ok(BatchEncoding::Encoded(EncodedData { + ipc_message, + arrow_data, + })) } } @@ -862,8 +875,10 @@ impl IpcDataGenerator { dict_sizes.push(write_message(&mut *writer, dict, write_options)?); } - if let BatchEncoding::Written { padded_header_len, body_len } = - self.encode_record_batch(batch, write_options, compression_context, Some(writer))? + if let BatchEncoding::Written { + padded_header_len, + body_len, + } = self.encode_record_batch(batch, write_options, compression_context, Some(writer))? { Ok(IPCMetadata { dictionary_block_sizes: dict_sizes, From 51693caee75bd4e18ac953e1ed2bc6167475bc83 Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 5 Jun 2026 11:21:49 -0400 Subject: [PATCH 07/15] add better benchmarks to diagnose regression --- arrow-flight/benches/flight.rs | 34 +++++++++++++++++++++++++++++++++- arrow-flight/src/encode.rs | 1 - 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/arrow-flight/benches/flight.rs b/arrow-flight/benches/flight.rs index 4841e9dd9822..ba0ac593de4e 100644 --- a/arrow-flight/benches/flight.rs +++ b/arrow-flight/benches/flight.rs @@ -83,5 +83,37 @@ fn bench_roundtrip(c: &mut Criterion) { } } -criterion_group!(benches, bench_encode, bench_roundtrip); +async fn encode_and_send(channel: Channel, batch: RecordBatch) { + let mut client = FlightClient::new(channel); + let frames = FlightDataEncoderBuilder::new().build(futures::stream::iter([Ok(batch)])); + client + .do_put(frames) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); +} + +fn bench_encode_to_send(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let (channel, _) = rt.block_on(start_server()); + let mut g = c.benchmark_group("encode_to_send"); + + for &(name, build) in TYPES { + for &rows in &ROWS { + for &cols in &COLS { + let batch = build_batch(name, rows, cols, build); + let id = BenchmarkId::new(name, format!("{rows}x{cols}")); + g.throughput(Throughput::Bytes(batch.get_array_memory_size() as u64)); + g.bench_with_input(id, &batch, |b, batch| { + b.to_async(&rt) + .iter(|| encode_and_send(channel.clone(), batch.clone())); + }); + } + } + } +} + +criterion_group!(benches, bench_encode, bench_roundtrip, bench_encode_to_send); criterion_main!(benches); diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index 191da024136f..2ef70dca5da2 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -731,7 +731,6 @@ impl FlightIpcEncoder { &self.options, &mut self.compression_context, )?; - let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); let flight_batch = encoded_batch.into(); From 742b0c077cce5b56db953c78965fd0ad44a3723d Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 5 Jun 2026 17:28:00 -0400 Subject: [PATCH 08/15] sharing current work, planning to split this into two PR --- arrow-ipc/src/writer.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index ae6e075dfb21..d13e7a5dd5a2 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -588,7 +588,8 @@ impl IpcDataGenerator { let mut nodes: Vec = vec![]; let mut buffers: Vec = vec![]; - let mut encoded_buffers: Vec = vec![]; + let mut encoded_buffers: Vec = + Vec::with_capacity(encoded_buffer_count(batch)); let mut offset = 0i64; let batch_compression_type = write_options.batch_compression_type; @@ -1978,6 +1979,16 @@ fn get_buffer(array_data: &ArrayData) -> Buffer { } } +/// Returns the total number of [`EncodedBuffer`] entries that will be pushed during encoding of +/// `batch`. Used to pre-allocate the `encoded_buffers` Vec and avoid repeated reallocations as +/// columns are visited. +fn encoded_buffer_count(batch: &RecordBatch) -> usize { + fn count(data: &ArrayData) -> usize { + data.buffers().len() + data.child_data().iter().map(count).sum::() + } + batch.columns().iter().map(|c| count(&c.to_data())).sum() +} + /// Write array data to a vector of bytes #[allow(clippy::too_many_arguments)] fn write_array_data( From 11e9ac908fbcd3f0c8e696ba92a90db8e1f35992 Mon Sep 17 00:00:00 2001 From: Richard Date: Sun, 7 Jun 2026 16:09:40 -0400 Subject: [PATCH 09/15] split PR into Stream/Filter Writer focused --- arrow-ipc/src/writer.rs | 878 ++++++++++++++++++++++++++-------------- 1 file changed, 566 insertions(+), 312 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index d13e7a5dd5a2..eb02c0852c2d 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -69,51 +69,22 @@ pub struct IpcWriteOptions { /// How to handle updating dictionaries in IPC messages dictionary_handling: DictionaryHandling, } -/// Return value for [`IpcDataGenerator::write_batch_direct`]. -/// -/// `IPCMetadata` contains per-dictionary sizes written before the record -/// batch (`dictionary_block_sizes`) and the padded header length and body -/// length for the record batch itself. [`FileWriter`] uses these sizes to -/// build the Block index entries required by the IPC footer for -/// random-access reads. -struct IPCMetadata { - /// Per-dictionary (meta_bytes, body_bytes) sizes for dictionaries written - /// before the record batch. - dictionary_block_sizes: Vec<(usize, usize)>, - /// Flatbuffer header size including continuation prefix and padding. - padded_header_len: usize, - /// Total length of the record-batch body (after padding). - body_len: usize, -} -/// Return type of [`IpcDataGenerator::encode_record_batch`]. -/// -/// Exactly one variant is produced per call depending on whether a writer was supplied: -/// - `Encoded` — body was flattened into `EncodedData`; used by `record_batch_to_bytes`. -/// - `Written` — body was streamed to the writer; used by `write_batch_direct` to build -/// the `IPCMetadata` block sizes that `FileWriter` needs for its footer index. -enum BatchEncoding { - Encoded(EncodedData), - Written { - padded_header_len: usize, - body_len: usize, - }, -} /// A single buffer segment ready to be written to the output stream. /// -/// Used by [`IpcDataGenerator::write_batch_direct`] to avoid staging all buffer -/// data through a flat `arrow_data: Vec` accumulator. +/// For the uncompressed path the original Arc-backed [`Buffer`] is stored +/// directly (zero copy). For the compressed path the compressed bytes are +/// owned by a scratch `Vec`. enum EncodedBuffer { - /// Uncompressed : an Arc-backed reference to the original array buffer. + /// Uncompressed: Arc-backed reference to the original array buffer. Raw(Buffer), - /// Compressed : owned scratch bytes produced by the codec. + /// Compressed: owned scratch bytes produced by the codec. Compressed(Vec), } impl EncodedBuffer { fn as_slice(&self) -> &[u8] { match self { - // Cloning is a reference-count bump EncodedBuffer::Raw(b) => b.as_slice(), EncodedBuffer::Compressed(v) => v.as_slice(), } @@ -126,6 +97,73 @@ impl EncodedBuffer { } } } + +/// Per-message sizes produced by [`IpcDataGenerator::write_direct`]. +/// +/// [`FileWriter`] uses these to build the Block index entries required by the IPC footer for +/// random-access reads. Fields mirror those of [`crate::Block`]: a padded header length and a +/// body length for the record batch, plus one pair per dictionary written before it. +struct IpcWriteMetadata { + /// Per-dictionary `(padded_header_len, body_len)` for each dictionary batch written + /// before the record batch. + dictionary_block_sizes: Vec<(usize, usize)>, + /// Flatbuffer header size including continuation prefix and alignment padding. + padded_header_len: usize, + /// Total length of the record-batch body including trailing alignment padding. + body_len: usize, +} + +/// Passes all writes through to `inner` while capturing the first 8 bytes of the stream +/// (the IPC continuation header) and counting total bytes written. Used by +/// [`IpcDataGenerator::write_direct`] to recover message sizes without an intermediate buffer. +struct SizeCapture<'a, W: Write> { + inner: &'a mut W, + total: usize, + header: [u8; 8], + header_len: usize, +} + +impl<'a, W: Write> SizeCapture<'a, W> { + fn new(writer: &'a mut W) -> Self { + Self { + inner: writer, + total: 0, + header: [0u8; 8], + header_len: 0, + } + } + + fn sizes(&self, legacy: bool) -> (usize, usize) { + // write_continuation writes: + // legacy: 4 bytes = total_len (= aligned_size - 4) + // non-legacy: 4 bytes marker + 4 bytes total_len (= aligned_size - 8) + let (prefix_size, msg_bytes): (usize, [u8; 4]) = if legacy { + (4, self.header[0..4].try_into().unwrap()) + } else { + (8, self.header[4..8].try_into().unwrap()) + }; + let meta = i32::from_le_bytes(msg_bytes) as usize + prefix_size; + let data = self.total - meta; + (meta, data) + } +} + +impl Write for SizeCapture<'_, W> { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let n = self.inner.write(buf)?; + if self.header_len < 8 { + let copy = (8 - self.header_len).min(n); + self.header[self.header_len..self.header_len + copy].copy_from_slice(&buf[..copy]); + self.header_len += copy; + } + self.total += n; + Ok(n) + } + fn flush(&mut self) -> std::io::Result<()> { + self.inner.flush() + } +} + impl IpcWriteOptions { /// Configures compression when writing IPC files. /// @@ -530,16 +568,32 @@ impl IpcDataGenerator { write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, ) -> Result<(Vec, EncodedData), ArrowError> { + let encoded_dictionaries = self.encode_all_dicts( + batch, + dictionary_tracker, + write_options, + compression_context, + )?; + let encoded_message = + self.record_batch_to_bytes(batch, write_options, compression_context, None)?; + Ok((encoded_dictionaries, encoded_message)) + } + + /// Encode dictionary batches for all columns in `batch`. + fn encode_all_dicts( + &self, + batch: &RecordBatch, + dictionary_tracker: &mut DictionaryTracker, + write_options: &IpcWriteOptions, + compression_context: &mut CompressionContext, + ) -> Result, ArrowError> { let schema = batch.schema(); let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len()); - let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter(); - for (i, field) in schema.fields().iter().enumerate() { - let column = batch.column(i); self.encode_dictionaries( field, - column, + batch.column(i), &mut encoded_dictionaries, dictionary_tracker, write_options, @@ -547,10 +601,46 @@ impl IpcDataGenerator { compression_context, )?; } + Ok(encoded_dictionaries) + } - let encoded_message = - self.record_batch_to_bytes(batch, write_options, compression_context)?; - Ok((encoded_dictionaries, encoded_message)) + /// Write dictionary batches and the record batch directly to `writer`, skipping the + /// intermediate body `Vec` allocations + /// Returns [`IpcWriteMetadata`] with the sizes needed to build footer blocks. + fn write_direct( + &self, + batch: &RecordBatch, + dictionary_tracker: &mut DictionaryTracker, + write_options: &IpcWriteOptions, + compression_context: &mut CompressionContext, + writer: &mut W, + ) -> Result { + let encoded_dictionaries = self.encode_all_dicts( + batch, + dictionary_tracker, + write_options, + compression_context, + )?; + + let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len()); + for dict in encoded_dictionaries { + dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?); + } + + let mut capture = SizeCapture::new(writer); + self.record_batch_to_bytes( + batch, + write_options, + compression_context, + Some(&mut capture), + )?; + let (padded_header_len, body_len) = capture.sizes(write_options.write_legacy_ipc_format); + + Ok(IpcWriteMetadata { + dictionary_block_sizes, + padded_header_len, + body_len, + }) } /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch). @@ -571,27 +661,25 @@ impl IpcDataGenerator { ) } - /// Shared core used by both [`Self::record_batch_to_bytes`] and [`Self::write_batch_direct`]. + /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the + /// other for the batch's data. /// - /// Builds the FlatBuffer header and encodes all array buffers. When `writer` is `Some` - /// the header and body are streamed directly to the writer and [`BatchEncoding::Written`] - /// is returned; when `None` the body is flattened into an owned `Vec` and - /// [`BatchEncoding::Encoded`] is returned. - fn encode_record_batch( + /// When `writer` is `Some`, the message is written directly to it and an empty + /// [`EncodedData`] is returned (the data has already been flushed). When `writer` + /// is `None` the encoded bytes are returned for the caller to write. + fn record_batch_to_bytes( &self, batch: &RecordBatch, write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, writer: Option<&mut dyn Write>, - ) -> Result { + ) -> Result { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; let mut buffers: Vec = vec![]; - let mut encoded_buffers: Vec = - Vec::with_capacity(encoded_buffer_count(batch)); - let mut offset = 0i64; + // get the type of compression let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { @@ -604,13 +692,24 @@ impl IpcDataGenerator { let compression_codec: Option = batch_compression_type.map(TryInto::try_into).transpose()?; + let alignment = write_options.alignment; let mut variadic_buffer_counts = vec![]; + let mut offset = 0i64; + let mut arrow_data: Vec = vec![]; + let capacity = batch + .columns() + .iter() + .map(|a| estimate_encoded_buffer_count(a.data_type())) + .sum(); + let mut encoded_buffers: Vec = Vec::with_capacity(capacity); + let is_direct = writer.is_some(); for array in batch.columns() { let array_data = array.to_data(); offset = write_array_data( &array_data, &mut buffers, + &mut arrow_data, &mut encoded_buffers, &mut nodes, offset, @@ -619,16 +718,19 @@ impl IpcDataGenerator { compression_codec, compression_context, write_options, + is_direct, )?; append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); } - let tail_pad = pad_to_alignment(write_options.alignment, offset as usize); + // Build FlatBuffer header — same for both paths. + let tail_pad = pad_to_alignment(alignment, offset as usize); let body_len = offset as usize + tail_pad; + // write data let buffers_fb = fbb.create_vector(&buffers); let nodes_fb = fbb.create_vector(&nodes); - let variadic_fb = if variadic_buffer_counts.is_empty() { + let variadic_buffer = if variadic_buffer_counts.is_empty() { None } else { Some(fbb.create_vector(&variadic_buffer_counts)) @@ -642,12 +744,14 @@ impl IpcDataGenerator { if let Some(c) = compression { batch_builder.add_compression(c); } - if let Some(v) = variadic_fb { + + if let Some(v) = variadic_buffer { batch_builder.add_variadicBufferCounts(v); } - batch_builder.finish().as_union_value() + let b = batch_builder.finish(); + b.as_union_value() }; - + // create an crate::Message let mut message = crate::MessageBuilder::new(&mut fbb); message.add_version(write_options.metadata_version); message.add_header_type(crate::MessageHeader::RecordBatch); @@ -657,81 +761,35 @@ impl IpcDataGenerator { fbb.finish(root, None); let ipc_message = fbb.finished_data().to_vec(); - let a = usize::from(write_options.alignment - 1); - let prefix_size = if write_options.write_legacy_ipc_format { - 4 - } else { - 8 - }; - let padded_header_len = (ipc_message.len() + prefix_size + a) & !a; - if let Some(w) = writer { - // Stream header + body directly — no intermediate Vec. - let padding_bytes = padded_header_len - ipc_message.len() - prefix_size; - match write_options.metadata_version { - crate::MetadataVersion::V1 - | crate::MetadataVersion::V2 - | crate::MetadataVersion::V3 => { - unreachable!("Options with this metadata version cannot be created") - } - crate::MetadataVersion::V4 => { - if !write_options.write_legacy_ipc_format { - w.write_all(&CONTINUATION_MARKER)?; - } - w.write_all(&((padded_header_len - prefix_size) as i32).to_le_bytes())?; - } - crate::MetadataVersion::V5 => { - w.write_all(&CONTINUATION_MARKER)?; - w.write_all(&((padded_header_len - prefix_size) as i32).to_le_bytes())?; - } - z => panic!("Unsupported MetadataVersion {z:?}"), - } + // Stream header then each buffer directly — no intermediate Vec. + let a = usize::from(alignment - 1); + let prefix_size = if write_options.write_legacy_ipc_format { + 4 + } else { + 8 + }; + let aligned_size = (ipc_message.len() + prefix_size + a) & !a; + let padding_bytes = aligned_size - ipc_message.len() - prefix_size; + write_continuation(&mut *w, write_options, (aligned_size - prefix_size) as i32)?; w.write_all(&ipc_message)?; w.write_all(&PADDING[..padding_bytes])?; - if body_len > 0 { - for encoded in &encoded_buffers { - let data = encoded.as_slice(); - w.write_all(data)?; - w.write_all(&PADDING[..pad_to_alignment(write_options.alignment, data.len())])?; - } - w.write_all(&PADDING[..tail_pad])?; - } - Ok(BatchEncoding::Written { - padded_header_len, - body_len, - }) - } else { - // Flatten body segments into a contiguous Vec. - let mut arrow_data: Vec = Vec::with_capacity(body_len); - for encoded in &encoded_buffers { - arrow_data.extend_from_slice(encoded.as_slice()); - arrow_data.extend_from_slice( - &PADDING[..pad_to_alignment(write_options.alignment, encoded.len())], - ); + for enc in &encoded_buffers { + w.write_all(enc.as_slice())?; + w.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?; } - arrow_data.extend_from_slice(&PADDING[..tail_pad]); - Ok(BatchEncoding::Encoded(EncodedData { - ipc_message, - arrow_data, - })) + w.write_all(&PADDING[..tail_pad])?; + return Ok(EncodedData { + ipc_message: vec![], + arrow_data: vec![], + }); } - } - /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the - /// other for the batch's data - fn record_batch_to_bytes( - &self, - batch: &RecordBatch, - write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, - ) -> Result { - if let BatchEncoding::Encoded(data) = - self.encode_record_batch(batch, write_options, compression_context, None)? - { - Ok(data) - } else { - unreachable!() - } + arrow_data.extend_from_slice(&PADDING[..tail_pad]); + Ok(EncodedData { + ipc_message, + arrow_data, + }) } /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the @@ -748,8 +806,9 @@ impl IpcDataGenerator { let mut nodes: Vec = vec![]; let mut buffers: Vec = vec![]; - let mut encoded_buffers: Vec = vec![]; + let mut arrow_data: Vec = vec![]; + // get the type of compression let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { @@ -763,9 +822,13 @@ impl IpcDataGenerator { .map(|batch_compression_type| batch_compression_type.try_into()) .transpose()?; + let alignment = write_options.alignment; + let mut encoded_buffers: Vec = + Vec::with_capacity(estimate_encoded_buffer_count(array_data.data_type())); let offset = write_array_data( array_data, &mut buffers, + &mut arrow_data, &mut encoded_buffers, &mut nodes, 0, @@ -774,14 +837,16 @@ impl IpcDataGenerator { compression_codec, compression_context, write_options, + false, )?; let mut variadic_buffer_counts = vec![]; append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data); // pad the tail of body data - let tail_pad = pad_to_alignment(write_options.alignment, offset as usize); + let tail_pad = pad_to_alignment(alignment, offset as usize); let body_len = offset as usize + tail_pad; + arrow_data.extend_from_slice(&PADDING[..tail_pad]); // write data let buffers = fbb.create_vector(&buffers); @@ -826,70 +891,11 @@ impl IpcDataGenerator { fbb.finish(root, None); let finished_data = fbb.finished_data(); - let mut arrow_data: Vec = Vec::with_capacity(body_len); - for encoded in &encoded_buffers { - arrow_data.extend_from_slice(encoded.as_slice()); - arrow_data.extend_from_slice( - &PADDING[..pad_to_alignment(write_options.alignment, encoded.len())], - ); - } - arrow_data.extend_from_slice(&PADDING[..tail_pad]); - Ok(EncodedData { ipc_message: finished_data.to_vec(), arrow_data, }) } - - /// Write dictionaries and record batch's directly to `writer`, skipping the - /// intermediate `arrow_data: Vec` accumulator used by [`Self::record_batch_to_bytes`]. - /// - /// For the uncompressed path each array buffer is held as an Arc-backed slice and - /// written straight to `writer`. one copy instead of two. For the compressed path - /// each buffer is compressed into a per-buffer scratch `Vec` and written from - /// there, eliminating the extra copy that `write_buffer` -> `arrow_data` -> - /// `write_body_buffers` would otherwise incur. - fn write_batch_direct( - &self, - batch: &RecordBatch, - dictionary_tracker: &mut DictionaryTracker, - write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, - writer: &mut W, - ) -> Result { - let schema = batch.schema(); - let mut encoded_dictionaries = Vec::new(); - let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter(); - for (i, field) in schema.fields().iter().enumerate() { - self.encode_dictionaries( - field, - batch.column(i), - &mut encoded_dictionaries, - dictionary_tracker, - write_options, - &mut dict_id, - compression_context, - )?; - } - let mut dict_sizes = Vec::with_capacity(encoded_dictionaries.len()); - for dict in encoded_dictionaries { - dict_sizes.push(write_message(&mut *writer, dict, write_options)?); - } - - if let BatchEncoding::Written { - padded_header_len, - body_len, - } = self.encode_record_batch(batch, write_options, compression_context, Some(writer))? - { - Ok(IPCMetadata { - dictionary_block_sizes: dict_sizes, - padded_header_len, - body_len, - }) - } else { - unreachable!() - } - } } fn append_variadic_buffer_counts(counts: &mut Vec, array: &ArrayData) { @@ -1349,7 +1355,7 @@ impl FileWriter { )); } - let ipc_meta = self.data_gen.write_batch_direct( + let meta = self.data_gen.write_direct( batch, &mut self.dictionary_tracker, &self.write_options, @@ -1357,24 +1363,24 @@ impl FileWriter { &mut self.writer, )?; - for (dict_meta_bytes, dict_body_bytes) in ipc_meta.dictionary_block_sizes { + for (header_len, body_len) in meta.dictionary_block_sizes { let block = crate::Block::new( self.block_offsets as i64, - dict_meta_bytes as i32, - dict_body_bytes as i64, + header_len as i32, + body_len as i64, ); self.dictionary_blocks.push(block); - self.block_offsets += dict_meta_bytes + dict_body_bytes; + self.block_offsets += header_len + body_len; } // add a record block for the footer let block = crate::Block::new( self.block_offsets as i64, - ipc_meta.padded_header_len as i32, // header size (padded) - ipc_meta.body_len as i64, + meta.padded_header_len as i32, + meta.body_len as i64, ); self.record_blocks.push(block); - self.block_offsets += ipc_meta.padded_header_len + ipc_meta.body_len; + self.block_offsets += meta.padded_header_len + meta.body_len; Ok(()) } @@ -1624,14 +1630,13 @@ impl StreamWriter { )); } - self.data_gen.write_batch_direct( + self.data_gen.write_direct( batch, &mut self.dictionary_tracker, &self.write_options, &mut self.compression_context, &mut self.writer, )?; - Ok(()) } @@ -1963,7 +1968,7 @@ fn get_list_view_array_buffers( /// the array's offset and length. This helps reduce the encoded size of sliced /// arrays /// -fn get_buffer(array_data: &ArrayData) -> Buffer { +fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer { let buffer = &array_data.buffers()[0]; let layout = layout(array_data.data_type()); let spec = &layout.buffers[0]; @@ -1979,21 +1984,12 @@ fn get_buffer(array_data: &ArrayData) -> Buffer { } } -/// Returns the total number of [`EncodedBuffer`] entries that will be pushed during encoding of -/// `batch`. Used to pre-allocate the `encoded_buffers` Vec and avoid repeated reallocations as -/// columns are visited. -fn encoded_buffer_count(batch: &RecordBatch) -> usize { - fn count(data: &ArrayData) -> usize { - data.buffers().len() + data.child_data().iter().map(count).sum::() - } - batch.columns().iter().map(|c| count(&c.to_data())).sum() -} - /// Write array data to a vector of bytes #[allow(clippy::too_many_arguments)] fn write_array_data( array_data: &ArrayData, buffers: &mut Vec, + arrow_data: &mut Vec, encoded_buffers: &mut Vec, nodes: &mut Vec, offset: i64, @@ -2002,6 +1998,7 @@ fn write_array_data( compression_codec: Option, compression_context: &mut CompressionContext, write_options: &IpcWriteOptions, + is_direct: bool, ) -> Result { let mut offset = offset; if !matches!(array_data.data_type(), DataType::Null) { @@ -2013,7 +2010,7 @@ fn write_array_data( } if has_validity_bitmap(array_data.data_type(), write_options) { // write null buffer if exists - let null_buffer = match array_data.nulls() { + let null_buffer: Buffer = match array_data.nulls() { None => { // create a buffer and fill it with valid bits let num_bytes = bit_util::ceil(num_rows, 8); @@ -2024,23 +2021,9 @@ fn write_array_data( Some(buffer) => buffer.inner().sliced(), }; - offset = collect_encoded_buffers( - null_buffer, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } - - let data_type = array_data.data_type(); - if matches!(data_type, DataType::Binary | DataType::Utf8) { - let (offsets, values) = get_byte_array_buffers::(array_data); - for buffer in [offsets, values] { + if is_direct { offset = collect_encoded_buffers( - buffer, + null_buffer, buffers, encoded_buffers, offset, @@ -2048,6 +2031,44 @@ fn write_array_data( compression_context, write_options.alignment, )?; + } else { + offset = write_buffer( + null_buffer.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } + } + + let data_type = array_data.data_type(); + if matches!(data_type, DataType::Binary | DataType::Utf8) { + let (offsets, values) = get_byte_array_buffers::(array_data); + for buffer in [offsets, values] { + if is_direct { + offset = collect_encoded_buffers( + buffer, + buffers, + encoded_buffers, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } else { + offset = write_buffer( + buffer.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } } } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) { // Slicing the views buffer is safe and easy, @@ -2056,19 +2077,10 @@ fn write_array_data( // Current implementation just serialize the raw arrays as given and not try to optimize anything. // If users wants to "compact" the arrays prior to sending them over IPC, // they should consider the gc API suggested in #5513 - offset = collect_encoded_buffers( - get_buffer(array_data), - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - - for buffer in array_data.buffers().iter().skip(1) { + let views = get_or_truncate_buffer(array_data); + if is_direct { offset = collect_encoded_buffers( - buffer.clone(), + views, buffers, encoded_buffers, offset, @@ -2076,20 +2088,66 @@ fn write_array_data( compression_context, write_options.alignment, )?; - } - } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) { - let (offsets, values) = get_byte_array_buffers::(array_data); - for buffer in [offsets, values] { - offset = collect_encoded_buffers( - buffer, + } else { + offset = write_buffer( + views.as_slice(), buffers, - encoded_buffers, + arrow_data, offset, compression_codec, compression_context, write_options.alignment, )?; } + + for buffer in array_data.buffers().iter().skip(1) { + if is_direct { + offset = collect_encoded_buffers( + buffer.clone(), + buffers, + encoded_buffers, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } else { + offset = write_buffer( + buffer.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } + } + } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) { + let (offsets, values) = get_byte_array_buffers::(array_data); + for buffer in [offsets, values] { + if is_direct { + offset = collect_encoded_buffers( + buffer, + buffers, + encoded_buffers, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } else { + offset = write_buffer( + buffer.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } + } } else if DataType::is_numeric(data_type) || DataType::is_temporal(data_type) || matches!( @@ -2100,30 +2158,56 @@ fn write_array_data( // Truncate values assert_eq!(array_data.buffers().len(), 1); - offset = collect_encoded_buffers( - get_buffer(array_data), - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; + let buffer = get_or_truncate_buffer(array_data); + if is_direct { + offset = collect_encoded_buffers( + buffer, + buffers, + encoded_buffers, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } else { + offset = write_buffer( + buffer.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } } else if matches!(data_type, DataType::Boolean) { // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes). // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around. assert_eq!(array_data.buffers().len(), 1); - let buffer = array_data.buffers()[0].bit_slice(array_data.offset(), array_data.len()); - offset = collect_encoded_buffers( - buffer, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; + let buffer = &array_data.buffers()[0]; + let buffer = buffer.bit_slice(array_data.offset(), array_data.len()); + if is_direct { + offset = collect_encoded_buffers( + buffer, + buffers, + encoded_buffers, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } else { + offset = write_buffer( + buffer.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } } else if matches!( data_type, DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _) @@ -2138,18 +2222,31 @@ fn write_array_data( DataType::LargeList(_) => get_list_array_buffers::(array_data), _ => unreachable!(), }; - offset = collect_encoded_buffers( - offsets, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; + if is_direct { + offset = collect_encoded_buffers( + offsets, + buffers, + encoded_buffers, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } else { + offset = write_buffer( + offsets.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } offset = write_array_data( &sliced_child_data, buffers, + arrow_data, encoded_buffers, nodes, offset, @@ -2158,6 +2255,7 @@ fn write_array_data( compression_codec, compression_context, write_options, + is_direct, )?; return Ok(offset); } else if matches!( @@ -2173,29 +2271,50 @@ fn write_array_data( _ => unreachable!(), }; - offset = collect_encoded_buffers( - offsets, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - - offset = collect_encoded_buffers( - sizes, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; + if is_direct { + offset = collect_encoded_buffers( + offsets, + buffers, + encoded_buffers, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + offset = collect_encoded_buffers( + sizes, + buffers, + encoded_buffers, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } else { + offset = write_buffer( + offsets.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + offset = write_buffer( + sizes.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } offset = write_array_data( &child_data, buffers, + arrow_data, encoded_buffers, nodes, offset, @@ -2204,6 +2323,7 @@ fn write_array_data( compression_codec, compression_context, write_options, + is_direct, )?; return Ok(offset); } else if let DataType::FixedSizeList(_, fixed_size) = data_type { @@ -2217,6 +2337,7 @@ fn write_array_data( offset = write_array_data( &child_data, buffers, + arrow_data, encoded_buffers, nodes, offset, @@ -2225,19 +2346,32 @@ fn write_array_data( compression_codec, compression_context, write_options, + is_direct, )?; return Ok(offset); } else { for buffer in array_data.buffers() { - offset = collect_encoded_buffers( - buffer.clone(), - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; + if is_direct { + offset = collect_encoded_buffers( + buffer.clone(), + buffers, + encoded_buffers, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } else { + offset = write_buffer( + buffer.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + } } } @@ -2246,10 +2380,13 @@ fn write_array_data( DataType::RunEndEncoded(_, _) => { // unslice the run encoded array. let arr = unslice_run_array(array_data.clone())?; + // recursively write out nested structures for data_ref in arr.child_data() { + // write the nested data (e.g list data) offset = write_array_data( data_ref, buffers, + arrow_data, encoded_buffers, nodes, offset, @@ -2258,14 +2395,18 @@ fn write_array_data( compression_codec, compression_context, write_options, + is_direct, )?; } } _ => { + // recursively write out nested structures for data_ref in array_data.child_data() { + // write the nested data (e.g list data) offset = write_array_data( data_ref, buffers, + arrow_data, encoded_buffers, nodes, offset, @@ -2274,6 +2415,7 @@ fn write_array_data( compression_codec, compression_context, write_options, + is_direct, )?; } } @@ -2281,20 +2423,62 @@ fn write_array_data( Ok(offset) } -/// Append a single array buffer to the outgoing IPC body and record its -/// corresponding `crate::Buffer` metadata into `buffers`. Returns the -/// updated byte `offset` after the buffer (including any padding). +/// Write a buffer into `arrow_data`, a vector of bytes, and adds its +/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data` +/// +/// +/// From +/// Each constituent buffer is first compressed with the indicated +/// compressor, and then written with the uncompressed length in the first 8 +/// bytes as a 64-bit little-endian signed integer followed by the compressed +/// buffer bytes (and then padding as required by the protocol). The +/// uncompressed length may be set to -1 to indicate that the data that +/// follows is not compressed, which can be useful for cases where +/// compression does not yield appreciable savings. +fn write_buffer( + buffer: &[u8], // input + buffers: &mut Vec, // output buffer descriptors + arrow_data: &mut Vec, // output stream + offset: i64, // current output stream offset + compression_codec: Option, + compression_context: &mut CompressionContext, + alignment: u8, +) -> Result { + let len: i64 = match compression_codec { + Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?, + None => { + arrow_data.extend_from_slice(buffer); + buffer.len() + } + } + .try_into() + .map_err(|e| { + ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}")) + })?; + + // make new index entry + buffers.push(crate::Buffer::new(offset, len)); + // padding and make offset aligned + let pad_len = pad_to_alignment(alignment, len as usize); + arrow_data.extend_from_slice(&PADDING[..pad_len]); + + Ok(offset + len + (pad_len as i64)) +} + +/// Collect a single array buffer into `encoded_buffers` and record its +/// [`crate::Buffer`] metadata in `buffers`. Returns the updated byte offset +/// after the buffer (including padding). /// -/// See -/// for the on-wire layout: each buffer is optionally compressed and then -/// written as the 8-byte little-endian uncompressed length (or `-1` to -/// indicate uncompressed data) followed by the (compressed) bytes and -/// protocol-required padding. +/// Unlike [`write_buffer`], no bytes are written to a stream here — the +/// [`EncodedBuffer`] segments are accumulated and streamed to the output +/// writer *after* the FlatBuffer message header has been built. This lets +/// [`FileWriter`] and [`StreamWriter`] avoid the intermediate +/// `arrow_data: Vec` allocation used by the arrow-flight path. fn collect_encoded_buffers( - buffer: Buffer, // input array buffer to encode - buffers: &mut Vec, // IPC buffer metadata (offset + length) for the FlatBuffer message - encoded_buffers: &mut Vec, // accumulated encoded segments, written to output after the message header - offset: i64, // current output stream offset + buffer: Buffer, + buffers: &mut Vec, + encoded_buffers: &mut Vec, + offset: i64, compression_codec: Option, compression_context: &mut CompressionContext, alignment: u8, @@ -2314,7 +2498,6 @@ fn collect_encoded_buffers( } }; buffers.push(crate::Buffer::new(offset, len)); - // Defer the actual write. segments are streamed to the output after the FlatBuffer header is built. encoded_buffers.push(encoded); let pad_len = pad_to_alignment(alignment, len as usize) as i64; Ok(offset + len + pad_len) @@ -2322,6 +2505,77 @@ fn collect_encoded_buffers( const PADDING: [u8; 64] = [0; 64]; +/// Estimates the number of [`EncodedBuffer`] segments that [`write_array_data`] +/// will produce for a column of the given type. +/// +/// Based on the Arrow IPC buffer layout +/// (): +/// +/// | Type family | Buffers | +/// |---|---| +/// | Null | 0 | +/// | Primitive / Bool / FixedSizeBinary | 2 (validity + values) | +/// | Binary / Utf8 / LargeBinary / LargeUtf8 | 3 (validity + offsets + data) | +/// | BinaryView / Utf8View | 3 estimate (validity + views + ≥1 variadic data buf) | +/// | List / LargeList / Map | 2 (validity + offsets) + child | +/// | ListView / LargeListView | 3 (validity + offsets + sizes) + child | +/// | FixedSizeList | 1 (validity) + child | +/// | Struct | 1 (validity) + Σ children | +/// | Dictionary | 2 (validity + indices); dict body is a separate IPC message | +/// | Union (sparse) | 1 (type_ids) + Σ children | +/// | Union (dense) | 2 (type_ids + offsets) + Σ children | +/// | RunEndEncoded | Σ of run-ends child + values child | +#[inline] +fn estimate_encoded_buffer_count(dt: &DataType) -> usize { + match dt { + DataType::Null => 0, + + DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3, + + DataType::BinaryView | DataType::Utf8View => 3, + + DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => { + 2 + estimate_encoded_buffer_count(f.data_type()) + } + + DataType::ListView(f) | DataType::LargeListView(f) => { + 3 + estimate_encoded_buffer_count(f.data_type()) + } + + DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()), + + DataType::Struct(fields) => { + 1 + fields + .iter() + .map(|f| estimate_encoded_buffer_count(f.data_type())) + .sum::() + } + + // Dictionary indices only; dictionary body is a separate IPC message. + DataType::Dictionary(_, _) => 2, + + DataType::Union(fields, UnionMode::Sparse) => { + 1 + fields + .iter() + .map(|(_, f)| estimate_encoded_buffer_count(f.data_type())) + .sum::() + } + DataType::Union(fields, UnionMode::Dense) => { + 2 + fields + .iter() + .map(|(_, f)| estimate_encoded_buffer_count(f.data_type())) + .sum::() + } + + DataType::RunEndEncoded(run_ends, values) => { + estimate_encoded_buffer_count(run_ends.data_type()) + + estimate_encoded_buffer_count(values.data_type()) + } + // Primitive, Bool, temporal, Decimal*, FixedSizeBinary: validity + values. + _ => 2, + } +} + /// Calculate an alignment boundary and return the number of bytes needed to pad to the alignment boundary #[inline] fn pad_to_alignment(alignment: u8, len: usize) -> usize { From 3ff94b0dd46ab88dc0efb3d9bd50e0745f18a26e Mon Sep 17 00:00:00 2001 From: Richard Date: Sun, 7 Jun 2026 16:13:23 -0400 Subject: [PATCH 10/15] remove minor metadata tracking overhead. Include extra return value from record_batch_to_bytes --- arrow-ipc/src/writer.rs | 91 ++++++++++------------------------------- 1 file changed, 22 insertions(+), 69 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index eb02c0852c2d..aeb6b2f2170a 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -113,57 +113,6 @@ struct IpcWriteMetadata { body_len: usize, } -/// Passes all writes through to `inner` while capturing the first 8 bytes of the stream -/// (the IPC continuation header) and counting total bytes written. Used by -/// [`IpcDataGenerator::write_direct`] to recover message sizes without an intermediate buffer. -struct SizeCapture<'a, W: Write> { - inner: &'a mut W, - total: usize, - header: [u8; 8], - header_len: usize, -} - -impl<'a, W: Write> SizeCapture<'a, W> { - fn new(writer: &'a mut W) -> Self { - Self { - inner: writer, - total: 0, - header: [0u8; 8], - header_len: 0, - } - } - - fn sizes(&self, legacy: bool) -> (usize, usize) { - // write_continuation writes: - // legacy: 4 bytes = total_len (= aligned_size - 4) - // non-legacy: 4 bytes marker + 4 bytes total_len (= aligned_size - 8) - let (prefix_size, msg_bytes): (usize, [u8; 4]) = if legacy { - (4, self.header[0..4].try_into().unwrap()) - } else { - (8, self.header[4..8].try_into().unwrap()) - }; - let meta = i32::from_le_bytes(msg_bytes) as usize + prefix_size; - let data = self.total - meta; - (meta, data) - } -} - -impl Write for SizeCapture<'_, W> { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let n = self.inner.write(buf)?; - if self.header_len < 8 { - let copy = (8 - self.header_len).min(n); - self.header[self.header_len..self.header_len + copy].copy_from_slice(&buf[..copy]); - self.header_len += copy; - } - self.total += n; - Ok(n) - } - fn flush(&mut self) -> std::io::Result<()> { - self.inner.flush() - } -} - impl IpcWriteOptions { /// Configures compression when writing IPC files. /// @@ -574,7 +523,7 @@ impl IpcDataGenerator { write_options, compression_context, )?; - let encoded_message = + let (encoded_message, _) = self.record_batch_to_bytes(batch, write_options, compression_context, None)?; Ok((encoded_dictionaries, encoded_message)) } @@ -627,14 +576,8 @@ impl IpcDataGenerator { dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?); } - let mut capture = SizeCapture::new(writer); - self.record_batch_to_bytes( - batch, - write_options, - compression_context, - Some(&mut capture), - )?; - let (padded_header_len, body_len) = capture.sizes(write_options.write_legacy_ipc_format); + let (_, (padded_header_len, body_len)) = + self.record_batch_to_bytes(batch, write_options, compression_context, Some(writer))?; Ok(IpcWriteMetadata { dictionary_block_sizes, @@ -667,13 +610,17 @@ impl IpcDataGenerator { /// When `writer` is `Some`, the message is written directly to it and an empty /// [`EncodedData`] is returned (the data has already been flushed). When `writer` /// is `None` the encoded bytes are returned for the caller to write. + /// + /// The second tuple element is `(padded_header_len, body_len)` — the padded flatbuffer header + /// size and total body size. These are populated when `writer` is `Some` (for block tracking + /// in [`FileWriter`]) and are `(0, 0)` otherwise. fn record_batch_to_bytes( &self, batch: &RecordBatch, write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, writer: Option<&mut dyn Write>, - ) -> Result { + ) -> Result<(EncodedData, (usize, usize)), ArrowError> { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; @@ -779,17 +726,23 @@ impl IpcDataGenerator { w.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?; } w.write_all(&PADDING[..tail_pad])?; - return Ok(EncodedData { - ipc_message: vec![], - arrow_data: vec![], - }); + return Ok(( + EncodedData { + ipc_message: vec![], + arrow_data: vec![], + }, + (aligned_size, body_len), + )); } arrow_data.extend_from_slice(&PADDING[..tail_pad]); - Ok(EncodedData { - ipc_message, - arrow_data, - }) + Ok(( + EncodedData { + ipc_message, + arrow_data, + }, + (0, 0), + )) } /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the From 8d1fc4d8ae796726a870c02f3a80abd8e1280e3c Mon Sep 17 00:00:00 2001 From: Richard Date: Sun, 7 Jun 2026 16:22:31 -0400 Subject: [PATCH 11/15] remove local benchmarks --- arrow-flight/benches/flight.rs | 34 +--------------------------------- arrow-flight/src/encode.rs | 1 + 2 files changed, 2 insertions(+), 33 deletions(-) diff --git a/arrow-flight/benches/flight.rs b/arrow-flight/benches/flight.rs index ba0ac593de4e..4841e9dd9822 100644 --- a/arrow-flight/benches/flight.rs +++ b/arrow-flight/benches/flight.rs @@ -83,37 +83,5 @@ fn bench_roundtrip(c: &mut Criterion) { } } -async fn encode_and_send(channel: Channel, batch: RecordBatch) { - let mut client = FlightClient::new(channel); - let frames = FlightDataEncoderBuilder::new().build(futures::stream::iter([Ok(batch)])); - client - .do_put(frames) - .await - .unwrap() - .try_collect::>() - .await - .unwrap(); -} - -fn bench_encode_to_send(c: &mut Criterion) { - let rt = tokio::runtime::Runtime::new().unwrap(); - let (channel, _) = rt.block_on(start_server()); - let mut g = c.benchmark_group("encode_to_send"); - - for &(name, build) in TYPES { - for &rows in &ROWS { - for &cols in &COLS { - let batch = build_batch(name, rows, cols, build); - let id = BenchmarkId::new(name, format!("{rows}x{cols}")); - g.throughput(Throughput::Bytes(batch.get_array_memory_size() as u64)); - g.bench_with_input(id, &batch, |b, batch| { - b.to_async(&rt) - .iter(|| encode_and_send(channel.clone(), batch.clone())); - }); - } - } - } -} - -criterion_group!(benches, bench_encode, bench_roundtrip, bench_encode_to_send); +criterion_group!(benches, bench_encode, bench_roundtrip); criterion_main!(benches); diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index 2ef70dca5da2..191da024136f 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -731,6 +731,7 @@ impl FlightIpcEncoder { &self.options, &mut self.compression_context, )?; + let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); let flight_batch = encoded_batch.into(); From ec99b7eeeb60ad01537cc9446eb47a442e010708 Mon Sep 17 00:00:00 2001 From: Richard Date: Sun, 7 Jun 2026 19:48:10 -0400 Subject: [PATCH 12/15] trigger CI From 095d7fde95f8a2054884d59c5804c6e342749aa0 Mon Sep 17 00:00:00 2001 From: Richard Date: Sun, 7 Jun 2026 19:48:30 -0400 Subject: [PATCH 13/15] trigger CI --- arrow-ipc/src/writer.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index aeb6b2f2170a..68a68992cc06 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -101,8 +101,7 @@ impl EncodedBuffer { /// Per-message sizes produced by [`IpcDataGenerator::write_direct`]. /// /// [`FileWriter`] uses these to build the Block index entries required by the IPC footer for -/// random-access reads. Fields mirror those of [`crate::Block`]: a padded header length and a -/// body length for the record batch, plus one pair per dictionary written before it. +/// random-access reads. struct IpcWriteMetadata { /// Per-dictionary `(padded_header_len, body_len)` for each dictionary batch written /// before the record batch. @@ -670,7 +669,6 @@ impl IpcDataGenerator { append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); } - // Build FlatBuffer header — same for both paths. let tail_pad = pad_to_alignment(alignment, offset as usize); let body_len = offset as usize + tail_pad; @@ -709,7 +707,7 @@ impl IpcDataGenerator { let ipc_message = fbb.finished_data().to_vec(); if let Some(w) = writer { - // Stream header then each buffer directly — no intermediate Vec. + // Stream header then each buffer directly let a = usize::from(alignment - 1); let prefix_size = if write_options.write_legacy_ipc_format { 4 @@ -2422,11 +2420,9 @@ fn write_buffer( /// [`crate::Buffer`] metadata in `buffers`. Returns the updated byte offset /// after the buffer (including padding). /// -/// Unlike [`write_buffer`], no bytes are written to a stream here — the +/// Unlike [`write_buffer`], no bytes are written here. The /// [`EncodedBuffer`] segments are accumulated and streamed to the output -/// writer *after* the FlatBuffer message header has been built. This lets -/// [`FileWriter`] and [`StreamWriter`] avoid the intermediate -/// `arrow_data: Vec` allocation used by the arrow-flight path. +/// writer after the FlatBuffer message header has been built. fn collect_encoded_buffers( buffer: Buffer, buffers: &mut Vec, From a3f9c5382b537eb728893897950f05a3c371f1ae Mon Sep 17 00:00:00 2001 From: Richard Date: Sun, 7 Jun 2026 22:59:31 -0400 Subject: [PATCH 14/15] trigger CI From 70a75671080ad53dee3f167c0c27daf77d397ac7 Mon Sep 17 00:00:00 2001 From: Richard Date: Mon, 8 Jun 2026 13:55:22 -0400 Subject: [PATCH 15/15] removed un-needed code duplication in favor of an enum --- arrow-ipc/src/writer.rs | 574 +++++++++++++--------------------------- 1 file changed, 185 insertions(+), 389 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 68a68992cc06..a881d41c7d8d 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -97,6 +97,13 @@ impl EncodedBuffer { } } } +/// Destination for per-buffer encoded output produced by [`write_array_data`]. +enum BufferSink<'a> { + /// Serialize buffer bytes (with padding) into a contiguous byte vec. + Write(&'a mut Vec), + /// Accumulate pre-encoded buffer segments for deferred zero-copy streaming. + Collect(&'a mut Vec), +} /// Per-message sizes produced by [`IpcDataGenerator::write_direct`]. /// @@ -522,9 +529,21 @@ impl IpcDataGenerator { write_options, compression_context, )?; - let (encoded_message, _) = - self.record_batch_to_bytes(batch, write_options, compression_context, None)?; - Ok((encoded_dictionaries, encoded_message)) + let mut arrow_data = Vec::new(); + let (ipc_message, _, tail_pad) = self.record_batch_to_bytes( + batch, + write_options, + compression_context, + &mut BufferSink::Write(&mut arrow_data), + )?; + arrow_data.extend_from_slice(&PADDING[..tail_pad]); + Ok(( + encoded_dictionaries, + EncodedData { + ipc_message, + arrow_data, + }, + )) } /// Encode dictionary batches for all columns in `batch`. @@ -575,12 +594,43 @@ impl IpcDataGenerator { dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?); } - let (_, (padded_header_len, body_len)) = - self.record_batch_to_bytes(batch, write_options, compression_context, Some(writer))?; + let capacity = batch + .columns() + .iter() + .map(|a| estimate_encoded_buffer_count(a.data_type())) + .sum(); + let mut encoded_buffers: Vec = Vec::with_capacity(capacity); + let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes( + batch, + write_options, + compression_context, + &mut BufferSink::Collect(&mut encoded_buffers), + )?; + + let alignment = write_options.alignment; + let a = usize::from(alignment - 1); + let prefix_size = if write_options.write_legacy_ipc_format { + 4 + } else { + 8 + }; + let aligned_size = (ipc_message.len() + prefix_size + a) & !a; + write_continuation( + &mut *writer, + write_options, + (aligned_size - prefix_size) as i32, + )?; + writer.write_all(&ipc_message)?; + writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?; + for enc in &encoded_buffers { + writer.write_all(enc.as_slice())?; + writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?; + } + writer.write_all(&PADDING[..tail_pad])?; Ok(IpcWriteMetadata { dictionary_block_sizes, - padded_header_len, + padded_header_len: aligned_size, body_len, }) } @@ -603,29 +653,20 @@ impl IpcDataGenerator { ) } - /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the - /// other for the batch's data. - /// - /// When `writer` is `Some`, the message is written directly to it and an empty - /// [`EncodedData`] is returned (the data has already been flushed). When `writer` - /// is `None` the encoded bytes are returned for the caller to write. - /// - /// The second tuple element is `(padded_header_len, body_len)` — the padded flatbuffer header - /// size and total body size. These are populated when `writer` is `Some` (for block tracking - /// in [`FileWriter`]) and are `(0, 0)` otherwise. + /// Encodes a `RecordBatch` into a flatbuffer IPC message and fills `sink` with the + /// serialised buffer data. fn record_batch_to_bytes( &self, batch: &RecordBatch, write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, - writer: Option<&mut dyn Write>, - ) -> Result<(EncodedData, (usize, usize)), ArrowError> { + sink: &mut BufferSink<'_>, + ) -> Result<(Vec, usize, usize), ArrowError> { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; let mut buffers: Vec = vec![]; - // get the type of compression let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { @@ -641,22 +682,13 @@ impl IpcDataGenerator { let alignment = write_options.alignment; let mut variadic_buffer_counts = vec![]; let mut offset = 0i64; - let mut arrow_data: Vec = vec![]; - let capacity = batch - .columns() - .iter() - .map(|a| estimate_encoded_buffer_count(a.data_type())) - .sum(); - let mut encoded_buffers: Vec = Vec::with_capacity(capacity); - let is_direct = writer.is_some(); for array in batch.columns() { let array_data = array.to_data(); offset = write_array_data( &array_data, &mut buffers, - &mut arrow_data, - &mut encoded_buffers, + sink, &mut nodes, offset, array.len(), @@ -664,7 +696,6 @@ impl IpcDataGenerator { compression_codec, compression_context, write_options, - is_direct, )?; append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); } @@ -672,9 +703,8 @@ impl IpcDataGenerator { let tail_pad = pad_to_alignment(alignment, offset as usize); let body_len = offset as usize + tail_pad; - // write data - let buffers_fb = fbb.create_vector(&buffers); - let nodes_fb = fbb.create_vector(&nodes); + let buffers = fbb.create_vector(&buffers); + let nodes = fbb.create_vector(&nodes); let variadic_buffer = if variadic_buffer_counts.is_empty() { None } else { @@ -684,17 +714,15 @@ impl IpcDataGenerator { let root = { let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb); batch_builder.add_length(batch.num_rows() as i64); - batch_builder.add_nodes(nodes_fb); - batch_builder.add_buffers(buffers_fb); + batch_builder.add_nodes(nodes); + batch_builder.add_buffers(buffers); if let Some(c) = compression { batch_builder.add_compression(c); } - if let Some(v) = variadic_buffer { batch_builder.add_variadicBufferCounts(v); } - let b = batch_builder.finish(); - b.as_union_value() + batch_builder.finish().as_union_value() }; // create an crate::Message let mut message = crate::MessageBuilder::new(&mut fbb); @@ -704,43 +732,8 @@ impl IpcDataGenerator { message.add_header(root); let root = message.finish(); fbb.finish(root, None); - let ipc_message = fbb.finished_data().to_vec(); - if let Some(w) = writer { - // Stream header then each buffer directly - let a = usize::from(alignment - 1); - let prefix_size = if write_options.write_legacy_ipc_format { - 4 - } else { - 8 - }; - let aligned_size = (ipc_message.len() + prefix_size + a) & !a; - let padding_bytes = aligned_size - ipc_message.len() - prefix_size; - write_continuation(&mut *w, write_options, (aligned_size - prefix_size) as i32)?; - w.write_all(&ipc_message)?; - w.write_all(&PADDING[..padding_bytes])?; - for enc in &encoded_buffers { - w.write_all(enc.as_slice())?; - w.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?; - } - w.write_all(&PADDING[..tail_pad])?; - return Ok(( - EncodedData { - ipc_message: vec![], - arrow_data: vec![], - }, - (aligned_size, body_len), - )); - } - - arrow_data.extend_from_slice(&PADDING[..tail_pad]); - Ok(( - EncodedData { - ipc_message, - arrow_data, - }, - (0, 0), - )) + Ok((fbb.finished_data().to_vec(), body_len, tail_pad)) } /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the @@ -774,13 +767,11 @@ impl IpcDataGenerator { .transpose()?; let alignment = write_options.alignment; - let mut encoded_buffers: Vec = - Vec::with_capacity(estimate_encoded_buffer_count(array_data.data_type())); + let mut sink = BufferSink::Write(&mut arrow_data); let offset = write_array_data( array_data, &mut buffers, - &mut arrow_data, - &mut encoded_buffers, + &mut sink, &mut nodes, 0, array_data.len(), @@ -788,7 +779,6 @@ impl IpcDataGenerator { compression_codec, compression_context, write_options, - false, )?; let mut variadic_buffer_counts = vec![]; @@ -1940,8 +1930,7 @@ fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer { fn write_array_data( array_data: &ArrayData, buffers: &mut Vec, - arrow_data: &mut Vec, - encoded_buffers: &mut Vec, + sink: &mut BufferSink<'_>, nodes: &mut Vec, offset: i64, num_rows: usize, @@ -1949,7 +1938,6 @@ fn write_array_data( compression_codec: Option, compression_context: &mut CompressionContext, write_options: &IpcWriteOptions, - is_direct: bool, ) -> Result { let mut offset = offset; if !matches!(array_data.data_type(), DataType::Null) { @@ -1961,7 +1949,7 @@ fn write_array_data( } if has_validity_bitmap(array_data.data_type(), write_options) { // write null buffer if exists - let null_buffer: Buffer = match array_data.nulls() { + let null_buffer = match array_data.nulls() { None => { // create a buffer and fill it with valid bits let num_bytes = bit_util::ceil(num_rows, 8); @@ -1972,54 +1960,30 @@ fn write_array_data( Some(buffer) => buffer.inner().sliced(), }; - if is_direct { - offset = collect_encoded_buffers( - null_buffer, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } else { - offset = write_buffer( - null_buffer.as_slice(), - buffers, - arrow_data, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } + offset = encode_sink_buffer( + null_buffer, + buffers, + sink, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; } let data_type = array_data.data_type(); if matches!(data_type, DataType::Binary | DataType::Utf8) { let (offsets, values) = get_byte_array_buffers::(array_data); for buffer in [offsets, values] { - if is_direct { - offset = collect_encoded_buffers( - buffer, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } else { - offset = write_buffer( - buffer.as_slice(), - buffers, - arrow_data, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } + offset = encode_sink_buffer( + buffer, + buffers, + sink, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; } } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) { // Slicing the views buffer is safe and easy, @@ -2029,76 +1993,40 @@ fn write_array_data( // If users wants to "compact" the arrays prior to sending them over IPC, // they should consider the gc API suggested in #5513 let views = get_or_truncate_buffer(array_data); - if is_direct { - offset = collect_encoded_buffers( - views, + offset = encode_sink_buffer( + views, + buffers, + sink, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + + for buffer in array_data.buffers().iter().skip(1) { + offset = encode_sink_buffer( + buffer.clone(), buffers, - encoded_buffers, + sink, offset, compression_codec, compression_context, write_options.alignment, )?; - } else { - offset = write_buffer( - views.as_slice(), + } + } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) { + let (offsets, values) = get_byte_array_buffers::(array_data); + for buffer in [offsets, values] { + offset = encode_sink_buffer( + buffer, buffers, - arrow_data, + sink, offset, compression_codec, compression_context, write_options.alignment, )?; } - - for buffer in array_data.buffers().iter().skip(1) { - if is_direct { - offset = collect_encoded_buffers( - buffer.clone(), - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } else { - offset = write_buffer( - buffer.as_slice(), - buffers, - arrow_data, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } - } - } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) { - let (offsets, values) = get_byte_array_buffers::(array_data); - for buffer in [offsets, values] { - if is_direct { - offset = collect_encoded_buffers( - buffer, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } else { - offset = write_buffer( - buffer.as_slice(), - buffers, - arrow_data, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } - } } else if DataType::is_numeric(data_type) || DataType::is_temporal(data_type) || matches!( @@ -2110,27 +2038,15 @@ fn write_array_data( assert_eq!(array_data.buffers().len(), 1); let buffer = get_or_truncate_buffer(array_data); - if is_direct { - offset = collect_encoded_buffers( - buffer, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } else { - offset = write_buffer( - buffer.as_slice(), - buffers, - arrow_data, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } + offset = encode_sink_buffer( + buffer, + buffers, + sink, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; } else if matches!(data_type, DataType::Boolean) { // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes). // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around. @@ -2138,27 +2054,15 @@ fn write_array_data( let buffer = &array_data.buffers()[0]; let buffer = buffer.bit_slice(array_data.offset(), array_data.len()); - if is_direct { - offset = collect_encoded_buffers( - buffer, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } else { - offset = write_buffer( - buffer.as_slice(), - buffers, - arrow_data, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } + offset = encode_sink_buffer( + buffer, + buffers, + sink, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; } else if matches!( data_type, DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _) @@ -2173,32 +2077,19 @@ fn write_array_data( DataType::LargeList(_) => get_list_array_buffers::(array_data), _ => unreachable!(), }; - if is_direct { - offset = collect_encoded_buffers( - offsets, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } else { - offset = write_buffer( - offsets.as_slice(), - buffers, - arrow_data, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } + offset = encode_sink_buffer( + offsets, + buffers, + sink, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; offset = write_array_data( &sliced_child_data, buffers, - arrow_data, - encoded_buffers, + sink, nodes, offset, sliced_child_data.len(), @@ -2206,7 +2097,6 @@ fn write_array_data( compression_codec, compression_context, write_options, - is_direct, )?; return Ok(offset); } else if matches!( @@ -2222,51 +2112,29 @@ fn write_array_data( _ => unreachable!(), }; - if is_direct { - offset = collect_encoded_buffers( - offsets, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - offset = collect_encoded_buffers( - sizes, - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } else { - offset = write_buffer( - offsets.as_slice(), - buffers, - arrow_data, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - offset = write_buffer( - sizes.as_slice(), - buffers, - arrow_data, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } + offset = encode_sink_buffer( + offsets, + buffers, + sink, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; + offset = encode_sink_buffer( + sizes, + buffers, + sink, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; offset = write_array_data( &child_data, buffers, - arrow_data, - encoded_buffers, + sink, nodes, offset, child_data.len(), @@ -2274,7 +2142,6 @@ fn write_array_data( compression_codec, compression_context, write_options, - is_direct, )?; return Ok(offset); } else if let DataType::FixedSizeList(_, fixed_size) = data_type { @@ -2288,8 +2155,7 @@ fn write_array_data( offset = write_array_data( &child_data, buffers, - arrow_data, - encoded_buffers, + sink, nodes, offset, child_data.len(), @@ -2297,32 +2163,19 @@ fn write_array_data( compression_codec, compression_context, write_options, - is_direct, )?; return Ok(offset); } else { for buffer in array_data.buffers() { - if is_direct { - offset = collect_encoded_buffers( - buffer.clone(), - buffers, - encoded_buffers, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } else { - offset = write_buffer( - buffer.as_slice(), - buffers, - arrow_data, - offset, - compression_codec, - compression_context, - write_options.alignment, - )?; - } + offset = encode_sink_buffer( + buffer.clone(), + buffers, + sink, + offset, + compression_codec, + compression_context, + write_options.alignment, + )?; } } @@ -2337,8 +2190,7 @@ fn write_array_data( offset = write_array_data( data_ref, buffers, - arrow_data, - encoded_buffers, + sink, nodes, offset, data_ref.len(), @@ -2346,7 +2198,6 @@ fn write_array_data( compression_codec, compression_context, write_options, - is_direct, )?; } } @@ -2357,8 +2208,7 @@ fn write_array_data( offset = write_array_data( data_ref, buffers, - arrow_data, - encoded_buffers, + sink, nodes, offset, data_ref.len(), @@ -2366,7 +2216,6 @@ fn write_array_data( compression_codec, compression_context, write_options, - is_direct, )?; } } @@ -2374,59 +2223,10 @@ fn write_array_data( Ok(offset) } -/// Write a buffer into `arrow_data`, a vector of bytes, and adds its -/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data` -/// -/// -/// From -/// Each constituent buffer is first compressed with the indicated -/// compressor, and then written with the uncompressed length in the first 8 -/// bytes as a 64-bit little-endian signed integer followed by the compressed -/// buffer bytes (and then padding as required by the protocol). The -/// uncompressed length may be set to -1 to indicate that the data that -/// follows is not compressed, which can be useful for cases where -/// compression does not yield appreciable savings. -fn write_buffer( - buffer: &[u8], // input - buffers: &mut Vec, // output buffer descriptors - arrow_data: &mut Vec, // output stream - offset: i64, // current output stream offset - compression_codec: Option, - compression_context: &mut CompressionContext, - alignment: u8, -) -> Result { - let len: i64 = match compression_codec { - Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?, - None => { - arrow_data.extend_from_slice(buffer); - buffer.len() - } - } - .try_into() - .map_err(|e| { - ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}")) - })?; - - // make new index entry - buffers.push(crate::Buffer::new(offset, len)); - // padding and make offset aligned - let pad_len = pad_to_alignment(alignment, len as usize); - arrow_data.extend_from_slice(&PADDING[..pad_len]); - - Ok(offset + len + (pad_len as i64)) -} - -/// Collect a single array buffer into `encoded_buffers` and record its -/// [`crate::Buffer`] metadata in `buffers`. Returns the updated byte offset -/// after the buffer (including padding). -/// -/// Unlike [`write_buffer`], no bytes are written here. The -/// [`EncodedBuffer`] segments are accumulated and streamed to the output -/// writer after the FlatBuffer message header has been built. -fn collect_encoded_buffers( +fn encode_sink_buffer( buffer: Buffer, buffers: &mut Vec, - encoded_buffers: &mut Vec, + sink: &mut BufferSink<'_>, offset: i64, compression_codec: Option, compression_context: &mut CompressionContext, @@ -2446,10 +2246,21 @@ fn collect_encoded_buffers( (EncodedBuffer::Compressed(scratch), len) } }; + + let pad_len = pad_to_alignment(alignment, len as usize); + match sink { + BufferSink::Write(arrow_data) => { + match &encoded { + EncodedBuffer::Raw(b) => arrow_data.extend_from_slice(b.as_slice()), + EncodedBuffer::Compressed(v) => arrow_data.extend_from_slice(v), + } + arrow_data.extend_from_slice(&PADDING[..pad_len]); + } + BufferSink::Collect(encoded_buffers) => encoded_buffers.push(encoded), + } + buffers.push(crate::Buffer::new(offset, len)); - encoded_buffers.push(encoded); - let pad_len = pad_to_alignment(alignment, len as usize) as i64; - Ok(offset + len + pad_len) + Ok(offset + len + pad_len as i64) } const PADDING: [u8; 64] = [0; 64]; @@ -2459,21 +2270,6 @@ const PADDING: [u8; 64] = [0; 64]; /// /// Based on the Arrow IPC buffer layout /// (): -/// -/// | Type family | Buffers | -/// |---|---| -/// | Null | 0 | -/// | Primitive / Bool / FixedSizeBinary | 2 (validity + values) | -/// | Binary / Utf8 / LargeBinary / LargeUtf8 | 3 (validity + offsets + data) | -/// | BinaryView / Utf8View | 3 estimate (validity + views + ≥1 variadic data buf) | -/// | List / LargeList / Map | 2 (validity + offsets) + child | -/// | ListView / LargeListView | 3 (validity + offsets + sizes) + child | -/// | FixedSizeList | 1 (validity) + child | -/// | Struct | 1 (validity) + Σ children | -/// | Dictionary | 2 (validity + indices); dict body is a separate IPC message | -/// | Union (sparse) | 1 (type_ids) + Σ children | -/// | Union (dense) | 2 (type_ids + offsets) + Σ children | -/// | RunEndEncoded | Σ of run-ends child + values child | #[inline] fn estimate_encoded_buffer_count(dt: &DataType) -> usize { match dt {