From d263482af3045b50ebca04b991b242ba615ffb4f Mon Sep 17 00:00:00 2001 From: beinan Date: Tue, 20 Jan 2026 03:43:07 +0000 Subject: [PATCH] feat(namespace): allow executing Cypher against namespaces --- python/Cargo.lock | 4 + python/Cargo.toml | 3 + python/python/tests/test_graph.py | 31 ++++++ python/src/graph.rs | 41 +++++--- python/src/lib.rs | 1 + python/src/namespace.rs | 161 ++++++++++++++++++++++++++++++ rust/lance-graph/Cargo.toml | 1 + rust/lance-graph/src/query.rs | 137 +++++++++++++++++++++++++ 8 files changed, 366 insertions(+), 13 deletions(-) create mode 100644 python/src/namespace.rs diff --git a/python/Cargo.lock b/python/Cargo.lock index f44716cf..a4a459d1 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -3443,6 +3443,7 @@ dependencies = [ "futures", "lance", "lance-linalg", + "lance-namespace", "nom 7.1.3", "serde", "serde_json", @@ -3457,11 +3458,14 @@ dependencies = [ "arrow-array", "arrow-ipc", "arrow-schema", + "async-trait", "futures", "lance-graph", + "lance-namespace", "pyo3", "serde", "serde_json", + "snafu", "tokio", ] diff --git a/python/Cargo.toml b/python/Cargo.toml index 30b96d1c..76c4f7f3 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -16,7 +16,10 @@ arrow-schema = "56.2" arrow-ipc = "56.2" futures = "0.3" lance-graph = { path = "../rust/lance-graph" } +lance-namespace = "1.0.1" serde = { version = "1", features = ["derive"] } serde_json = "1" +async-trait = "0.1" pyo3 = { version = "0.25", features = ["extension-module", "abi3-py39", "py-clone"] } +snafu = "0.8" tokio = { version = "1.37", features = ["rt-multi-thread", "macros"] } diff --git a/python/python/tests/test_graph.py b/python/python/tests/test_graph.py index c699d338..93014a8c 100644 --- a/python/python/tests/test_graph.py +++ b/python/python/tests/test_graph.py @@ -1,6 +1,8 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright The Lance Authors +from pathlib import Path + import pyarrow as pa import pytest from lance_graph import CypherQuery, GraphConfig @@ -194,3 +196,32 @@ def test_distinct_clause(graph_env): assert len(data["c.company_name"]) == 3 assert set(data["c.company_name"]) == {"TechCorp", "DataInc", "CloudSoft"} + + +def test_execute_with_directory_namespace(graph_env, tmp_path, monkeypatch): + config, datasets, _ = graph_env + + repo_root = Path(__file__).resolve().parents[3] + monkeypatch.syspath_prepend(str(repo_root / "lance" / "python" / "python")) + monkeypatch.syspath_prepend(str(repo_root / "lance-namespace" / "python")) + + try: + from lance import write_dataset + from lance_namespace import connect + except ImportError: + pytest.skip("Lance namespace dependencies not available") + + for name, table in datasets.items(): + write_dataset(table, tmp_path / f"{name}.lance") + + catalog = connect("dir", {"root": str(tmp_path)}) + + query = ( + CypherQuery("MATCH (p:Person) WHERE p.age > 30 RETURN p.name") + .with_config(config) + ) + + result = query.execute(catalog) + data = result.to_pydict() + + assert set(data["p.name"]) == {"Bob", "David"} diff --git a/python/src/graph.rs b/python/src/graph.rs index 5fb76cb4..e8615539 100644 --- a/python/src/graph.rs +++ b/python/src/graph.rs @@ -34,6 +34,7 @@ use pyo3::{ }; use serde_json::Value as JsonValue; +use crate::namespace::PyNamespaceAdapter; use crate::RT; /// Execution strategy for Cypher queries @@ -490,8 +491,9 @@ impl CypherQuery { /// /// Parameters /// ---------- - /// datasets : dict - /// Dictionary mapping table names to Lance datasets + /// datasets : dict or LanceNamespace + /// Either a dictionary mapping table names to Lance datasets, or a Lance + /// namespace instance that can resolve table locations dynamically. /// strategy : ExecutionStrategy, optional /// Execution strategy to use (defaults to DataFusion) /// @@ -517,24 +519,37 @@ impl CypherQuery { fn execute( &self, py: Python, - datasets: &Bound<'_, PyDict>, + datasets: &Bound<'_, PyAny>, strategy: Option, ) -> PyResult { - // Convert datasets to Arrow batches while holding the GIL - let arrow_datasets = python_datasets_to_batches(datasets)?; - - // Convert Python strategy to Rust strategy let rust_strategy = strategy.map(|s| s.into()); - // Clone the inner query for use in the async block let inner_query = self.inner.clone(); - // Use RT.block_on with Some(py) like the scanner to_pyarrow method - let result_batch = RT - .block_on(Some(py), inner_query.execute(arrow_datasets, rust_strategy))? - .map_err(graph_error_to_pyerr)?; + if let Ok(dict) = datasets.downcast::() { + // Convert datasets to Arrow batches while holding the GIL + let arrow_datasets = python_datasets_to_batches(&dict)?; + + // Use RT.block_on with Some(py) like the scanner to_pyarrow method + let result_batch = RT + .block_on(Some(py), inner_query.execute(arrow_datasets, rust_strategy))? + .map_err(graph_error_to_pyerr)?; - record_batch_to_python_table(py, &result_batch) + record_batch_to_python_table(py, &result_batch) + } else { + // Treat input as a Lance namespace catalog + let namespace_adapter = PyNamespaceAdapter::new(datasets.clone().unbind()); + let namespace = Arc::new(namespace_adapter) as Arc; + + let result_batch = RT + .block_on( + Some(py), + inner_query.execute_with_namespace(namespace, rust_strategy), + )? + .map_err(graph_error_to_pyerr)?; + + record_batch_to_python_table(py, &result_batch) + } } /// Explain query using the DataFusion planner with in-memory datasets diff --git a/python/src/lib.rs b/python/src/lib.rs index 32973a54..d9f440fb 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -4,6 +4,7 @@ use pyo3::prelude::*; mod executor; mod graph; +mod namespace; pub(crate) static RT: LazyLock = LazyLock::new(executor::BackgroundExecutor::new); diff --git a/python/src/namespace.rs b/python/src/namespace.rs new file mode 100644 index 00000000..f585ffc1 --- /dev/null +++ b/python/src/namespace.rs @@ -0,0 +1,161 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::collections::HashMap; + +use async_trait::async_trait; +use lance_namespace::models::{DescribeTableRequest, DescribeTableResponse}; +use lance_namespace::{Error as NamespaceError, LanceNamespace, Result as NamespaceResult}; +use pyo3::prelude::*; +use pyo3::prelude::PyAnyMethods; +use pyo3::types::{PyAny, PyDict, PyList}; +use snafu::Location; + +#[derive(Clone, Debug)] +pub struct PyNamespaceAdapter { + namespace: Py, +} + +impl PyNamespaceAdapter { + pub fn new(namespace: Py) -> Self { + Self { namespace } + } + + fn convert_response( + py: Python, + response: &Bound<'_, PyAny>, + ) -> NamespaceResult { + let location = extract_field::(py, response, "location")?; + let storage_options = extract_field::>( + py, + response, + "storage_options", + )?; + + Ok(DescribeTableResponse { + location, + storage_options, + ..DescribeTableResponse::new() + }) + } +} + +fn extract_field( + py: Python, + obj: &Bound<'_, PyAny>, + key: &str, +) -> NamespaceResult> +where + T: for<'a> FromPyObject<'a>, +{ + if let Ok(attr) = obj.getattr(key) { + if attr.is_none() { + return Ok(None); + } + return attr + .extract::() + .map(Some) + .map_err(py_err_to_namespace_error); + } + + if obj + .hasattr("get") + .map_err(py_err_to_namespace_error)? + { + let value = obj + .call_method1("get", (key, py.None())) + .map_err(py_err_to_namespace_error)?; + if value.is_none() { + return Ok(None); + } + return value + .extract::() + .map(Some) + .map_err(py_err_to_namespace_error); + } + + Ok(None) +} + +unsafe impl Send for PyNamespaceAdapter {} +unsafe impl Sync for PyNamespaceAdapter {} + +#[derive(Debug)] +struct PyNamespaceError(String); + +impl std::fmt::Display for PyNamespaceError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for PyNamespaceError {} + +fn py_err_to_namespace_error(err: PyErr) -> NamespaceError { + NamespaceError::Namespace { + source: Box::new(PyNamespaceError(err.to_string())), + location: Location::new(file!(), line!(), column!()), + } +} + +#[async_trait] +impl LanceNamespace for PyNamespaceAdapter { + fn namespace_id(&self) -> String { + Python::with_gil(|py| { + let obj = self.namespace.bind(py); + if let Ok(id_obj) = obj.call_method0("namespace_id") { + if let Ok(id) = id_obj.extract::() { + return id; + } + } + + obj.repr() + .map(|repr| repr.to_string()) + .unwrap_or_else(|_| "PyNamespaceAdapter".to_string()) + }) + } + + async fn describe_table( + &self, + request: DescribeTableRequest, + ) -> NamespaceResult { + let namespace = self.namespace.clone(); + let id = request.id.clone(); + let version = request.version; + + tokio::task::spawn_blocking(move || { + Python::with_gil(|py| -> NamespaceResult { + let obj = namespace.bind(py); + let kwargs = PyDict::new(py); + + if let Some(id_components) = id { + let py_list = + PyList::new(py, &id_components).map_err(py_err_to_namespace_error)?; + kwargs + .set_item("id", py_list) + .map_err(py_err_to_namespace_error)?; + } + + if let Some(version) = version { + kwargs + .set_item("version", version) + .map_err(py_err_to_namespace_error)?; + } + + let result = obj + .call_method("describe_table", (), Some(&kwargs)) + .map_err(py_err_to_namespace_error)?; + + Self::convert_response(py, &result) + }) + }) + .await + .map_err(|err| NamespaceError::Namespace { + source: Box::new(PyNamespaceError(format!( + "Python describe_table task failed: {}", + err + ))), + location: Location::new(file!(), line!(), column!()), + })? + } +} diff --git a/rust/lance-graph/Cargo.toml b/rust/lance-graph/Cargo.toml index 8ce0a6d3..4db14075 100644 --- a/rust/lance-graph/Cargo.toml +++ b/rust/lance-graph/Cargo.toml @@ -30,6 +30,7 @@ datafusion-functions-aggregate = "50.3" futures = "0.3" lance = "1.0.0" lance-linalg = "1.0.0" +lance-namespace = "1.0.1" nom = "7.1" serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/rust/lance-graph/src/query.rs b/rust/lance-graph/src/query.rs index 8028eb0f..94320744 100644 --- a/rust/lance-graph/src/query.rs +++ b/rust/lance-graph/src/query.rs @@ -11,6 +11,7 @@ use crate::parser::parse_cypher_query; use crate::simple_executor::{ to_df_boolean_expr_simple, to_df_order_by_expr_simple, to_df_value_expr_simple, PathExecutor, }; +use lance_namespace::LanceNamespace; use std::collections::HashMap; /// Execution strategy for Cypher queries @@ -359,6 +360,43 @@ impl CypherQuery { .await } + /// Execute query using a Lance namespace catalog + /// + /// This method resolves node labels and relationship types by querying a Lance + /// namespace implementation. For each required table, the namespace is asked + /// for its storage location, the corresponding Lance dataset is opened, and + /// registered with a DataFusion [`SessionContext`]. + /// + /// # Arguments + /// * `namespace` - Lance namespace implementation capable of resolving table metadata. + /// * `strategy` - Optional execution strategy (defaults to DataFusion). + pub async fn execute_with_namespace( + &self, + namespace: std::sync::Arc, + strategy: Option, + ) -> Result { + let strategy = strategy.unwrap_or_default(); + match strategy { + ExecutionStrategy::DataFusion => { + let (catalog, ctx) = self + .build_catalog_and_context_from_namespace(namespace) + .await?; + self.execute_with_catalog_and_context(std::sync::Arc::new(catalog), ctx) + .await + } + ExecutionStrategy::Simple => Err(GraphError::UnsupportedFeature { + feature: + "Simple execution strategy is not supported for namespace-backed queries" + .to_string(), + location: snafu::Location::new(file!(), line!(), column!()), + }), + ExecutionStrategy::LanceNative => Err(GraphError::UnsupportedFeature { + feature: "Lance native execution strategy is not yet implemented".to_string(), + location: snafu::Location::new(file!(), line!(), column!()), + }), + } + } + /// Execute query with an explicit catalog and session context /// /// This is the most flexible API for advanced users who want to provide their own @@ -468,6 +506,105 @@ impl CypherQuery { .await } + async fn build_catalog_and_context_from_namespace( + &self, + namespace: std::sync::Arc, + ) -> Result<( + crate::source_catalog::InMemoryCatalog, + datafusion::execution::context::SessionContext, + )> { + use crate::source_catalog::InMemoryCatalog; + use datafusion::datasource::{DefaultTableSource, TableProvider}; + use datafusion::execution::context::SessionContext; + use lance::datafusion::LanceTableProvider; + use lance::dataset::builder::DatasetBuilder; + use std::collections::HashSet; + use std::sync::Arc; + + let config = self.require_config()?; + + let ctx = SessionContext::new(); + let mut catalog = InMemoryCatalog::new(); + let mut table_cache: HashMap> = HashMap::new(); + + let mut required_tables: HashSet = HashSet::new(); + required_tables.extend(config.node_mappings.keys().cloned()); + required_tables.extend(config.relationship_mappings.keys().cloned()); + + for table_name in &required_tables { + if !table_cache.contains_key(table_name) { + let builder: DatasetBuilder = DatasetBuilder::from_namespace( + namespace.clone(), + vec![table_name.clone()], + false, + ) + .await + .map_err(|e| GraphError::ConfigError { + message: format!( + "Failed to resolve table '{}' from namespace: {}", + table_name, e + ), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + let dataset = builder.load().await.map_err(|e| GraphError::ExecutionError { + message: format!( + "Failed to load dataset '{}' from namespace: {}", + table_name, e + ), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + let dataset = Arc::new(dataset); + let provider: Arc = + Arc::new(LanceTableProvider::new(dataset, true, true)); + + ctx.register_table(table_name, provider.clone()) + .map_err(|e| GraphError::PlanError { + message: format!( + "Failed to register namespace table '{}' with SessionContext: {}", + table_name, e + ), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + table_cache.insert(table_name.clone(), provider); + } + } + + for label in config.node_mappings.keys() { + if let Some(provider) = table_cache.get(label) { + let table_source = Arc::new(DefaultTableSource::new(provider.clone())); + catalog = catalog.with_node_source(label, table_source); + } else { + return Err(GraphError::ConfigError { + message: format!( + "Namespace did not return dataset for node label '{}'", + label + ), + location: snafu::Location::new(file!(), line!(), column!()), + }); + } + } + + for rel_type in config.relationship_mappings.keys() { + if let Some(provider) = table_cache.get(rel_type) { + let table_source = Arc::new(DefaultTableSource::new(provider.clone())); + catalog = catalog.with_relationship_source(rel_type, table_source); + } else { + return Err(GraphError::ConfigError { + message: format!( + "Namespace did not return dataset for relationship '{}'", + rel_type + ), + location: snafu::Location::new(file!(), line!(), column!()), + }); + } + } + + Ok((catalog, ctx)) + } + /// Helper to build catalog and context from in-memory datasets async fn build_catalog_and_context_from_datasets( &self,