Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 287 additions & 15 deletions arrow-select/src/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::filter::filter_record_batch;
use crate::filter::{FilterBuilder, FilterPredicate, FilterSelection};
use crate::take::take_record_batch;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
Expand Down Expand Up @@ -212,7 +212,7 @@ impl BatchCoalescer {
/// Push a batch into the Coalescer after applying a filter
///
/// This is semantically equivalent to calling [`Self::push_batch`]
/// with the results from [`filter_record_batch`]
/// with the results from [`crate::filter::filter_record_batch`]
///
/// # Example
/// ```
Expand All @@ -238,10 +238,7 @@ impl BatchCoalescer {
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
// TODO: optimize this to avoid materializing (copying the results
// of filter to a new batch)
let filtered_batch = filter_record_batch(&batch, filter)?;
self.push_batch(filtered_batch)
self.push_batch_with_filtered_columns(batch, filter)
}

/// Push a batch into the Coalescer after applying a set of indices
Expand Down Expand Up @@ -566,6 +563,79 @@ impl BatchCoalescer {
}
}

impl BatchCoalescer {
    /// Apply `filter` to `batch` and buffer the rows it selects.
    ///
    /// The outcome depends on the filter:
    /// * an error is returned if `filter` has more entries than `batch` has rows;
    /// * nothing happens if no rows are selected;
    /// * `batch` is handed unchanged to [`Self::push_batch`] if the filter
    ///   covers the whole batch and selects every row;
    /// * the filtered batch is materialized and handed to [`Self::push_batch`]
    ///   if the selection is larger than `biggest_coalesce_batch_size`, is
    ///   dense, or does not fit in the space left in the in-progress batch;
    /// * otherwise the selected rows are copied column by column straight
    ///   into the in-progress arrays.
    ///
    /// # Errors
    /// Returns [`ArrowError::InvalidArgumentError`] if the filter is longer
    /// than the batch, or (on the column-by-column path) if the number of
    /// columns differs from the number of in-progress arrays. Errors from
    /// filtering, copying, or finishing a batch are propagated.
    fn push_batch_with_filtered_columns(
        &mut self,
        batch: RecordBatch,
        filter: &BooleanArray,
    ) -> Result<(), ArrowError> {
        let num_rows = batch.num_rows();
        let num_columns = batch.num_columns();
        let filter_len = filter.len();

        if filter_len > num_rows {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Filter predicate of length {} is larger than target array of length {}",
                filter_len, num_rows
            )));
        }

        let builder = FilterBuilder::new(filter);
        // Optimize the predicate whenever it is applied to several columns,
        // or when the single column's data type benefits from it.
        let wants_optimized = match num_columns {
            0 => false,
            1 => FilterBuilder::is_optimize_beneficial(batch.schema_ref().field(0).data_type()),
            _ => true,
        };
        let builder = if wants_optimized {
            builder.optimize()
        } else {
            builder
        };
        let predicate = builder.build();
        let selected_count = predicate.count();

        // Nothing selected: there is nothing to buffer.
        if selected_count == 0 {
            return Ok(());
        }

        // Everything selected: the batch can be pushed without filtering.
        if selected_count == num_rows && filter_len == num_rows {
            return self.push_batch(batch);
        }

        let exceeds_coalesce_limit = matches!(
            self.biggest_coalesce_batch_size,
            Some(limit) if selected_count > limit
        );
        // A multi-column batch is treated as densely selected above 1/4 of
        // the filter length; any other batch already above 1/16.
        let density_factor: usize = if num_columns > 1 { 4 } else { 16 };
        let is_dense_filter = selected_count.saturating_mul(density_factor) > filter_len;
        let remaining_capacity = self.target_batch_size - self.buffered_rows;
        let does_not_fit_buffer = selected_count > remaining_capacity;

        if exceeds_coalesce_limit || is_dense_filter || does_not_fit_buffer {
            // Materialize the filtered batch and take the regular push path.
            let filtered_batch = predicate.filter_record_batch(&batch)?;
            return self.push_batch(filtered_batch);
        }

        let (_schema, columns, _num_rows) = batch.into_parts();

        let expected_columns = self.in_progress_arrays.len();
        if columns.len() != expected_columns {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Batch has {} columns but BatchCoalescer expects {}",
                columns.len(),
                expected_columns
            )));
        }

        // Copy the selected rows of every column into its in-progress array.
        self.in_progress_arrays
            .iter_mut()
            .zip(columns)
            .try_for_each(|(in_progress, column)| {
                in_progress.copy_rows_by_filter_from(column, &predicate)
            })?;

        self.buffered_rows += selected_count;
        if self.buffered_rows >= self.target_batch_size {
            self.finish_buffered_batch()?;
        }

        Ok(())
    }
}

/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
macro_rules! instantiate_primitive {
Expand Down Expand Up @@ -611,6 +681,35 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
/// Return an error if the source array is not set
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

/// Copy the rows of the current source array that `filter` selects.
///
/// The default implementation obtains the predicate's selection and
/// delegates to [`Self::copy_rows_by_selection`].
fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
    let selection = filter.selection();
    self.copy_rows_by_selection(selection)
}

/// Copy the rows of `source` that `filter` selects.
///
/// `source` is installed as the current source array only for the duration
/// of the copy; the source is reset to `None` before returning, whether or
/// not the copy succeeded.
fn copy_rows_by_filter_from(
    &mut self,
    source: ArrayRef,
    filter: &FilterPredicate,
) -> Result<(), ArrowError> {
    self.set_source(Some(source));
    let outcome = self.copy_rows_by_filter(filter);
    // Clear the source before reporting the outcome so that it is also
    // dropped when the copy failed.
    self.set_source(None);
    outcome
}

/// Copy rows described by a [`FilterSelection`] from the current source array.
///
/// Every variant is translated into zero or more [`Self::copy_rows`] calls;
/// the first error returned by `copy_rows` stops the copy and is returned.
fn copy_rows_by_selection(&mut self, selection: FilterSelection<'_>) -> Result<(), ArrowError> {
match selection {
// No rows selected: nothing to copy
FilterSelection::None => Ok(()),
// Copy `len` rows starting at offset 0 in a single call
FilterSelection::All { len } => self.copy_rows(0, len),
FilterSelection::Slices(slices) => {
// Each `(start, end)` pair copies `end - start` rows beginning at `start`
// NOTE(review): presumably `end` is exclusive and `end >= start` -- confirm against `FilterSelection`
slices.try_for_each(|(start, end)| self.copy_rows(start, end - start))
}
// One `copy_rows` call of length 1 per selected index
FilterSelection::Indices(indices) => indices.try_for_each(|idx| self.copy_rows(idx, 1)),
}
}

/// Finish the currently in-progress array and return it as an `ArrayRef`
fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
Expand All @@ -619,6 +718,7 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
mod tests {
use super::*;
use crate::concat::concat_batches;
use crate::filter::filter_record_batch;
use arrow_array::builder::StringViewBuilder;
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
Expand Down Expand Up @@ -1197,6 +1297,172 @@ mod tests {
.run();
}

/// Pushes the same single-column `BinaryViewArray` batch twice, each time
/// with a `sparse_filter`, using a batch size of 256.
#[test]
fn test_binary_view_filtered() {
    // Short value, null, and a value longer than 12 bytes
    let pattern: Vec<Option<&[u8]>> = vec![
        Some(b"foo"),
        None,
        Some(b"A longer string that is more than 12 bytes"),
    ];

    // Cycle through the pattern until 1000 rows have been produced
    let column: ArrayRef = Arc::new(BinaryViewArray::from_iter(
        pattern.iter().cycle().take(1000),
    ));
    let batch = RecordBatch::try_from_iter(vec![("c0", column)]).unwrap();
    let filter = sparse_filter(1000);

    Test::new("coalesce_binary_view_filtered")
        .with_batch(batch.clone())
        .with_filter(filter.clone())
        .with_batch(batch)
        .with_filter(filter)
        .with_batch_size(256)
        .with_expected_output_sizes(vec![250])
        .run();
}

/// Pushes the same single-column `BinaryViewArray` batch of short values
/// and nulls twice, each time with a `sparse_filter`, using a batch size
/// of 300.
#[test]
fn test_binary_view_filtered_inline() {
    // Only short values (at most 6 bytes) and nulls
    let pattern: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];

    // Cycle through the pattern until 1000 rows have been produced
    let column: ArrayRef = Arc::new(BinaryViewArray::from_iter(
        pattern.iter().cycle().take(1000),
    ));
    let batch = RecordBatch::try_from_iter(vec![("c0", column)]).unwrap();
    let filter = sparse_filter(1000);

    Test::new("coalesce_binary_view_filtered_inline")
        .with_batch(batch.clone())
        .with_filter(filter.clone())
        .with_batch(batch)
        .with_filter(filter)
        .with_batch_size(300)
        .with_expected_output_sizes(vec![250])
        .run();
}

/// Pushes the same single-column `StringViewArray` batch of short values
/// and nulls twice, each time with a `sparse_filter`, using a batch size
/// of 300.
#[test]
fn test_string_view_filtered_inline() {
    // Only short values (at most 6 bytes) and nulls
    let pattern: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];

    // Cycle through the pattern until 1000 rows have been produced
    let column: ArrayRef = Arc::new(StringViewArray::from_iter(
        pattern.iter().cycle().take(1000),
    ));
    let batch = RecordBatch::try_from_iter(vec![("c0", column)]).unwrap();
    let filter = sparse_filter(1000);

    Test::new("coalesce_string_view_filtered_inline")
        .with_batch(batch.clone())
        .with_filter(filter.clone())
        .with_batch(batch)
        .with_filter(filter)
        .with_batch_size(300)
        .with_expected_output_sizes(vec![250])
        .run();
}

/// Pushes the same three-column batch (Int32, Float64, BinaryView) twice,
/// each time with a `sparse_filter`, using a batch size of 300.
#[test]
fn test_mixed_inline_binary_view_filtered() {
    // Int32 column in which every fifth row is null
    let ints = Int32Array::from_iter((0..1000).map(|v| (v % 5 != 0).then_some(v)));
    // Float64 column without nulls
    let floats = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
    // Binary view column cycling through short values and nulls
    let pattern: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];
    let binaries = BinaryViewArray::from_iter(pattern.iter().cycle().take(1000));

    let columns = vec![
        ("i", Arc::new(ints) as ArrayRef),
        ("f", Arc::new(floats) as ArrayRef),
        ("b", Arc::new(binaries) as ArrayRef),
    ];
    let batch = RecordBatch::try_from_iter(columns).unwrap();

    let filter = sparse_filter(1000);

    Test::new("coalesce_mixed_inline_binary_view_filtered")
        .with_batch(batch.clone())
        .with_filter(filter.clone())
        .with_batch(batch)
        .with_filter(filter)
        .with_batch_size(300)
        .with_expected_output_sizes(vec![250])
        .run();
}

/// Pushes the same three-column batch (Int32, Float64, StringView) twice,
/// each time with a `sparse_filter`, using a batch size of 300.
#[test]
fn test_mixed_inline_string_view_filtered() {
    // Int32 column in which every fifth row is null
    let ints = Int32Array::from_iter((0..1000).map(|v| (v % 5 != 0).then_some(v)));
    // Float64 column without nulls
    let floats = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
    // String view column cycling through short values and nulls
    let pattern: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
    let strings = StringViewArray::from_iter(pattern.iter().cycle().take(1000));

    let columns = vec![
        ("i", Arc::new(ints) as ArrayRef),
        ("f", Arc::new(floats) as ArrayRef),
        ("s", Arc::new(strings) as ArrayRef),
    ];
    let batch = RecordBatch::try_from_iter(columns).unwrap();

    let filter = sparse_filter(1000);

    Test::new("coalesce_mixed_inline_string_view_filtered")
        .with_batch(batch.clone())
        .with_filter(filter.clone())
        .with_batch(batch)
        .with_filter(filter)
        .with_batch_size(300)
        .with_expected_output_sizes(vec![250])
        .run();
}

/// Pushes the same two-column batch (Boolean, StringView) twice, each time
/// with a `sparse_filter`, using a batch size of 300.
#[test]
fn test_mixed_boolean_inline_string_view_filtered() {
    // Boolean column that is true on every third row
    let bools = BooleanArray::from_iter((0..1000).map(|v| Some(v % 3 == 0)));
    // String view column cycling through short values and nulls
    let pattern: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
    let strings = StringViewArray::from_iter(pattern.iter().cycle().take(1000));

    let columns = vec![
        ("b", Arc::new(bools) as ArrayRef),
        ("s", Arc::new(strings) as ArrayRef),
    ];
    let batch = RecordBatch::try_from_iter(columns).unwrap();

    let filter = sparse_filter(1000);

    Test::new("coalesce_mixed_boolean_inline_string_view_filtered")
        .with_batch(batch.clone())
        .with_filter(filter.clone())
        .with_batch(batch)
        .with_filter(filter)
        .with_batch_size(300)
        .with_expected_output_sizes(vec![250])
        .run();
}

/// A filter with more entries than the batch has rows must be rejected
/// with a descriptive error.
#[test]
fn test_inline_filter_rejects_filter_longer_than_batch() {
    // Two-row batch paired with a three-entry filter
    let rows: Vec<Option<&[u8]>> = vec![Some(b"foo"), Some(b"bar")];
    let column: ArrayRef = Arc::new(BinaryViewArray::from_iter(rows));
    let batch = RecordBatch::try_from_iter(vec![("c0", column)]).unwrap();
    let filter = BooleanArray::from(vec![true, false, true]);

    let mut coalescer = BatchCoalescer::new(batch.schema(), 100);
    let outcome = coalescer.push_batch_with_filter(batch, &filter);
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    let mentions_lengths =
        err.contains("Filter predicate of length 3 is larger than target array of length 2");
    assert!(mentions_lengths, "unexpected error: {err}");
}

#[derive(Debug, Clone, PartialEq)]
struct ExpectedLayout {
len: usize,
Expand Down Expand Up @@ -1685,6 +1951,10 @@ mod tests {
}
}

fn sparse_filter(len: usize) -> BooleanArray {
BooleanArray::from_iter((0..len).map(|idx| Some(idx % 8 == 0)))
}

/// Returns the named column as a StringViewArray
fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
batch
Expand All @@ -1701,18 +1971,20 @@ mod tests {
let (schema, mut columns, row_count) = batch.into_parts();

for column in columns.iter_mut() {
let Some(string_view) = column.as_string_view_opt() else {
if let Some(string_view) = column.as_string_view_opt() {
// Re-create the StringViewArray to ensure memory layout is
// consistent
let mut builder = StringViewBuilder::new();
for s in string_view.iter() {
builder.append_option(s);
}
*column = Arc::new(builder.finish());
continue;
};
}

// Re-create the StringViewArray to ensure memory layout is
// consistent
let mut builder = StringViewBuilder::new();
for s in string_view.iter() {
builder.append_option(s);
if let Some(binary_view) = column.as_binary_view_opt() {
*column = Arc::new(BinaryViewArray::from_iter(binary_view.iter()));
}
// Update the column with the new StringViewArray
*column = Arc::new(builder.finish());
}

let options = RecordBatchOptions::new().with_row_count(Some(row_count));
Expand Down
Loading
Loading