Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions datafusion/functions/src/string/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,18 +370,18 @@ fn case_conversion(
if let Some(ref n) = nulls {
for i in 0..item_len {
if n.is_null(i) {
builder.append_placeholder();
builder.try_append_placeholder()?;
} else {
// SAFETY: `n.is_null(i)` was false in the branch above.
let s = unsafe { string_array.value_unchecked(i) };
builder.append_value(&unicode_case(s, lower));
builder.try_append_value(&unicode_case(s, lower))?;
}
}
} else {
for i in 0..item_len {
// SAFETY: no null buffer means every index is valid.
let s = unsafe { string_array.value_unchecked(i) };
builder.append_value(&unicode_case(s, lower));
builder.try_append_value(&unicode_case(s, lower))?;
}
}

Expand Down Expand Up @@ -431,18 +431,18 @@ fn case_conversion_array<O: OffsetSizeTrait>(
if let Some(ref n) = nulls {
for i in 0..item_len {
if n.is_null(i) {
builder.append_placeholder();
builder.try_append_placeholder()?;
} else {
// SAFETY: `n.is_null(i)` was false in the branch above.
let s = unsafe { string_array.value_unchecked(i) };
builder.append_value(&unicode_case(s, lower));
builder.try_append_value(&unicode_case(s, lower))?;
}
}
} else {
for i in 0..item_len {
// SAFETY: no null buffer means every index is valid.
let s = unsafe { string_array.value_unchecked(i) };
builder.append_value(&unicode_case(s, lower));
builder.try_append_value(&unicode_case(s, lower))?;
}
}
Ok(Arc::new(builder.finish(nulls)?))
Expand Down
138 changes: 105 additions & 33 deletions datafusion/functions/src/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ fn offset_overflow_error<O: OffsetSizeTrait>() -> DataFusionError {
)
}

/// Builds the execution error reported when a string-view quantity is too
/// large to be represented.
///
/// `field` names the quantity that overflowed (for example `"offset"`); it is
/// interpolated verbatim into the error message.
fn string_view_overflow_error(field: &str) -> DataFusionError {
exec_datafusion_err!("byte array offset overflow: {field} exceeds i32::MAX")
}

fn try_offset<O: OffsetSizeTrait>(len: usize) -> Result<O> {
if len > O::MAX_OFFSET {
return Err(offset_overflow_error::<O>());
Expand Down Expand Up @@ -684,57 +688,90 @@ impl StringViewArrayBuilder {
self.block_size
}

/// See [`BulkNullStringArrayBuilder::append_value`].
/// Fallible variant of [`Self::append_value`].
///
/// # Panics
/// # Errors
///
/// Panics if the value length, the in-progress buffer offset, or the
/// number of completed buffers exceeds `i32::MAX`. The ByteView spec
/// uses signed 32-bit integers for these fields; exceeding `i32::MAX`
/// would produce an array that does not round-trip through Arrow IPC
/// (see <https://github.com/apache/arrow-rs/issues/6172>).
/// Returns an error if the value length, in-progress buffer offset, or
/// number of completed buffers exceeds `i32::MAX`. The ByteView spec uses
/// signed 32-bit integers for these fields; exceeding `i32::MAX` would
/// produce an array that does not round-trip through Arrow IPC (see
/// <https://github.com/apache/arrow-rs/issues/6172>).
#[inline]
pub fn append_value(&mut self, value: &str) {
pub fn try_append_value(&mut self, value: &str) -> Result<()> {
let v = value.as_bytes();
let length: u32 =
i32::try_from(v.len()).expect("value length exceeds i32::MAX") as u32;
let length: u32 = i32::try_from(v.len())
.map_err(|_| string_view_overflow_error("value length"))?
as u32;
if length <= 12 {
self.views.push(make_view(v, 0, 0));
return;
return Ok(());
}

let required_cap = self.in_progress.len() + length as usize;
if self.in_progress.capacity() < required_cap {
self.flush_in_progress();
let to_reserve = (length as usize).max(self.next_block_size() as usize);
#[expect(
clippy::disallowed_methods,
reason = "StringView's block size bounds growth, so reserve cannot overflow capacity arithmetically. This hot loop intentionally avoids the extra `try_reserve` checks. It remains subject to allocator failure/OOM, which must be managed externally."
)]
self.in_progress.reserve(to_reserve);
}
self.try_ensure_long_capacity(length)?;

let offset: u32 = i32::try_from(self.in_progress.len())
.expect("offset exceeds i32::MAX") as u32;
.map_err(|_| string_view_overflow_error("offset"))?
as u32;
let buffer_index: u32 = i32::try_from(self.completed.len())
.map_err(|_| string_view_overflow_error("buffer count"))?
as u32;
self.in_progress.extend_from_slice(v);
self.views.push(self.make_long_view(length, offset, v));
self.views.push(Self::make_long_view_checked(
length,
buffer_index,
offset,
v,
));
Ok(())
}

/// Appends `value` as the next row, panicking if it cannot be represented.
///
/// Counterpart of [`BulkNullStringArrayBuilder::append_value`]. This is a
/// thin panicking wrapper around [`Self::try_append_value`]; new call sites
/// that want overflow reported as a recoverable error should call
/// [`Self::try_append_value`] directly.
///
/// # Panics
///
/// Panics in exactly the situations where [`Self::try_append_value`] returns
/// an error.
#[inline]
pub fn append_value(&mut self, value: &str) {
    let appended = self.try_append_value(value);
    appended.expect("byte array offset overflow");
}

/// Fallible variant of [`Self::append_placeholder`].
///
/// Delegates to [`Self::append_placeholder`] and then returns `Ok(())`, so
/// callers can use `?` uniformly alongside [`Self::try_append_value`].
///
/// # Errors
///
/// This currently cannot fail; it returns `Result` for API symmetry with
/// other fallible append methods.
#[inline]
pub fn try_append_placeholder(&mut self) -> Result<()> {
// The underlying append returns nothing, so there is no error to forward.
self.append_placeholder();
Ok(())
}

/// Appends a placeholder row: pushes an all-zero view and bumps the
/// placeholder counter.
///
/// See [`BulkNullStringArrayBuilder::append_placeholder`].
///
/// Note: new call sites that need recoverable overflow handling should
/// prefer [`Self::try_append_placeholder`].
#[inline]
pub fn append_placeholder(&mut self) {
// Zero-length inline view — `length` field is 0, no buffer ref.
self.views.push(0);
// Track how many placeholder rows have been appended.
self.placeholder_count += 1;
}

/// Ensure the in-progress block has room for `length` more bytes,
/// flushing the current block and starting a new (doubled) one if not.
/// Caller must invoke this only when no bytes of the current row are
/// yet in `in_progress` — flushing mid-row would orphan partial data.
/// Fallible variant of [`Self::ensure_long_capacity`].
#[inline]
fn ensure_long_capacity(&mut self, length: u32) {
let required_cap = self.in_progress.len() + length as usize;
fn try_ensure_long_capacity(&mut self, length: u32) -> Result<()> {
let required_cap = self
.in_progress
.len()
.checked_add(length as usize)
.ok_or_else(|| string_view_overflow_error("string view block size"))?;
if self.in_progress.capacity() < required_cap {
self.flush_in_progress();
let to_reserve = (length as usize).max(self.next_block_size() as usize);
Expand All @@ -744,6 +781,17 @@ impl StringViewArrayBuilder {
)]
self.in_progress.reserve(to_reserve);
}
Ok(())
}

/// Makes sure the in-progress block can accept `length` more bytes.
///
/// Panicking wrapper around [`Self::try_ensure_long_capacity`]: when the
/// current block is too small, it is flushed and a new (doubled) block is
/// started.
///
/// Only call this while no bytes of the current row have been written to
/// `in_progress` — a flush in the middle of a row would orphan the partial
/// data.
///
/// # Panics
///
/// Panics if [`Self::try_ensure_long_capacity`] returns an error.
#[inline]
fn ensure_long_capacity(&mut self, length: u32) {
    let ensured = self.try_ensure_long_capacity(length);
    ensured.expect("byte array offset overflow");
}

/// Encode a long-form view referencing `length` bytes already written
Expand All @@ -754,10 +802,12 @@ impl StringViewArrayBuilder {
/// function is `[inline(never)]` and has to handle short strings, so
/// building the view here ourselves is faster.
#[inline]
fn make_long_view(&self, length: u32, offset: u32, prefix_bytes: &[u8]) -> u128 {
let buffer_index: u32 = i32::try_from(self.completed.len())
.expect("buffer count exceeds i32::MAX")
as u32;
fn make_long_view_checked(
length: u32,
buffer_index: u32,
offset: u32,
prefix_bytes: &[u8],
) -> u128 {
ByteView {
length,
// length > 12, so prefix_bytes has at least 4 bytes.
Expand All @@ -768,6 +818,14 @@ impl StringViewArrayBuilder {
.into()
}

/// Encodes a long-form view, using the current number of completed buffers
/// as the view's buffer index and delegating the encoding itself to
/// [`Self::make_long_view_checked`].
///
/// # Panics
///
/// Panics if the number of completed buffers exceeds `i32::MAX`.
#[inline]
fn make_long_view(&self, length: u32, offset: u32, prefix_bytes: &[u8]) -> u128 {
    let completed_buffers = self.completed.len();
    // Validate against the signed 32-bit range first, then store as `u32`.
    let signed_index =
        i32::try_from(completed_buffers).expect("buffer count exceeds i32::MAX");
    Self::make_long_view_checked(length, signed_index as u32, offset, prefix_bytes)
}

/// See [`BulkNullStringArrayBuilder::append_byte_map`].
///
/// # Safety
Expand Down Expand Up @@ -1601,6 +1659,20 @@ mod tests {
);
}

/// The fallible append methods succeed for a short value, a null row, and a
/// value longer than 12 bytes, and the finished array reflects all three.
#[test]
fn string_view_builder_try_append_success_path() {
    // `None` marks the row that is appended as a placeholder and masked null.
    let rows = [Some("abc"), None, Some("a long string value")];

    let mut view_builder = StringViewArrayBuilder::with_capacity(3);
    for row in rows {
        match row {
            Some(text) => view_builder.try_append_value(text).unwrap(),
            None => view_builder.try_append_placeholder().unwrap(),
        }
    }

    let validity: Vec<bool> = rows.iter().map(Option::is_some).collect();
    let built = view_builder
        .finish(Some(NullBuffer::from(validity)))
        .unwrap();

    for (idx, row) in rows.iter().enumerate() {
        match row {
            Some(expected) => assert_eq!(built.value(idx), *expected),
            None => assert!(built.is_null(idx)),
        }
    }
}

#[test]
fn generic_string_builder_try_offset_overflow() {
let err = try_offset::<i32>(i32::MAX as usize + 1)
Expand Down
29 changes: 25 additions & 4 deletions datafusion/functions/src/unicode/substrindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,15 +281,15 @@ where

for i in 0..num_rows {
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
builder.append_placeholder();
builder.try_append_placeholder()?;
continue;
}
// SAFETY: `i < num_rows` and the union of input nulls is valid at i,
// so each input is also valid at i.
let string = unsafe { string_array.value_unchecked(i) };
let delimiter = unsafe { delimiter_array.value_unchecked(i) };
let n = unsafe { count_array.value_unchecked(i) };
builder.append_value(substr_index_slice(string, delimiter, n));
builder.try_append_value(substr_index_slice(string, delimiter, n))?;
}

Ok(Arc::new(builder.finish(nulls)?) as ArrayRef)
Expand Down Expand Up @@ -487,13 +487,13 @@ where
let nulls = string_array.nulls().cloned();
for i in 0..string_array.len() {
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
builder.append_placeholder();
builder.try_append_placeholder()?;
continue;
}
// SAFETY: `i < string_array.len()` and `nulls` is valid at i, so the
// input is also valid at i.
let s = unsafe { string_array.value_unchecked(i) };
builder.append_value(f(s));
builder.try_append_value(f(s))?;
}
Ok(Arc::new(builder.finish(nulls)?) as ArrayRef)
}
Expand Down Expand Up @@ -797,6 +797,27 @@ mod tests {
Ok(())
}

/// When every input row has a null string (and null count), the general
/// `substr_index` path must yield an all-null output of the same length.
#[test]
fn test_substr_index_all_nulls() -> Result<()> {
    use super::substr_index_general;
    use crate::strings::GenericStringArrayBuilder;

    let input = StringArray::from(vec![None::<&str>, None]);
    let delims = StringArray::from(vec![None::<&str>, Some(".")]);
    let occurrences = Int64Array::from(vec![None, None]);
    let out_builder = GenericStringArrayBuilder::<i32>::with_capacity(input.len(), 0);

    let output = substr_index_general(&input, &delims, &occurrences, out_builder)?;

    let expected = StringArray::from(vec![None::<&str>, None]);
    assert_eq!(output.as_string::<i32>(), &expected);

    Ok(())
}

#[test]
fn test_substr_index_utf8view_array_sliced() -> Result<()> {
use super::substr_index_view;
Expand Down