Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 99 additions & 17 deletions crates/lance-graph/src/csr_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,33 +236,45 @@ impl CsrIndexBuilder {
self
}

/// Add edges from an Arrow RecordBatch with `src_id` and `dst_id` columns.
pub fn add_edges_from_batch(mut self, batch: &RecordBatch) -> Result<Self> {
let src_col = batch
.column_by_name("src_id")
.ok_or_else(|| GraphError::PlanError {
message: "Edge batch missing 'src_id' column".to_string(),
location: snafu::Location::new(file!(), line!(), column!()),
})?;
let dst_col = batch
.column_by_name("dst_id")
/// Add edges from an Arrow RecordBatch using the default column names.
///
/// Expects columns named `src_id` and `dst_id`; use
/// [`Self::add_edges_from_batch_with_columns`] for other names.
///
/// # Errors
///
/// Returns whatever error [`Self::add_edges_from_batch_with_columns`]
/// returns when called with these two column names.
pub fn add_edges_from_batch(self, batch: &RecordBatch) -> Result<Self> {
// Thin wrapper: delegate with the conventional column names.
self.add_edges_from_batch_with_columns(batch, "src_id", "dst_id")
}

/// Add edges from an Arrow RecordBatch, reading source vertex ids from
/// `src_col` and destination vertex ids from `dst_col`.
///
/// Both columns must be `UInt64`. To build a reversed (incoming/CSC) index,
/// pass the destination column name as `src_col` and vice versa.
pub fn add_edges_from_batch_with_columns(
mut self,
batch: &RecordBatch,
src_col: &str,
dst_col: &str,
) -> Result<Self> {
let src_array = batch
.column_by_name(src_col)
.ok_or_else(|| GraphError::PlanError {
message: "Edge batch missing 'dst_id' column".to_string(),
message: format!("Edge batch missing '{}' column", src_col),
location: snafu::Location::new(file!(), line!(), column!()),
})?;

let src_array = src_col
})?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| GraphError::PlanError {
message: "src_id column must be UInt64".to_string(),
message: format!("'{}' column must be UInt64", src_col),
location: snafu::Location::new(file!(), line!(), column!()),
})?;
let dst_array = dst_col
let dst_array = batch
.column_by_name(dst_col)
.ok_or_else(|| GraphError::PlanError {
message: format!("Edge batch missing '{}' column", dst_col),
location: snafu::Location::new(file!(), line!(), column!()),
})?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| GraphError::PlanError {
message: "dst_id column must be UInt64".to_string(),
message: format!("'{}' column must be UInt64", dst_col),
location: snafu::Location::new(file!(), line!(), column!()),
})?;

Expand Down Expand Up @@ -651,4 +663,74 @@ mod tests {
assert_eq!(idx.neighbors(0), &[1, 1, 1]);
assert_eq!(idx.degree(0), 3);
}

#[test]
fn test_build_from_record_batch_custom_columns() {
    // Edge list (src -> dst): 0->1, 0->2, 1->2, stored under non-default
    // column names.
    let fields = vec![
        Field::new("src_person_id", DataType::UInt64, false),
        Field::new("dst_person_id", DataType::UInt64, false),
    ];
    let sources = UInt64Array::from(vec![0, 0, 1]);
    let targets = UInt64Array::from(vec![1, 2, 2]);
    let edges = RecordBatch::try_new(
        Arc::new(Schema::new(fields)),
        vec![Arc::new(sources), Arc::new(targets)],
    )
    .unwrap();

    // Outgoing adjacency: columns passed in their natural order.
    let outgoing = CsrIndexBuilder::new()
        .add_edges_from_batch_with_columns(&edges, "src_person_id", "dst_person_id")
        .unwrap()
        .build();
    assert_eq!(outgoing.neighbors(0), &[1, 2]);
    assert_eq!(outgoing.neighbors(1), &[2]);

    // Incoming adjacency: same batch, column names swapped so every edge is
    // read as dst -> src.
    let incoming = CsrIndexBuilder::new()
        .add_edges_from_batch_with_columns(&edges, "dst_person_id", "src_person_id")
        .unwrap()
        .build();
    assert_eq!(incoming.neighbors(2), &[0, 1]);
    assert_eq!(incoming.neighbors(1), &[0]);
    // Vertex 0 is never a destination, so it has no incoming neighbors.
    let no_neighbors: &[u64] = &[];
    assert_eq!(incoming.neighbors(0), no_neighbors);
}

#[test]
fn test_add_edges_from_batch_with_columns_errors() {
    // Well-formed batch used for the "column name not found" cases.
    let schema = Arc::new(Schema::new(vec![
        Field::new("src_person_id", DataType::UInt64, false),
        Field::new("dst_person_id", DataType::UInt64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(UInt64Array::from(vec![0u64])),
            Arc::new(UInt64Array::from(vec![1u64])),
        ],
    )
    .unwrap();

    // Missing source column name -> error
    assert!(CsrIndexBuilder::new()
        .add_edges_from_batch_with_columns(&batch, "missing", "dst_person_id")
        .is_err());

    // Missing destination column name -> error (the dst lookup is a
    // separate code path from the src lookup, so cover it too)
    assert!(CsrIndexBuilder::new()
        .add_edges_from_batch_with_columns(&batch, "src_person_id", "missing")
        .is_err());

    // Wrong source column type (Int64 instead of UInt64) -> error
    let schema2 = Arc::new(Schema::new(vec![
        Field::new("src_person_id", DataType::Int64, false),
        Field::new("dst_person_id", DataType::UInt64, false),
    ]));
    let batch2 = RecordBatch::try_new(
        schema2,
        vec![
            Arc::new(arrow_array::Int64Array::from(vec![0i64])),
            Arc::new(UInt64Array::from(vec![1u64])),
        ],
    )
    .unwrap();
    assert!(CsrIndexBuilder::new()
        .add_edges_from_batch_with_columns(&batch2, "src_person_id", "dst_person_id")
        .is_err());

    // Wrong destination column type (Int64 instead of UInt64) -> error
    let schema3 = Arc::new(Schema::new(vec![
        Field::new("src_person_id", DataType::UInt64, false),
        Field::new("dst_person_id", DataType::Int64, false),
    ]));
    let batch3 = RecordBatch::try_new(
        schema3,
        vec![
            Arc::new(UInt64Array::from(vec![0u64])),
            Arc::new(arrow_array::Int64Array::from(vec![1i64])),
        ],
    )
    .unwrap();
    assert!(CsrIndexBuilder::new()
        .add_edges_from_batch_with_columns(&batch3, "src_person_id", "dst_person_id")
        .is_err());
}
}
9 changes: 7 additions & 2 deletions crates/lance-graph/src/datafusion_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
pub mod analysis;
mod builder;
mod config_helpers;
mod expression;
pub(crate) mod expression;
mod join_ops;
mod scan_ops;
mod udf;
pub mod vector_ops;

#[cfg(test)]
mod test_fixtures;
pub(crate) mod test_fixtures;

// Re-export public types
pub use analysis::{PlanningContext, QueryAnalysis, RelationshipInstance};
Expand Down Expand Up @@ -62,6 +62,11 @@ impl DataFusionPlanner {
}
}

/// Borrow the planner's catalog, if any (used by the native planner).
///
/// Returns `None` when `self.catalog` is unset; otherwise a reference to
/// the stored `Arc` handle, without cloning it.
pub(crate) fn catalog_ref(&self) -> Option<&Arc<dyn GraphSourceCatalog>> {
self.catalog.as_ref()
}

/// Helper to convert DataFusion builder errors into GraphError::PlanError with context
pub(crate) fn plan_error<E: std::fmt::Display>(
&self,
Expand Down
77 changes: 0 additions & 77 deletions crates/lance-graph/src/lance_native_planner.rs

This file was deleted.

Loading
Loading