diff --git a/include/lance/lance.h b/include/lance/lance.h index d47e128..fdd01a0 100644 --- a/include/lance/lance.h +++ b/include/lance/lance.h @@ -559,6 +559,114 @@ int32_t lance_dataset_alter_columns( size_t num_alterations ); +/* ─── lance_dataset_add_columns ───────────────────────────────────────────── */ + +/** + * A single new column defined by a SQL expression over the dataset's existing + * columns, e.g. { .name = "doubled", .expression = "x * 2" }. Both fields are + * required, non-empty UTF-8, and are read by shared reference for the duration + * of the call. + */ +typedef struct LanceSqlColumn { + /* Name of the new column. Required, non-empty UTF-8. */ + const char* name; + /* SQL expression evaluated against existing columns. Required, non-empty. */ + const char* expression; +} LanceSqlColumn; + +/** + * Add one or more columns computed from SQL expressions over the dataset's + * existing columns, committing a new manifest. Each fragment is scanned, the + * expressions are evaluated, and the results are written as new column files. + * + * Mutates `dataset` in place — the same handle remains valid afterward and + * sees the new version. Scanners already in flight keep their pre-add view. + * + * @param dataset Open dataset (not consumed). Mutated in place. Must not + * be NULL. + * @param columns Array of `LanceSqlColumn`. Must not be NULL; each entry's + * `name` and `expression` must be non-NULL and non-empty. + * @param num_columns Length of `columns`. Must be > 0. + * @param batch_size Rows per scan batch while evaluating expressions. + * 0 = upstream default. + * @return 0 on success, -1 on error. Error codes: + * LANCE_ERR_INVALID_ARGUMENT for NULL/empty inputs, NULL or empty + * `name` / `expression`, non-UTF-8 strings, malformed SQL *syntax*, a + * new column name that collides with an existing column, or a + * `batch_size` beyond UINT32_MAX. An expression that references a + * *non-existent column* surfaces as LANCE_ERR_INTERNAL (an upstream + * schema error, the same path as lance_dataset_delete), not + * LANCE_ERR_INVALID_ARGUMENT. LANCE_ERR_COMMIT_CONFLICT for a + * concurrent writer. + */ +int32_t lance_dataset_add_columns_sql( + LanceDataset* dataset, + const LanceSqlColumn* columns, + size_t num_columns, + uint64_t batch_size +); + +/** + * Add one or more all-null columns described by an Arrow C Data Interface + * schema, committing a new manifest. On non-legacy datasets this is a + * metadata-only operation — no data files are rewritten. Every field in the + * schema must be nullable. + * + * Mutates `dataset` in place — the same handle remains valid afterward and + * sees the new version. Scanners already in flight keep their pre-add view. + * + * @param dataset Open dataset (not consumed). Mutated in place. Must not be + * NULL. + * @param schema Arrow C `ArrowSchema` describing the new columns. Read by + * shared reference; its `release` callback is never invoked. + * Must not be NULL. Only the top-level schema is validated + * before it is handed to arrow-rs; the caller is responsible for + * providing fully-initialised child fields. + * @return 0 on success, -1 on error. Error codes: + * LANCE_ERR_INVALID_ARGUMENT for a NULL dataset/schema, an + * uninitialised or already-released schema, an invalid Arrow schema, a + * non-nullable field, or a name that collides with an existing column. + * LANCE_ERR_NOT_SUPPORTED for a legacy-format dataset (which cannot take + * all-null columns as a metadata-only change). + * LANCE_ERR_COMMIT_CONFLICT for a concurrent writer. + */ +int32_t lance_dataset_add_columns_nulls( + LanceDataset* dataset, + const struct ArrowSchema* schema +); + +/** + * Add columns by splicing precomputed data from an Arrow C Data Interface + * stream into the dataset, committing a new manifest. The stream's batches are + * consumed in order and aligned positionally to the dataset's existing rows; + * the total row count must match the dataset exactly. + * + * Mutates `dataset` in place — the same handle remains valid afterward and + * sees the new version. Scanners already in flight keep their pre-add view. + * + * @param dataset Open dataset (not consumed). Mutated in place. Must not + * be NULL. + * @param stream Arrow C stream of new column data. When non-NULL it is + * consumed (released) on every return path, including error + * returns — the caller must not use it again. (A NULL stream + * is rejected before anything is consumed.) Its schema + * defines the new columns and must not collide with existing + * column names. + * @param batch_size Rows per write batch while aligning the stream to + * fragments. 0 = upstream default. + * @return 0 on success, -1 on error. Error codes: + * LANCE_ERR_INVALID_ARGUMENT for a NULL dataset/stream, a stream missing + * a mandatory get_schema/get_next/release callback, a stream whose total + * row count does not match the dataset, a new column name that collides + * with an existing column, or a `batch_size` beyond UINT32_MAX. + * LANCE_ERR_COMMIT_CONFLICT for a concurrent writer. + */ +int32_t lance_dataset_add_columns_stream( + LanceDataset* dataset, + struct ArrowArrayStream* stream, + uint64_t batch_size +); + /** * Export the dataset schema via Arrow C Data Interface. * @param out Pointer to caller-allocated ArrowSchema struct diff --git a/include/lance/lance.hpp b/include/lance/lance.hpp index 01125ed..ad17789 100644 --- a/include/lance/lance.hpp +++ b/include/lance/lance.hpp @@ -136,6 +136,16 @@ struct ColumnAlteration { const ArrowSchema* data_type = nullptr; }; +// ─── New column (SQL) ──────────────────────────────────────────────────────── + +/// A single new column defined by a SQL expression over the dataset's existing +/// columns, added by `Dataset::add_columns_sql`. Both fields are required and +/// non-empty, e.g. `{ "doubled", "x * 2" }`. +struct SqlColumn { + std::string name; + std::string expression; +}; + // ─── Dataset ───────────────────────────────────────────────────────────────── class Dataset { @@ -511,6 +521,78 @@ class Dataset { } } + /// Add columns computed from SQL expressions over the dataset's existing + /// columns, committing a new manifest. `batch_size = 0` uses the upstream + /// default scan batch size. + /// + /// `columns` must be non-empty and each entry's `name` and `expression` + /// must be non-empty. Throws lance::Error on failure (empty list, empty + /// name/expression, malformed SQL syntax, name collision with an existing + /// column, commit conflict, ...). A reference to a non-existent column + /// throws with code `LANCE_ERR_INTERNAL` (an upstream schema error), not + /// `LANCE_ERR_INVALID_ARGUMENT` — see the C header for the rationale. + void add_columns_sql(const std::vector& columns, + uint64_t batch_size = 0) { + // The C strings we install in each entry borrow from `columns` (the + // caller's std::strings), which outlive this call. The entries are + // copied by value into `raw`, so any reallocation during push_back + // just moves the raw bytes — pointer values are preserved. + std::vector raw; + raw.reserve(columns.size()); + for (const auto& c : columns) { + LanceSqlColumn entry{}; + entry.name = c.name.c_str(); + entry.expression = c.expression.c_str(); + raw.push_back(entry); + } + // Pass `raw.data()` unconditionally — matches the `alter_columns` and + // `drop_columns` siblings whose inputs are also required to be + // non-empty. An empty `columns` yields `num_columns == 0`, which the + // Rust layer rejects before it indexes the pointer. + if (lance_dataset_add_columns_sql( + handle_.get(), raw.data(), raw.size(), batch_size) != 0) { + check_error(); + } + } + + /// Add all-null columns described by an Arrow schema, committing a new + /// manifest. Metadata-only on non-legacy datasets. Every field in `schema` + /// must be nullable. The caller owns `schema` and must keep it alive for + /// the duration of the call; the wrapper does not release it. + /// + /// Throws lance::Error on failure (invalid schema, non-nullable field, name + /// collision with an existing column, commit conflict, ...). A legacy-format + /// dataset throws with code `LANCE_ERR_NOT_SUPPORTED` (all-null columns are + /// metadata-only and the legacy format cannot represent them that way). + void add_columns_nulls(const ArrowSchema* schema) { + if (lance_dataset_add_columns_nulls(handle_.get(), schema) != 0) { + check_error(); + } + } + + /// Add columns by splicing precomputed data from an Arrow C stream into the + /// dataset, committing a new manifest. `batch_size = 0` uses the upstream + /// default. When non-null, `stream` is consumed (released) on every return + /// path — including a null-dataset error and when this method throws — so do + /// not use it again afterward. Only a null `stream` is rejected without + /// consuming anything. + /// + /// The stream's total row count must match the dataset exactly. Throws + /// lance::Error on failure (row-count mismatch, name collision with an + /// existing column, commit conflict, ...). + void add_columns_stream(ArrowArrayStream* stream, uint64_t batch_size = 0) { + // Forward `stream` straight to the C API, which owns the stream and + // releases it on every path. No RAII guard is needed here (unlike + // `write`, which builds vectors before its C call): nothing between this + // method's entry and the call below can throw, so the stream can never + // be stranded by an exception. WARNING: do not add any throwing code + // before the C call without first arming a stream-release guard. + if (lance_dataset_add_columns_stream( + handle_.get(), stream, batch_size) != 0) { + check_error(); + } + } + /// Export the schema as an Arrow C Data Interface struct. void schema(ArrowSchema* out) const { if (lance_dataset_schema(handle_.get(), out) != 0) { diff --git a/src/add_columns.rs b/src/add_columns.rs new file mode 100644 index 0000000..00d58f6 --- /dev/null +++ b/src/add_columns.rs @@ -0,0 +1,391 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Add columns C API: append new columns to a dataset, committing a new +//! manifest. Three variants mirror the upstream `NewColumnTransform` cases +//! that translate cleanly across the C ABI: +//! +//! * `..._sql` — derive columns from SQL expressions over existing columns. +//! * `..._nulls` — add all-null columns described by an Arrow schema; this is +//! a metadata-only operation on non-legacy datasets. +//! * `..._stream` — splice in precomputed column data from an Arrow C stream, +//! aligned to the dataset's existing rows in order. +//! +//! The upstream `BatchUDF` variant is intentionally omitted: it carries a Rust +//! closure that cannot cross the C ABI. The `_stream` variant covers the same +//! "bring your own computed data" use case. +//! +//! Each call mutates the dataset in place under an exclusive write lock; +//! existing scanners that already cloned the inner Arc keep their pre-add view. + +use std::ffi::c_char; +use std::sync::Arc; + +use arrow::ffi::FFI_ArrowSchema; +use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; +use arrow_schema::Schema as ArrowSchema; +use lance::dataset::NewColumnTransform; +use lance_core::Result; +use snafu::location; + +use crate::dataset::LanceDataset; +use crate::error::ffi_try; +use crate::helpers; +use crate::runtime::block_on; + +/// A single new column defined by a SQL expression over the dataset's existing +/// columns, e.g. `name = "doubled"`, `expression = "x * 2"`. Both fields are +/// required, non-empty UTF-8; the strings are read by shared reference for the +/// duration of the call. +#[repr(C)] +pub struct LanceSqlColumn { + /// Name of the new column. Required, non-empty UTF-8. + pub name: *const c_char, + /// SQL expression evaluated against existing columns. Required, non-empty. + pub expression: *const c_char, +} + +/// Add one or more columns computed from SQL expressions over the dataset's +/// existing columns, committing a new manifest. Each fragment is scanned, the +/// expressions are evaluated over each Arrow batch, and the results are written +/// as new column files. +/// +/// - `dataset`: Open dataset (mutated; same handle remains valid afterward). +/// Must not be NULL. +/// - `columns`: Pointer to an array of `LanceSqlColumn`. Must not be NULL. +/// - `num_columns`: Length of the `columns` array. Must be non-zero. +/// - `batch_size`: Rows per scan batch while evaluating expressions. `0` uses +/// the upstream default. +/// +/// Returns 0 on success, -1 on error. Error codes: +/// `LANCE_ERR_INVALID_ARGUMENT` for NULL/empty args, NULL or empty `name` / +/// `expression`, non-UTF-8 strings, malformed SQL *syntax*, a new column whose +/// name collides with an existing column, or a `batch_size` that exceeds +/// `u32::MAX`. An expression that references a *non-existent column* +/// is resolved by the upstream planner and surfaces as `LANCE_ERR_INTERNAL` +/// (an upstream schema error, the same path as `lance_dataset_delete`); we do +/// not re-classify it at the FFI boundary. `LANCE_ERR_COMMIT_CONFLICT` for a +/// concurrent writer. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_dataset_add_columns_sql( + dataset: *mut LanceDataset, + columns: *const LanceSqlColumn, + num_columns: usize, + batch_size: u64, +) -> i32 { + ffi_try!( + unsafe { add_columns_sql_inner(dataset, columns, num_columns, batch_size) }, + neg + ) +} + +unsafe fn add_columns_sql_inner( + dataset: *mut LanceDataset, + columns: *const LanceSqlColumn, + num_columns: usize, + batch_size: u64, +) -> Result { + if dataset.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "dataset must not be NULL".into(), + location: location!(), + }); + } + if columns.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "columns must not be NULL".into(), + location: location!(), + }); + } + if num_columns == 0 { + return Err(lance_core::Error::InvalidInput { + source: "num_columns must be > 0".into(), + location: location!(), + }); + } + + let batch_size = resolve_batch_size(batch_size)?; + + // Materialize the (name, expression) pairs up front so a precise per-index + // error fires before the dataset's write lock is taken — matches the + // pre-lock validation pattern used by the sibling schema-evolution APIs. + let mut pairs: Vec<(String, String)> = Vec::with_capacity(num_columns); + for i in 0..num_columns { + // SAFETY: `columns` is non-NULL (checked above) and the caller + // guarantees the array has at least `num_columns` initialised entries + // valid for this call. Each entry's `name` / `expression` pointers are + // dereferenced by `parse_required_field` under the same guarantee. + let entry = unsafe { &*columns.add(i) }; + let name = unsafe { parse_required_field(entry.name, i, "name")? }; + let expression = unsafe { parse_required_field(entry.expression, i, "expression")? }; + pairs.push((name, expression)); + } + + let transform = NewColumnTransform::SqlExpressions(pairs); + + // SAFETY: `dataset` is non-NULL (checked above) and the caller guarantees + // it points to a live `LanceDataset`. `with_mut` takes an exclusive write + // lock on the inner `Arc` before yielding `&mut Dataset`, so a + // shared `&*dataset` borrow here is sound. + let ds = unsafe { &*dataset }; + ds.with_mut(|d| block_on(d.add_columns(transform, None, batch_size)))?; + Ok(0) +} + +/// Add one or more all-null columns described by an Arrow C Data Interface +/// schema, committing a new manifest. On non-legacy datasets this is a +/// metadata-only operation — no data files are rewritten. Every field in the +/// schema must be nullable (an all-null column cannot be non-nullable). +/// +/// - `dataset`: Open dataset (mutated; same handle remains valid afterward). +/// Must not be NULL. +/// - `schema`: Arrow C `ArrowSchema` describing the new columns. Read by shared +/// reference; its `release` callback is never invoked. Must not be NULL. Only +/// the top-level schema is validated before it is handed to arrow-rs; the +/// caller is responsible for providing fully-initialised child fields. +/// +/// Returns 0 on success, -1 on error. Error codes: +/// `LANCE_ERR_INVALID_ARGUMENT` for a NULL `dataset` / `schema`, an +/// uninitialised or already-released schema, a schema that is not a valid +/// Arrow schema, a non-nullable field, or a name that collides with an existing +/// column. `LANCE_ERR_NOT_SUPPORTED` for a legacy-format dataset (which cannot +/// take all-null columns as a metadata-only change). `LANCE_ERR_COMMIT_CONFLICT` +/// for a concurrent writer. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_dataset_add_columns_nulls( + dataset: *mut LanceDataset, + schema: *const FFI_ArrowSchema, +) -> i32 { + ffi_try!(unsafe { add_columns_nulls_inner(dataset, schema) }, neg) +} + +unsafe fn add_columns_nulls_inner( + dataset: *mut LanceDataset, + schema: *const FFI_ArrowSchema, +) -> Result { + if dataset.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "dataset must not be NULL".into(), + location: location!(), + }); + } + if schema.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "schema must not be NULL".into(), + location: location!(), + }); + } + + // SAFETY: `schema` is non-NULL (checked above) and the caller guarantees it + // points to a valid `FFI_ArrowSchema` for the duration of this call. We + // read by shared reference and never invoke its release callback. + let ffi_schema = unsafe { &*schema }; + // Reject an already-released or never-initialised schema before handing it + // to arrow-rs, which would otherwise `assert!` on the NULL `format` field + // and abort the host process under our `panic = "abort"` profile. Both + // checks are intentional — `release == NULL` is the canonical Arrow C Data + // Interface "released" sentinel, while `format == NULL` catches a + // zero-initialised or half-built struct that would slip past the release + // check. + if ffi_schema.release.is_none() || ffi_schema.format.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "schema is uninitialised or already released".into(), + location: location!(), + }); + } + // arrow-rs's `FFI_ArrowSchema::format()` does `to_str().expect(..)` on the + // format pointer; a non-NULL but non-UTF-8 top-level format would abort the + // process under `panic = "abort"`. Validate it here so a malformed format + // surfaces as INVALID_ARGUMENT instead. (Child fields are still the caller's + // responsibility — see the doc comment — as walking them would duplicate + // arrow-rs's recursive descent.) + // + // SAFETY: `format` is non-NULL (checked above) and, per the caller's CADI + // contract, points to a NUL-terminated C string valid for this call. + if unsafe { std::ffi::CStr::from_ptr(ffi_schema.format) } + .to_str() + .is_err() + { + return Err(lance_core::Error::InvalidInput { + source: "schema format string is not valid UTF-8".into(), + location: location!(), + }); + } + let arrow_schema = + ArrowSchema::try_from(ffi_schema).map_err(|e| lance_core::Error::InvalidInput { + source: format!("schema is not a valid Arrow schema: {e}").into(), + location: location!(), + })?; + + let transform = NewColumnTransform::AllNulls(Arc::new(arrow_schema)); + + // SAFETY: `dataset` is non-NULL (checked above); see `add_columns_sql_inner` + // for the `with_mut` locking justification. + let ds = unsafe { &*dataset }; + ds.with_mut(|d| block_on(d.add_columns(transform, None, None)))?; + Ok(0) +} + +/// Add columns by splicing precomputed data from an Arrow C Data Interface +/// stream into the dataset, committing a new manifest. The stream's batches are +/// consumed in order and aligned positionally to the dataset's existing rows: +/// the total row count must match the dataset exactly, or the call fails. +/// +/// - `dataset`: Open dataset (mutated; same handle remains valid afterward). +/// Must not be NULL. +/// - `stream`: Arrow C stream of the new column data. When non-NULL it is +/// consumed (released) on every return path, including error returns — the +/// caller must not use it again. (A NULL `stream` is rejected before anything +/// is consumed.) Its schema defines the new columns and must not collide with +/// existing column names. +/// - `batch_size`: Rows per write batch while aligning the stream to fragments. +/// `0` uses the upstream default. +/// +/// Returns 0 on success, -1 on error. Error codes: +/// `LANCE_ERR_INVALID_ARGUMENT` for a NULL `dataset` / `stream`, a stream +/// missing a mandatory `get_schema` / `get_next` / `release` callback, a stream +/// whose total row count does not match the dataset, a new column whose name +/// collides with an existing column, or a `batch_size` that exceeds `u32::MAX`. +/// `LANCE_ERR_COMMIT_CONFLICT` for a concurrent writer. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_dataset_add_columns_stream( + dataset: *mut LanceDataset, + stream: *mut FFI_ArrowArrayStream, + batch_size: u64, +) -> i32 { + ffi_try!( + unsafe { add_columns_stream_inner(dataset, stream, batch_size) }, + neg + ) +} + +unsafe fn add_columns_stream_inner( + dataset: *mut LanceDataset, + stream: *mut FFI_ArrowArrayStream, + batch_size: u64, +) -> Result { + // The stream NULL check is the only validation that runs *before* the + // stream is consumed. Once `from_raw` succeeds, every later return path + // (dataset / batch_size) drops `reader`, which fires the FFI release + // callback — so those checks are deliberately deferred to after `from_raw`. + // Moving them ahead of `from_raw` would early-return without releasing the + // caller's stream, breaking the documented "consumed on every return" + // contract. (This NULL-before-`from_raw` ordering matches `merge_insert.rs`; + // the callback pre-flight guard below is specific to this function.) + if stream.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "stream must not be NULL".into(), + location: location!(), + }); + } + + // Reject a stream missing a mandatory C Data Interface callback *before* + // handing it to arrow-rs. `ArrowArrayStreamReader` only guards against a + // NULL `release`; a NULL `get_schema` or `get_next` would otherwise reach an + // `unwrap()` deep inside arrow-rs and abort the host process under our + // `panic = "abort"` profile. We do not require `get_last_error` (the spec + // marks it optional): requiring it would not close the abort anyway, since a + // present callback that *returns* NULL at error time hits the same + // `last_error.unwrap()` on arrow-rs's `get_next` error path — a residual + // upstream limitation reachable only by a stream that signals an error + // without a message, which we cannot guard at intake. + // + // SAFETY: `stream` is non-NULL (checked above) and the caller guarantees it + // points to an initialised, properly-aligned `FFI_ArrowArrayStream`. The + // callback slots are `Copy` function pointers; this shared borrow ends + // before any ownership transfer below. + let (release, has_schema_cb, has_next_cb) = unsafe { + let raw = &*stream; + ( + raw.release, + raw.get_schema.is_some(), + raw.get_next.is_some(), + ) + }; + if release.is_none() || !has_schema_cb || !has_next_cb { + // Preserve the "consumed on every return path" contract: release the + // stream ourselves rather than routing the broken struct through + // arrow-rs's aborting `from_raw`. + if let Some(release_fn) = release { + // SAFETY: `release_fn` is the producer's release callback for this + // stream. We null the caller's `release` field *first* — the Arrow C + // Data Interface "released" sentinel — then invoke the callback once. + // Nulling first is the move-semantics convention (the consumer claims + // ownership before cleanup) and is robust even against a producer + // that frees the struct inside its own callback: we never touch the + // struct after `release_fn` returns. `release_fn` reads `private_data` + // (untouched), so its cleanup still runs. + unsafe { + (*stream).release = None; + release_fn(stream); + } + } + return Err(lance_core::Error::InvalidInput { + source: "stream is uninitialised, already released, or missing a \ + required get_schema/get_next/release callback" + .into(), + location: location!(), + }); + } + + // SAFETY: `stream` is non-NULL (checked above) and the caller guarantees it + // points to an initialised, properly-aligned `FFI_ArrowArrayStream` they + // own. `from_raw` moves the entire caller struct into Rust (via `ptr::replace` + // with an empty, released stream), so the caller's memory cannot be released + // twice — on success or on the error path. The pre-flight guard rules out the + // `release == NULL` error, but `from_raw` still fails (a live `map_err` path) + // if the stream's `get_schema` callback returns an error code or yields a + // schema arrow-rs cannot convert; that surfaces as `LANCE_ERR_INVALID_ARGUMENT`. + let reader = unsafe { ArrowArrayStreamReader::from_raw(stream) }.map_err(|e| { + lance_core::Error::InvalidInput { + source: e.to_string().into(), + location: location!(), + } + })?; + + if dataset.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "dataset must not be NULL".into(), + location: location!(), + }); + } + + let batch_size = resolve_batch_size(batch_size)?; + + let transform = NewColumnTransform::Reader(Box::new(reader)); + + // SAFETY: `dataset` is non-NULL (checked above); see `add_columns_sql_inner` + // for the `with_mut` locking justification. + let ds = unsafe { &*dataset }; + ds.with_mut(|d| block_on(d.add_columns(transform, None, batch_size)))?; + Ok(0) +} + +/// Parse a required, non-empty C string field of a `LanceSqlColumn`, attaching +/// the column index and field name to any error so the caller can pinpoint the +/// offending entry. +unsafe fn parse_required_field(ptr: *const c_char, index: usize, field: &str) -> Result { + // SAFETY: `ptr` is either NULL (rejected below) or a NUL-terminated C + // string the caller keeps alive for this call. + let value = unsafe { helpers::parse_c_string(ptr)? } + .filter(|s| !s.is_empty()) + .ok_or_else(|| lance_core::Error::InvalidInput { + source: format!("columns[{index}].{field} must not be NULL or empty").into(), + location: location!(), + })?; + Ok(value.to_string()) +} + +/// Translate the `0 = upstream default` batch-size sentinel into `Option`, +/// rejecting values that do not fit `u32` rather than silently wrapping with an +/// `as` cast. +fn resolve_batch_size(batch_size: u64) -> Result> { + if batch_size == 0 { + return Ok(None); + } + let narrowed = u32::try_from(batch_size).map_err(|_| lance_core::Error::InvalidInput { + source: format!("batch_size={batch_size} exceeds u32::MAX").into(), + location: location!(), + })?; + Ok(Some(narrowed)) +} diff --git a/src/lib.rs b/src/lib.rs index 9a81bc9..1fafd4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ //! - The caller is responsible for freeing returned strings with `lance_free_string()`. #![allow(clippy::missing_safety_doc)] +mod add_columns; mod alter_columns; mod async_dispatcher; mod batch; @@ -35,6 +36,7 @@ mod versions; mod writer; // Re-export all extern "C" symbols so they appear in the cdylib. +pub use add_columns::*; pub use alter_columns::*; pub use batch::*; pub use compact::*; diff --git a/tests/c_api_test.rs b/tests/c_api_test.rs index d7b7820..92ee5f5 100644 --- a/tests/c_api_test.rs +++ b/tests/c_api_test.rs @@ -15,7 +15,7 @@ use arrow::ffi::from_ffi; use arrow::ffi_stream::ArrowArrayStreamReader; use arrow::ffi_stream::FFI_ArrowArrayStream; use arrow::record_batch::RecordBatchReader; -use arrow_array::{Float32Array, Int32Array, RecordBatch, StringArray}; +use arrow_array::{Array, Float32Array, Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; use lance::Dataset; use lance_c::*; @@ -3312,33 +3312,49 @@ fn test_dataset_write_null_args_return_error() { /// Wrapping this in an `FFI_ArrowArrayStream` lets a test observe whether the /// stream's `release` callback was invoked: dropping the boxed reader (via /// `release` on the FFI side) fires `Drop` and increments the counter. -struct CountingReader { - inner: arrow::record_batch::RecordBatchIterator< - std::vec::IntoIter>, - >, +struct CountingReader { + inner: R, drop_count: Arc, } -impl Drop for CountingReader { +impl Drop for CountingReader { fn drop(&mut self) { self.drop_count .fetch_add(1, std::sync::atomic::Ordering::SeqCst); } } -impl Iterator for CountingReader { +impl Iterator for CountingReader { type Item = arrow::error::Result; fn next(&mut self) -> Option { self.inner.next() } } -impl RecordBatchReader for CountingReader { +impl RecordBatchReader for CountingReader { fn schema(&self) -> Arc { self.inner.schema() } } +/// Like `new_column_stream`, but the reader's `Drop` increments a counter so a +/// test can prove the stream is consumed (released) on a given path. The single +/// `name` column avoids colliding with the fixtures' existing columns. +fn make_counted_column_stream( + name: &str, + values: Vec, +) -> (FFI_ArrowArrayStream, Arc) { + let drop_count = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::Int32, true)])); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(values))]).unwrap(); + let reader = CountingReader { + inner: arrow::record_batch::RecordBatchIterator::new(vec![Ok(batch)], schema), + drop_count: drop_count.clone(), + }; + (FFI_ArrowArrayStream::new(Box::new(reader)), drop_count) +} + /// Build a `(stream, drop_counter)` pair where the stream wraps a single-batch /// reader whose `Drop` increments the counter. After a call that consumes the /// stream, the counter goes from 0 → 1. @@ -7090,3 +7106,840 @@ fn test_alter_columns_tighten_nullability_with_nulls_rejected() { unsafe { lance_dataset_close(ds) }; } + +// --------------------------------------------------------------------------- +// lance_dataset_add_columns_{sql,nulls,stream} tests +// --------------------------------------------------------------------------- + +/// Build a `LanceSqlColumn` from two live `CString`s. The caller must keep the +/// `CString`s alive for as long as the returned struct is used. +fn sql_column(name: &CString, expression: &CString) -> LanceSqlColumn { + LanceSqlColumn { + name: name.as_ptr(), + expression: expression.as_ptr(), + } +} + +/// Scan the dataset and build a `key -> value` map, casting both columns to +/// i64 so comparisons are independent of the exact arithmetic result type and +/// robust to any fragment/row scan ordering. A NULL value maps to `None`. +fn collect_i64_pairs( + ds: *const LanceDataset, + key: &str, + value: &str, +) -> std::collections::HashMap> { + let batches = scan_all_rows(ds); + let mut map = std::collections::HashMap::new(); + for batch in &batches { + let key_col = + arrow::compute::cast(batch.column_by_name(key).unwrap(), &DataType::Int64).unwrap(); + let key_arr = key_col + .as_any() + .downcast_ref::() + .unwrap(); + let val_col = + arrow::compute::cast(batch.column_by_name(value).unwrap(), &DataType::Int64).unwrap(); + let val_arr = val_col + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + let v = if val_arr.is_null(i) { + None + } else { + Some(val_arr.value(i)) + }; + map.insert(key_arr.value(i), v); + } + } + map +} + +// ── SQL variant ──────────────────────────────────────────────────────────── + +#[test] +fn test_add_columns_sql_single() { + let (_tmp, uri) = create_large_dataset(5); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let name = c_str("id_x2"); + let expr = c_str("id * 2"); + let cols = [sql_column(&name, &expr)]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, 0); + + // New column appears; row count is unchanged. + let names = schema_field_names(ds); + assert_eq!(names, vec!["id", "value", "label", "id_x2"]); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 5); + + // Values are computed from the existing `id` column. + let pairs = collect_i64_pairs(ds, "id", "id_x2"); + for k in 0..5i64 { + assert_eq!(pairs.get(&k), Some(&Some(k * 2))); + } + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_multiple_per_call() { + let (_tmp, uri) = create_large_dataset(4); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let n1 = c_str("id_plus"); + let e1 = c_str("id + 10"); + let n2 = c_str("id_const"); + let e2 = c_str("100"); + let cols = [sql_column(&n1, &e1), sql_column(&n2, &e2)]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, 0); + + let names = schema_field_names(ds); + assert!(names.contains(&"id_plus".to_string())); + assert!(names.contains(&"id_const".to_string())); + + let plus = collect_i64_pairs(ds, "id", "id_plus"); + let konst = collect_i64_pairs(ds, "id", "id_const"); + for k in 0..4i64 { + assert_eq!(plus.get(&k), Some(&Some(k + 10))); + assert_eq!(konst.get(&k), Some(&Some(100))); + } + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_bumps_version() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let v_before = unsafe { lance_dataset_version(ds) }; + let name = c_str("c"); + let expr = c_str("id + 1"); + let cols = [sql_column(&name, &expr)]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, 0); + let v_after = unsafe { lance_dataset_version(ds) }; + assert!(v_after > v_before, "version should increase"); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_honors_batch_size() { + // A small explicit batch size must still produce correct results across + // the whole dataset (the scan is chunked, the output is not). + let (_tmp, uri) = create_large_dataset(7); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let name = c_str("id_x2"); + let expr = c_str("id * 2"); + let cols = [sql_column(&name, &expr)]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 2) }; + assert_eq!(rc, 0); + + let pairs = collect_i64_pairs(ds, "id", "id_x2"); + for k in 0..7i64 { + assert_eq!(pairs.get(&k), Some(&Some(k * 2))); + } + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_null_dataset_rejected() { + let name = c_str("c"); + let expr = c_str("id + 1"); + let cols = [sql_column(&name, &expr)]; + let rc = + unsafe { lance_dataset_add_columns_sql(ptr::null_mut(), cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); +} + +#[test] +fn test_add_columns_sql_null_columns_rejected() { + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let rc = unsafe { lance_dataset_add_columns_sql(ds, ptr::null(), 1, 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_zero_count_rejected() { + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let name = c_str("c"); + let expr = c_str("id + 1"); + let cols = [sql_column(&name, &expr)]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), 0, 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_null_name_rejected() { + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let expr = c_str("id + 1"); + let cols = [LanceSqlColumn { + name: ptr::null(), + expression: expr.as_ptr(), + }]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_empty_name_rejected() { + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let name = c_str(""); + let expr = c_str("id + 1"); + let cols = [sql_column(&name, &expr)]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_null_expression_rejected() { + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let name = c_str("c"); + let cols = [LanceSqlColumn { + name: name.as_ptr(), + expression: ptr::null(), + }]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_empty_expression_rejected() { + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let name = c_str("c"); + let expr = c_str(""); + let cols = [sql_column(&name, &expr)]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_non_utf8_name_rejected() { + // A non-UTF-8 `name` must surface as INVALID_ARGUMENT (parse_c_string maps + // the Utf8Error to InvalidInput), not panic. `CString` holds arbitrary + // non-NUL bytes, so it carries the invalid UTF-8 across the FFI boundary. + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let bad_name = CString::new([0xFFu8, 0xFE]).unwrap(); + let expr = c_str("id * 2"); + let cols = [LanceSqlColumn { + name: bad_name.as_ptr(), + expression: expr.as_ptr(), + }]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_non_utf8_expression_rejected() { + // Symmetric with the name case: a non-UTF-8 `expression` goes through the + // same `parse_required_field` path and must surface as INVALID_ARGUMENT. + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let name = c_str("c"); + let bad_expr = CString::new([0xFFu8, 0xFE]).unwrap(); + let cols = [LanceSqlColumn { + name: name.as_ptr(), + expression: bad_expr.as_ptr(), + }]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_malformed_expr_rejected() { + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let name = c_str("c"); + let expr = c_str("id +* 2"); + let cols = [sql_column(&name, &expr)]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_unknown_column_rejected() { + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let name = c_str("c"); + let expr = c_str("does_not_exist + 1"); + let cols = [sql_column(&name, &expr)]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, -1); + // A non-existent column is resolved by the lance-datafusion planner, which + // raises a schema error (`Error::Schema`) — the same upstream path as + // `lance_dataset_delete`'s unknown-column predicate. We don't re-classify + // it at the FFI boundary, so it surfaces as `Internal`, distinct from a + // *syntax* error, which the planner wraps as `InvalidInput`. If upstream + // ever tightens this to InvalidInput, tighten this assertion too. + assert_eq!(lance_last_error_code(), LanceErrorCode::Internal); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_name_collision_rejected() { + // A new column whose name matches an existing column is rejected. + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let name = c_str("id"); + let expr = c_str("id + 1"); + let cols = [sql_column(&name, &expr)]; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_sql_batch_size_overflow_rejected() { + // A batch_size beyond u32::MAX must be rejected rather than silently + // wrapped. (Only exercisable where u64 > u32::MAX, i.e. 64-bit hosts.) + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let name = c_str("c"); + let expr = c_str("id + 1"); + let cols = [sql_column(&name, &expr)]; + let too_big = u32::MAX as u64 + 1; + let rc = unsafe { lance_dataset_add_columns_sql(ds, cols.as_ptr(), cols.len(), too_big) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +// ── AllNulls variant ─────────────────────────────────────────────────────── + +#[test] +fn test_add_columns_nulls_single() { + let (_tmp, uri) = create_large_dataset(5); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let new_schema = Schema::new(vec![Field::new("extra", DataType::Int64, true)]); + let ffi = schema_to_ffi(&new_schema); + let rc = unsafe { lance_dataset_add_columns_nulls(ds, &ffi as *const _) }; + assert_eq!(rc, 0); + + let names = schema_field_names(ds); + assert_eq!(names, vec!["id", "value", "label", "extra"]); + // Row count is unchanged — this is a metadata-only add. + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 5); + + // Every row in the new column is NULL. + let batches = scan_all_rows(ds); + let total_nulls: usize = batches + .iter() + .map(|b| b.column_by_name("extra").unwrap().null_count()) + .sum(); + assert_eq!(total_nulls, 5); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_nulls_multiple_fields() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let new_schema = Schema::new(vec![ + Field::new("extra_int", DataType::Int64, true), + Field::new("extra_str", DataType::Utf8, true), + ]); + let ffi = schema_to_ffi(&new_schema); + let rc = unsafe { lance_dataset_add_columns_nulls(ds, &ffi as *const _) }; + assert_eq!(rc, 0); + + let names = schema_field_names(ds); + assert!(names.contains(&"extra_int".to_string())); + assert!(names.contains(&"extra_str".to_string())); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_nulls_bumps_version() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let v_before = unsafe { lance_dataset_version(ds) }; + let new_schema = Schema::new(vec![Field::new("extra", DataType::Int64, true)]); + let ffi = schema_to_ffi(&new_schema); + let rc = unsafe { lance_dataset_add_columns_nulls(ds, &ffi as *const _) }; + assert_eq!(rc, 0); + let v_after = unsafe { lance_dataset_version(ds) }; + assert!(v_after > v_before, "version should increase"); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_nulls_null_dataset_rejected() { + let new_schema = Schema::new(vec![Field::new("extra", DataType::Int64, true)]); + let ffi = schema_to_ffi(&new_schema); + let rc = unsafe { lance_dataset_add_columns_nulls(ptr::null_mut(), &ffi as *const _) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); +} + +#[test] +fn test_add_columns_nulls_null_schema_rejected() { + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let rc = unsafe { lance_dataset_add_columns_nulls(ds, ptr::null()) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_nulls_released_schema_rejected() { + // An uninitialised / already-released `FFI_ArrowSchema` has both its + // `release` callback and `format` field NULL. It must surface as + // INVALID_ARGUMENT rather than aborting via the arrow-rs assert. + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let empty_schema = FFI_ArrowSchema::empty(); + let rc = unsafe { lance_dataset_add_columns_nulls(ds, &empty_schema as *const _) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_nulls_non_utf8_format_rejected() { + // A non-NULL but non-UTF-8 top-level `format` must be rejected at the FFI + // boundary rather than aborting via arrow-rs's `format().to_str().expect()` + // under `panic = "abort"`. + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + // Hand-build a minimal `FFI_ArrowSchema` that owns no arrow-managed memory: + // an empty struct with a no-op `release` we install, and `format` pointed at + // non-UTF-8 bytes we own. This avoids overwriting an arrow-allocated + // `format` pointer (whose producer release would then double-free against + // our `CString` and corrupt the heap). + unsafe extern "C" fn noop_release(_: *mut FFI_ArrowSchema) {} + let bad_format = CString::new([0xFFu8, 0xFE]).unwrap(); + let mut ffi = FFI_ArrowSchema::empty(); + ffi.format = bad_format.as_ptr(); + ffi.release = Some(noop_release); + let rc = unsafe { lance_dataset_add_columns_nulls(ds, &ffi as *const _) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_nulls_non_nullable_field_rejected() { + // An all-null column cannot be non-nullable — upstream rejects it. + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let new_schema = Schema::new(vec![Field::new("extra", DataType::Int64, false)]); + let ffi = schema_to_ffi(&new_schema); + let rc = unsafe { lance_dataset_add_columns_nulls(ds, &ffi as *const _) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_nulls_name_collision_rejected() { + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + // `value` already exists in the fixture. + let new_schema = Schema::new(vec![Field::new("value", DataType::Int64, true)]); + let ffi = schema_to_ffi(&new_schema); + let rc = unsafe { lance_dataset_add_columns_nulls(ds, &ffi as *const _) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_nulls_legacy_dataset_not_supported() { + // Adding all-null columns is metadata-only on the modern format, but the + // legacy (0.1) file format can't represent missing columns that way, so + // upstream returns NotSupported → LANCE_ERR_NOT_SUPPORTED. This is the one + // documented error code the other tests don't reach, so write a legacy + // dataset explicitly to exercise it. + let tmp = tempfile::tempdir().unwrap(); + let uri = tmp.path().join("legacy_ds").to_str().unwrap().to_string(); + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + lance_c::runtime::block_on(async { + let params = lance::dataset::WriteParams { + data_storage_version: Some(lance_file::version::LanceFileVersion::Legacy), + ..Default::default() + }; + Dataset::write( + arrow::record_batch::RecordBatchIterator::new(vec![Ok(batch)], schema), + &uri, + Some(params), + ) + .await + .unwrap(); + }); + + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let new_schema = Schema::new(vec![Field::new("extra", DataType::Int64, true)]); + let ffi = schema_to_ffi(&new_schema); + let rc = unsafe { lance_dataset_add_columns_nulls(ds, &ffi as *const _) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::NotSupported); + + unsafe { lance_dataset_close(ds) }; +} + +// ── Stream variant ───────────────────────────────────────────────────────── + +/// Build an `FFI_ArrowArrayStream` carrying a single Int32 column named `name` +/// with the given values — the precomputed data for a new column. +fn new_column_stream(name: &str, values: Vec) -> FFI_ArrowArrayStream { + let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::Int32, true)])); + let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(values))]).unwrap(); + batch_to_ffi_stream(batch) +} + +#[test] +fn test_add_columns_stream_single() { + let (_tmp, uri) = create_large_dataset(5); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + // Storage order matches id order (single fragment written 0..5). + let mut stream = new_column_stream("extra", vec![1000, 1001, 1002, 1003, 1004]); + let rc = unsafe { lance_dataset_add_columns_stream(ds, &mut stream, 0) }; + assert_eq!(rc, 0); + + let names = schema_field_names(ds); + assert_eq!(names, vec!["id", "value", "label", "extra"]); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 5); + + let pairs = collect_i64_pairs(ds, "id", "extra"); + for k in 0..5i64 { + assert_eq!(pairs.get(&k), Some(&Some(1000 + k))); + } + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_stream_multi_fragment() { + // The stream is sliced across fragment boundaries (5 + 5 rows). + let (_tmp, uri) = create_multi_fragment_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let values: Vec = (0..10).map(|i| 1000 + i).collect(); + let mut stream = new_column_stream("extra", values); + let rc = unsafe { lance_dataset_add_columns_stream(ds, &mut stream, 0) }; + assert_eq!(rc, 0); + + let pairs = collect_i64_pairs(ds, "id", "extra"); + for k in 0..10i64 { + assert_eq!(pairs.get(&k), Some(&Some(1000 + k))); + } + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_stream_honors_batch_size() { + let (_tmp, uri) = create_multi_fragment_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let values: Vec = (0..10).map(|i| 2000 + i).collect(); + let mut stream = new_column_stream("extra", values); + let rc = unsafe { lance_dataset_add_columns_stream(ds, &mut stream, 3) }; + assert_eq!(rc, 0); + + let pairs = collect_i64_pairs(ds, "id", "extra"); + for k in 0..10i64 { + assert_eq!(pairs.get(&k), Some(&Some(2000 + k))); + } + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_stream_bumps_version() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let v_before = unsafe { lance_dataset_version(ds) }; + let mut stream = new_column_stream("extra", vec![7, 8, 9]); + let rc = unsafe { lance_dataset_add_columns_stream(ds, &mut stream, 0) }; + assert_eq!(rc, 0); + let v_after = unsafe { lance_dataset_version(ds) }; + assert!(v_after > v_before, "version should increase"); + + unsafe { lance_dataset_close(ds) }; +} + +// (The NULL-dataset path is covered by `test_add_columns_stream_null_dataset_consumes_stream` +// below, which also proves the stream is consumed on that error path.) + +#[test] +fn test_add_columns_stream_null_stream_rejected() { + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let rc = unsafe { lance_dataset_add_columns_stream(ds, ptr::null_mut(), 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_stream_row_count_mismatch_rejected() { + // The stream supplies fewer rows than the dataset has — upstream rejects + // the misaligned splice. + let (_tmp, uri) = create_large_dataset(5); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + // 3 stream rows vs 5 dataset rows. The error fires *inside* `add_columns` + // (a different drop point than the early-return paths), so use the counted + // stream to also prove the reader is released there. + let (mut stream, drop_count) = make_counted_column_stream("extra", vec![1, 2, 3]); + let rc = unsafe { lance_dataset_add_columns_stream(ds, &mut stream, 0) }; + assert_eq!(rc, -1); + // Upstream `add_columns_from_stream` raises this via `Error::invalid_input` + // ("Stream ended before producing values for all rows"), so it maps to + // InvalidArgument — unlike the SQL unknown-column path, which is a schema + // error (Internal). If upstream re-classifies it, update this assertion. + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_stream_consumed(&stream, &drop_count); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_stream_name_collision_rejected() { + let (_tmp, uri) = create_large_dataset(5); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + // A stream column named `id` collides with the existing column. + let mut stream = new_column_stream("id", vec![1, 2, 3, 4, 5]); + let rc = unsafe { lance_dataset_add_columns_stream(ds, &mut stream, 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_stream_batch_size_overflow_rejected() { + // Mirrors the SQL overflow test: a batch_size beyond u32::MAX is rejected + // rather than silently wrapped. `from_raw` runs before the batch_size check, + // so the stream must still be consumed — proven via the drop counter (a bare + // `release.is_none()` check would be vacuous, since `from_raw` clears that + // slot unconditionally). + let (_tmp, uri) = create_large_dataset(5); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let (mut stream, drop_count) = make_counted_stream(&write_schema()); + let too_big = u32::MAX as u64 + 1; + let rc = unsafe { lance_dataset_add_columns_stream(ds, &mut stream, too_big) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_stream_consumed(&stream, &drop_count); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_stream_missing_callback_rejected() { + // A stream missing a mandatory CADI callback must be rejected at the FFI + // boundary rather than aborting the process via an `unwrap()` deep inside + // arrow-rs (which only guards against a NULL `release`). Cover both the + // `get_schema` (construction) and `get_next` (iteration) callbacks, since + // they abort on different arrow-rs code paths. The drop counter proves our + // manual `release_fn(stream)` actually frees the reader on this path (which + // does not go through `from_raw`). + let (_tmp, uri) = create_large_dataset(5); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + for sabotage in ["get_schema", "get_next"] { + let (mut stream, drop_count) = make_counted_stream(&write_schema()); + match sabotage { + "get_schema" => stream.get_schema = None, + "get_next" => stream.get_next = None, + other => unreachable!("unknown sabotage target: {other}"), + } + let rc = unsafe { lance_dataset_add_columns_stream(ds, &mut stream, 0) }; + assert_eq!(rc, -1, "{sabotage}=None must be rejected"); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_stream_consumed(&stream, &drop_count); + // We also null the caller's `release` slot so a non-compliant producer + // cannot trigger a second release. + assert!( + stream.release.is_none(), + "{sabotage}: release slot must be cleared" + ); + } + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_stream_already_released_rejected() { + // A stream with `release == None` is the CADI "already released" sentinel + // (the first conjunct of the callback guard). It must be rejected, and our + // handler must NOT invoke any release callback. `FFI_ArrowArrayStream::empty()` + // owns no resources, so this path leaks nothing. + let (_tmp, uri) = create_large_dataset(2); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + + let mut stream = FFI_ArrowArrayStream::empty(); + let rc = unsafe { lance_dataset_add_columns_stream(ds, &mut stream, 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_add_columns_stream_null_dataset_consumes_stream() { + // The dataset-NULL check runs *after* `from_raw`, so the stream is consumed + // (released) even on that error path — proven via the drop counter. + let (mut stream, drop_count) = make_counted_stream(&write_schema()); + let rc = unsafe { lance_dataset_add_columns_stream(ptr::null_mut(), &mut stream, 0) }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_stream_consumed(&stream, &drop_count); +} diff --git a/tests/cpp/test_c_api.c b/tests/cpp/test_c_api.c index 3c56b3e..7ea6631 100644 --- a/tests/cpp/test_c_api.c +++ b/tests/cpp/test_c_api.c @@ -496,6 +496,124 @@ static void test_drop_columns(const char *write_uri) { printf("OK\n"); } +/* Re-opens the dataset (reduced to `id` only by `test_drop_columns`) and + * exercises the three `lance_dataset_add_columns_*` entry points. The positive + * path uses the SQL variant — strings only, no hand-built Arrow C structures; + * the nulls/stream variants are smoke-checked through their NULL-argument + * rejections, since their happy paths are covered by the Rust integration + * tests. The added `id_doubled` column is harmless to the subsequent + * compact/delete steps. Must run after `test_drop_columns`. */ +static void test_add_columns(const char *write_uri) { + printf(" test_add_columns... "); + + LanceDataset *ds = lance_dataset_open(write_uri, NULL, 0); + ASSERT(ds != NULL, "open failed"); + uint64_t v_before = lance_dataset_version(ds); + + /* Snapshot field count before the add so we can confirm it grew by one. */ + struct ArrowSchema schema_before; + memset(&schema_before, 0, sizeof(schema_before)); + int32_t rc = lance_dataset_schema(ds, &schema_before); + ASSERT(rc == 0, "schema export failed"); + int64_t fields_before = schema_before.n_children; + if (schema_before.release) schema_before.release(&schema_before); + + /* SQL variant: derive `id_doubled = id * 2` from the surviving `id`. */ + LanceSqlColumn col = {0}; + col.name = "id_doubled"; + col.expression = "id * 2"; + rc = lance_dataset_add_columns_sql(ds, &col, 1, 0); + ASSERT(rc == 0, "add_columns_sql failed"); + ASSERT(lance_dataset_version(ds) > v_before, + "add_columns_sql must bump the version"); + + struct ArrowSchema schema_after; + memset(&schema_after, 0, sizeof(schema_after)); + rc = lance_dataset_schema(ds, &schema_after); + ASSERT(rc == 0, "schema export failed after add"); + int64_t fields_after = schema_after.n_children; + if (schema_after.release) schema_after.release(&schema_after); + ASSERT(fields_after == fields_before + 1, + "schema field count must increase by 1 after add"); + + /* SQL rejections: NULL dataset, NULL columns, zero count, NULL name. */ + rc = lance_dataset_add_columns_sql(NULL, &col, 1, 0); + ASSERT(rc == -1, "NULL dataset must fail"); + ASSERT(lance_last_error_code() == LANCE_ERR_INVALID_ARGUMENT, + "expected INVALID_ARGUMENT"); + + rc = lance_dataset_add_columns_sql(ds, NULL, 1, 0); + ASSERT(rc == -1, "NULL columns must fail"); + ASSERT(lance_last_error_code() == LANCE_ERR_INVALID_ARGUMENT, + "expected INVALID_ARGUMENT"); + + rc = lance_dataset_add_columns_sql(ds, &col, 0, 0); + ASSERT(rc == -1, "num_columns=0 must fail"); + ASSERT(lance_last_error_code() == LANCE_ERR_INVALID_ARGUMENT, + "expected INVALID_ARGUMENT"); + + LanceSqlColumn bad_name = {0}; + bad_name.name = NULL; + bad_name.expression = "id * 2"; + rc = lance_dataset_add_columns_sql(ds, &bad_name, 1, 0); + ASSERT(rc == -1, "NULL name must fail"); + ASSERT(lance_last_error_code() == LANCE_ERR_INVALID_ARGUMENT, + "expected INVALID_ARGUMENT"); + + LanceSqlColumn empty_name = {0}; + empty_name.name = ""; + empty_name.expression = "id * 2"; + rc = lance_dataset_add_columns_sql(ds, &empty_name, 1, 0); + ASSERT(rc == -1, "empty name must fail"); + ASSERT(lance_last_error_code() == LANCE_ERR_INVALID_ARGUMENT, + "expected INVALID_ARGUMENT"); + + LanceSqlColumn null_expr = {0}; + null_expr.name = "x"; + null_expr.expression = NULL; + rc = lance_dataset_add_columns_sql(ds, &null_expr, 1, 0); + ASSERT(rc == -1, "NULL expression must fail"); + ASSERT(lance_last_error_code() == LANCE_ERR_INVALID_ARGUMENT, + "expected INVALID_ARGUMENT"); + + LanceSqlColumn empty_expr = {0}; + empty_expr.name = "x"; + empty_expr.expression = ""; + rc = lance_dataset_add_columns_sql(ds, &empty_expr, 1, 0); + ASSERT(rc == -1, "empty expression must fail"); + ASSERT(lance_last_error_code() == LANCE_ERR_INVALID_ARGUMENT, + "expected INVALID_ARGUMENT"); + + /* AllNulls variant rejections: NULL dataset, NULL schema. */ + rc = lance_dataset_add_columns_nulls(NULL, NULL); + ASSERT(rc == -1, "NULL dataset must fail"); + ASSERT(lance_last_error_code() == LANCE_ERR_INVALID_ARGUMENT, + "expected INVALID_ARGUMENT"); + + rc = lance_dataset_add_columns_nulls(ds, NULL); + ASSERT(rc == -1, "NULL schema must fail"); + ASSERT(lance_last_error_code() == LANCE_ERR_INVALID_ARGUMENT, + "expected INVALID_ARGUMENT"); + + /* Stream variant rejections. The stream-NULL check fires first, so passing + * NULL for both arguments also surfaces INVALID_ARGUMENT. (The + * valid-stream + NULL-dataset path runs after the stream is consumed and + * cannot be smoke-tested in pure C without a live stream struct; it is + * covered by the Rust integration tests.) */ + rc = lance_dataset_add_columns_stream(NULL, NULL, 0); + ASSERT(rc == -1, "NULL dataset and NULL stream must fail (stream check first)"); + ASSERT(lance_last_error_code() == LANCE_ERR_INVALID_ARGUMENT, + "expected INVALID_ARGUMENT"); + + rc = lance_dataset_add_columns_stream(ds, NULL, 0); + ASSERT(rc == -1, "NULL stream must fail"); + ASSERT(lance_last_error_code() == LANCE_ERR_INVALID_ARGUMENT, + "expected INVALID_ARGUMENT"); + + lance_dataset_close(ds); + printf("OK\n"); +} + /* Re-opens the dataset just written by `test_dataset_write_roundtrip` and * exercises `lance_dataset_compact_files`. The smoke fixture is a single * fragment, so the default planner has nothing to compact — we expect @@ -576,6 +694,7 @@ int main(int argc, char **argv) { test_merge_insert(write_uri); test_alter_columns(write_uri); test_drop_columns(write_uri); + test_add_columns(write_uri); test_compact_files(write_uri); test_delete(write_uri); diff --git a/tests/cpp/test_cpp_api.cpp b/tests/cpp/test_cpp_api.cpp index d531820..df5bcb2 100644 --- a/tests/cpp/test_cpp_api.cpp +++ b/tests/cpp/test_cpp_api.cpp @@ -522,6 +522,97 @@ static void test_drop_columns(const std::string& dst_uri) { PASS(); } +// Re-opens the dataset (reduced to `id` only by `test_drop_columns`) and +// exercises the three `Dataset::add_columns_*` wrappers. The positive path +// uses the SQL variant (strings only); the nulls/stream variants are +// smoke-checked through their argument rejections, since their happy paths are +// covered by the Rust integration tests. The added `id_doubled` column is +// harmless to the subsequent compact/delete steps. Must run after +// `test_drop_columns`. +static void test_add_columns(const std::string& dst_uri) { + TEST(test_add_columns); + + auto ds = lance::Dataset::open(dst_uri); + uint64_t v_before = ds.version(); + + // Snapshot the field count before the add. (If ds.schema() threw, the + // zero-initialised struct's release stays null, so there is no leak; here + // the handle is freshly opened and valid, so the export is expected to + // succeed.) + ArrowSchema schema_before; + memset(&schema_before, 0, sizeof(schema_before)); + ds.schema(&schema_before); + int64_t fields_before = schema_before.n_children; + if (schema_before.release) schema_before.release(&schema_before); + + // SQL variant: derive `id_doubled = id * 2` from the surviving `id`. + ds.add_columns_sql({{"id_doubled", "id * 2"}}); + assert(ds.version() > v_before + && "add_columns_sql must bump the version"); + + ArrowSchema schema_after; + memset(&schema_after, 0, sizeof(schema_after)); + ds.schema(&schema_after); + int64_t fields_after = schema_after.n_children; + if (schema_after.release) schema_after.release(&schema_after); + assert(fields_after == fields_before + 1 + && "schema field count must increase by 1 after add"); + + // Empty SQL column list must throw with INVALID_ARGUMENT. + bool caught_empty = false; + try { + ds.add_columns_sql({}); + } catch (const lance::Error& e) { + caught_empty = true; + assert(e.code == LANCE_ERR_INVALID_ARGUMENT); + } + assert(caught_empty); + + // An empty column name must throw with INVALID_ARGUMENT. + bool caught_empty_name = false; + try { + ds.add_columns_sql({{"", "id * 2"}}); + } catch (const lance::Error& e) { + caught_empty_name = true; + assert(e.code == LANCE_ERR_INVALID_ARGUMENT); + } + assert(caught_empty_name); + + // An empty expression must throw with INVALID_ARGUMENT. (A NULL expression + // is not representable here — `SqlColumn::expression` is a std::string — so + // the NULL-pointer case is covered by the C test instead.) + bool caught_empty_expr = false; + try { + ds.add_columns_sql({{"x", ""}}); + } catch (const lance::Error& e) { + caught_empty_expr = true; + assert(e.code == LANCE_ERR_INVALID_ARGUMENT); + } + assert(caught_empty_expr); + + // AllNulls with a NULL schema pointer must throw with INVALID_ARGUMENT. + bool caught_null_schema = false; + try { + ds.add_columns_nulls(nullptr); + } catch (const lance::Error& e) { + caught_null_schema = true; + assert(e.code == LANCE_ERR_INVALID_ARGUMENT); + } + assert(caught_null_schema); + + // Stream with a NULL stream pointer must throw with INVALID_ARGUMENT. + bool caught_null_stream = false; + try { + ds.add_columns_stream(nullptr); + } catch (const lance::Error& e) { + caught_null_stream = true; + assert(e.code == LANCE_ERR_INVALID_ARGUMENT); + } + assert(caught_null_stream); + + PASS(); +} + // Re-opens the dataset just written by `test_dataset_write_roundtrip` and // exercises `Dataset::compact_files`. The smoke fixture is a single fragment // so the default planner has nothing to compact — we expect a no-op (zero @@ -595,6 +686,7 @@ int main(int argc, char** argv) { test_merge_insert(write_uri); test_alter_columns(write_uri); test_drop_columns(write_uri); + test_add_columns(write_uri); test_compact_files(write_uri); test_delete_rows(write_uri);