Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ arrow-schema = "56.2"
arrow-ipc = "56.2"
futures = "0.3"
lance-graph = { path = "../rust/lance-graph" }
lance-namespace = "1.0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
async-trait = "0.1"
pyo3 = { version = "0.25", features = ["extension-module", "abi3-py39", "py-clone"] }
snafu = "0.8"
tokio = { version = "1.37", features = ["rt-multi-thread", "macros"] }
31 changes: 31 additions & 0 deletions python/python/tests/test_graph.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

from pathlib import Path

import pyarrow as pa
import pytest
from lance_graph import CypherQuery, GraphConfig
Expand Down Expand Up @@ -194,3 +196,32 @@ def test_distinct_clause(graph_env):

assert len(data["c.company_name"]) == 3
assert set(data["c.company_name"]) == {"TechCorp", "DataInc", "CloudSoft"}


def test_execute_with_directory_namespace(graph_env, tmp_path, monkeypatch):
    """Run a Cypher query whose tables are resolved through a directory namespace.

    Each dataset from the ``graph_env`` fixture is written to
    ``tmp_path/<name>.lance``; a ``"dir"`` namespace rooted at ``tmp_path`` is
    then passed to ``CypherQuery.execute`` in place of a dict of datasets.
    The test is skipped when ``lance`` or ``lance_namespace`` cannot be imported.
    """
    config, datasets, _ = graph_env

    # Make sibling source checkouts importable; the prepend order is kept
    # (lance first, then lance-namespace).
    repo_root = Path(__file__).resolve().parents[3]
    checkout_dirs = (
        ("lance", "python", "python"),
        ("lance-namespace", "python"),
    )
    for parts in checkout_dirs:
        monkeypatch.syspath_prepend(str(repo_root.joinpath(*parts)))

    try:
        from lance import write_dataset
        from lance_namespace import connect
    except ImportError:
        pytest.skip("Lance namespace dependencies not available")

    # Persist every fixture table under the namespace root.
    for table_name, table in datasets.items():
        write_dataset(table, tmp_path / f"{table_name}.lance")

    catalog = connect("dir", {"root": str(tmp_path)})

    cypher = "MATCH (p:Person) WHERE p.age > 30 RETURN p.name"
    rows = CypherQuery(cypher).with_config(config).execute(catalog).to_pydict()

    assert set(rows["p.name"]) == {"Bob", "David"}
41 changes: 28 additions & 13 deletions python/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use pyo3::{
};
use serde_json::Value as JsonValue;

use crate::namespace::PyNamespaceAdapter;
use crate::RT;

/// Execution strategy for Cypher queries
Expand Down Expand Up @@ -490,8 +491,9 @@ impl CypherQuery {
///
/// Parameters
/// ----------
/// datasets : dict
/// Dictionary mapping table names to Lance datasets
/// datasets : dict or LanceNamespace
/// Either a dictionary mapping table names to Lance datasets, or a Lance
/// namespace instance that can resolve table locations dynamically.
/// strategy : ExecutionStrategy, optional
/// Execution strategy to use (defaults to DataFusion)
///
Expand All @@ -517,24 +519,37 @@ impl CypherQuery {
fn execute(
&self,
py: Python,
datasets: &Bound<'_, PyDict>,
datasets: &Bound<'_, PyAny>,
strategy: Option<ExecutionStrategy>,
) -> PyResult<PyObject> {
// Convert datasets to Arrow batches while holding the GIL
let arrow_datasets = python_datasets_to_batches(datasets)?;

// Convert Python strategy to Rust strategy
let rust_strategy = strategy.map(|s| s.into());

// Clone the inner query for use in the async block
let inner_query = self.inner.clone();

// Use RT.block_on with Some(py) like the scanner to_pyarrow method
let result_batch = RT
.block_on(Some(py), inner_query.execute(arrow_datasets, rust_strategy))?
.map_err(graph_error_to_pyerr)?;
if let Ok(dict) = datasets.downcast::<PyDict>() {
// Convert datasets to Arrow batches while holding the GIL
let arrow_datasets = python_datasets_to_batches(&dict)?;

// Use RT.block_on with Some(py) like the scanner to_pyarrow method
let result_batch = RT
.block_on(Some(py), inner_query.execute(arrow_datasets, rust_strategy))?
.map_err(graph_error_to_pyerr)?;

record_batch_to_python_table(py, &result_batch)
record_batch_to_python_table(py, &result_batch)
} else {
// Treat input as a Lance namespace catalog
let namespace_adapter = PyNamespaceAdapter::new(datasets.clone().unbind());
let namespace = Arc::new(namespace_adapter) as Arc<dyn lance_namespace::LanceNamespace>;

let result_batch = RT
.block_on(
Some(py),
inner_query.execute_with_namespace(namespace, rust_strategy),
)?
.map_err(graph_error_to_pyerr)?;

record_batch_to_python_table(py, &result_batch)
}
}

/// Explain query using the DataFusion planner with in-memory datasets
Expand Down
1 change: 1 addition & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use pyo3::prelude::*;

mod executor;
mod graph;
mod namespace;

pub(crate) static RT: LazyLock<executor::BackgroundExecutor> =
LazyLock::new(executor::BackgroundExecutor::new);
Expand Down
161 changes: 161 additions & 0 deletions python/src/namespace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;

use async_trait::async_trait;
use lance_namespace::models::{DescribeTableRequest, DescribeTableResponse};
use lance_namespace::{Error as NamespaceError, LanceNamespace, Result as NamespaceResult};
use pyo3::prelude::*;
use pyo3::prelude::PyAnyMethods;
use pyo3::types::{PyAny, PyDict, PyList};
use snafu::Location;

/// Adapter that wraps a Python namespace object so it can be used where a
/// Rust [`LanceNamespace`] is expected.
#[derive(Clone, Debug)]
pub struct PyNamespaceAdapter {
    /// Handle to the wrapped Python object.
    namespace: Py<PyAny>,
}

impl PyNamespaceAdapter {
    /// Wraps the given Python object without inspecting it.
    pub fn new(namespace: Py<PyAny>) -> Self {
        PyNamespaceAdapter { namespace }
    }

    /// Builds a [`DescribeTableResponse`] from the value returned by the Python
    /// `describe_table` call.
    ///
    /// Only `location` and `storage_options` are read from `response`; every
    /// other field keeps the value produced by `DescribeTableResponse::new()`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `extract_field` when either field cannot
    /// be read or converted.
    fn convert_response(
        py: Python,
        response: &Bound<'_, PyAny>,
    ) -> NamespaceResult<DescribeTableResponse> {
        let location = extract_field::<String>(py, response, "location")?;
        let storage_options =
            extract_field::<HashMap<String, String>>(py, response, "storage_options")?;

        let mut converted = DescribeTableResponse::new();
        converted.location = location;
        converted.storage_options = storage_options;
        Ok(converted)
    }
}

/// Reads an optional field named `key` from a Python object.
///
/// The value is looked up as an attribute first. If attribute access fails and
/// the object has a `get` method, `obj.get(key, None)` is used instead.
///
/// Returns `Ok(None)` when the value is Python `None`, or when neither lookup
/// path is available.
///
/// # Errors
///
/// Returns a namespace error when the `hasattr` check or the `get` call
/// raises, or when the value cannot be converted to `T`.
fn extract_field<T>(
    py: Python,
    obj: &Bound<'_, PyAny>,
    key: &str,
) -> NamespaceResult<Option<T>>
where
    T: for<'a> FromPyObject<'a>,
{
    let raw = match obj.getattr(key) {
        Ok(attr) => attr,
        // Attribute access failed: fall back to a mapping-style `get` lookup.
        Err(_) => {
            let supports_get = obj.hasattr("get").map_err(py_err_to_namespace_error)?;
            if !supports_get {
                return Ok(None);
            }
            obj.call_method1("get", (key, py.None()))
                .map_err(py_err_to_namespace_error)?
        }
    };

    if raw.is_none() {
        Ok(None)
    } else {
        raw.extract::<T>()
            .map(Some)
            .map_err(py_err_to_namespace_error)
    }
}

// SAFETY: the adapter's only field is a `Py<PyAny>` handle, and every use of
// it visible in this file goes through `Python::with_gil` before touching the
// underlying object.
// NOTE(review): pyo3 documents `Py<T>` as `Send + Sync`, so the auto traits
// should already cover this struct — confirm whether these manual impls are
// needed at all.
unsafe impl Send for PyNamespaceAdapter {}
unsafe impl Sync for PyNamespaceAdapter {}

/// Error carrying the rendered message of a failure on the Python side.
#[derive(Debug)]
struct PyNamespaceError(String);

impl std::fmt::Display for PyNamespaceError {
    /// Writes the stored message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let PyNamespaceError(message) = self;
        f.write_str(message)
    }
}

impl std::error::Error for PyNamespaceError {}

/// Converts a Python exception into a `NamespaceError::Namespace`.
///
/// The exception is rendered to a string and wrapped in `PyNamespaceError`;
/// the recorded location is this function, not the original call site.
fn py_err_to_namespace_error(err: PyErr) -> NamespaceError {
    let message = err.to_string();
    let location = Location::new(file!(), line!(), column!());
    NamespaceError::Namespace {
        source: Box::new(PyNamespaceError(message)),
        location,
    }
}

#[async_trait]
impl LanceNamespace for PyNamespaceAdapter {
    /// Returns an identifier for the wrapped Python namespace.
    ///
    /// Tries the Python object's `namespace_id()` method first. If the call or
    /// the conversion to `String` fails, falls back to `repr(obj)`, and finally
    /// to the literal `"PyNamespaceAdapter"`. Failures are deliberately
    /// swallowed because this method cannot return an error.
    fn namespace_id(&self) -> String {
        Python::with_gil(|py| {
            let obj = self.namespace.bind(py);
            if let Ok(id_obj) = obj.call_method0("namespace_id") {
                if let Ok(id) = id_obj.extract::<String>() {
                    return id;
                }
            }

            obj.repr()
                .map(|repr| repr.to_string())
                .unwrap_or_else(|_| "PyNamespaceAdapter".to_string())
        })
    }

    /// Forwards a describe-table request to the Python namespace.
    ///
    /// The Python `describe_table` method is invoked on a blocking worker
    /// thread with the GIL held. `id` and `version` are passed as keyword
    /// arguments only when they are set on `request`. The returned object is
    /// converted with `PyNamespaceAdapter::convert_response`.
    ///
    /// NOTE(review): the Python method is called with `id=` / `version=`
    /// keyword arguments rather than a request object — confirm this matches
    /// the Python namespace API being targeted.
    ///
    /// # Errors
    ///
    /// Returns a namespace error when building the arguments, calling the
    /// Python method, or converting its result fails, or when the blocking
    /// task itself fails to complete.
    async fn describe_table(
        &self,
        request: DescribeTableRequest,
    ) -> NamespaceResult<DescribeTableResponse> {
        // `Py<T>::clone` (the `py-clone` feature) panics when the GIL is not
        // held, and this future is polled on runtime threads that do not hold
        // it. Take the extra reference explicitly under the GIL instead.
        let namespace = Python::with_gil(|py| self.namespace.clone_ref(py));

        // `request` is owned, so its fields can be moved out without cloning.
        let id = request.id;
        let version = request.version;

        tokio::task::spawn_blocking(move || {
            Python::with_gil(|py| -> NamespaceResult<DescribeTableResponse> {
                let obj = namespace.bind(py);
                let kwargs = PyDict::new(py);

                if let Some(id_components) = id {
                    let py_list =
                        PyList::new(py, &id_components).map_err(py_err_to_namespace_error)?;
                    kwargs
                        .set_item("id", py_list)
                        .map_err(py_err_to_namespace_error)?;
                }

                if let Some(version) = version {
                    kwargs
                        .set_item("version", version)
                        .map_err(py_err_to_namespace_error)?;
                }

                let result = obj
                    .call_method("describe_table", (), Some(&kwargs))
                    .map_err(py_err_to_namespace_error)?;

                Self::convert_response(py, &result)
            })
        })
        .await
        // A join error means the blocking task panicked or was cancelled.
        .map_err(|err| NamespaceError::Namespace {
            source: Box::new(PyNamespaceError(format!(
                "Python describe_table task failed: {}",
                err
            ))),
            location: Location::new(file!(), line!(), column!()),
        })?
    }
}
1 change: 1 addition & 0 deletions rust/lance-graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ datafusion-functions-aggregate = "50.3"
futures = "0.3"
lance = "1.0.0"
lance-linalg = "1.0.0"
lance-namespace = "1.0.1"
nom = "7.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand Down
Loading
Loading