diff --git a/python/python/tests/test_explain.py b/python/python/tests/test_explain.py index 7cbd73ba..a0e40a6c 100644 --- a/python/python/tests/test_explain.py +++ b/python/python/tests/test_explain.py @@ -1,4 +1,4 @@ -"""Tests for explain_datafusion API.""" +"""Tests for explain API.""" import pyarrow as pa import pytest @@ -25,7 +25,7 @@ def test_explain_simple_query(person_data): """Test explain output contains all expected sections.""" config, people = person_data query = CypherQuery("MATCH (p:Person) RETURN p.name, p.age").with_config(config) - plan = query.explain_datafusion({"Person": people}) + plan = query.explain({"Person": people}) # Verify the plan is a non-empty string assert isinstance(plan, str) @@ -48,7 +48,7 @@ def test_explain_with_clauses(person_data): query = CypherQuery( "MATCH (p:Person) WHERE p.age > 30 RETURN p.name ORDER BY p.age LIMIT 2" ).with_config(config) - plan = query.explain_datafusion({"Person": people}) + plan = query.explain({"Person": people}) assert isinstance(plan, str) assert "WHERE p.age > 30" in plan @@ -63,11 +63,11 @@ def test_explain_error_handling(person_data): # Missing config query_no_config = CypherQuery("MATCH (p:Person) RETURN p.name") with pytest.raises(ValueError, match="Graph configuration is required"): - query_no_config.explain_datafusion({"Person": people}) + query_no_config.explain({"Person": people}) # Missing datasets query_with_config = CypherQuery("MATCH (p:Person) RETURN p.name").with_config( config ) with pytest.raises(ValueError, match="No input datasets provided"): - query_with_config.explain_datafusion({}) + query_with_config.explain({}) diff --git a/python/python/tests/test_graph.py b/python/python/tests/test_graph.py index da818746..c699d338 100644 --- a/python/python/tests/test_graph.py +++ b/python/python/tests/test_graph.py @@ -62,11 +62,10 @@ def graph_env(tmp_path): return config, datasets, people_table -@pytest.mark.parametrize("execute_method", ["execute", "execute_datafusion"]) -def 
test_basic_node_selection(graph_env, execute_method): +def test_basic_node_selection(graph_env): config, datasets, _ = graph_env query = CypherQuery("MATCH (p:Person) RETURN p.name, p.age").with_config(config) - result = getattr(query, execute_method)({"Person": datasets["Person"]}) + result = query.execute({"Person": datasets["Person"]}) data = result.to_pydict() assert set(data.keys()) == {"p.name", "p.age"} @@ -74,13 +73,12 @@ def test_basic_node_selection(graph_env, execute_method): assert "Alice" in set(data["p.name"]) -@pytest.mark.parametrize("execute_method", ["execute", "execute_datafusion"]) -def test_filtered_query(graph_env, execute_method): +def test_filtered_query(graph_env): config, datasets, _ = graph_env query = CypherQuery( "MATCH (p:Person) WHERE p.age > 30 RETURN p.name, p.age" ).with_config(config) - result = getattr(query, execute_method)({"Person": datasets["Person"]}) + result = query.execute({"Person": datasets["Person"]}) data = result.to_pydict() assert len(data["p.name"]) == 2 @@ -88,15 +86,14 @@ def test_filtered_query(graph_env, execute_method): assert all(age > 30 for age in data["p.age"]) -@pytest.mark.parametrize("execute_method", ["execute", "execute_datafusion"]) -def test_relationship_query(graph_env, execute_method): +def test_relationship_query(graph_env): config, datasets, _ = graph_env query = CypherQuery( "MATCH (p:Person)-[:WORKS_FOR]->(c:Company) " "RETURN p.person_id AS person_id, p.name AS name, c.company_id AS company_id" ).with_config(config) - result = getattr(query, execute_method)( + result = query.execute( { "Person": datasets["Person"], "Company": datasets["Company"], @@ -109,8 +106,7 @@ def test_relationship_query(graph_env, execute_method): assert data["company_id"] == [101, 101, 102, 103] -@pytest.mark.parametrize("execute_method", ["execute", "execute_datafusion"]) -def test_friendship_direct_and_network(graph_env, execute_method): +def test_friendship_direct_and_network(graph_env): config, datasets, _ = 
graph_env # Direct friends of Alice (person_id = 1) query_direct = CypherQuery( @@ -119,7 +115,7 @@ def test_friendship_direct_and_network(graph_env, execute_method): "RETURN b.person_id AS friend_id" ).with_config(config) - result_direct = getattr(query_direct, execute_method)( + result_direct = query_direct.execute( { "Person": datasets["Person"], "FRIEND_OF": datasets["FRIEND_OF"], @@ -134,7 +130,7 @@ def test_friendship_direct_and_network(graph_env, execute_method): "RETURN f.person_id AS person1_id, t.person_id AS person2_id" ).with_config(config) - result_edges = getattr(query_edges, execute_method)( + result_edges = query_edges.execute( { "Person": datasets["Person"], "FRIEND_OF": datasets["FRIEND_OF"], @@ -145,8 +141,7 @@ def test_friendship_direct_and_network(graph_env, execute_method): assert got == {(1, 2), (1, 3), (2, 4), (3, 4)} -@pytest.mark.parametrize("execute_method", ["execute", "execute_datafusion"]) -def test_two_hop_friends_of_friends(graph_env, execute_method): +def test_two_hop_friends_of_friends(graph_env): config, datasets, _ = graph_env query = CypherQuery( "MATCH (a:Person)-[:FRIEND_OF]->(b:Person)-[:FRIEND_OF]->(c:Person) " @@ -154,7 +149,7 @@ def test_two_hop_friends_of_friends(graph_env, execute_method): "RETURN a.person_id AS a_id, b.person_id AS b_id, c.person_id AS c_id" ).with_config(config) - result = getattr(query, execute_method)( + result = query.execute( { "Person": datasets["Person"], "FRIEND_OF": datasets["FRIEND_OF"], @@ -164,29 +159,31 @@ def test_two_hop_friends_of_friends(graph_env, execute_method): assert set(data["c_id"]) == {4} -@pytest.mark.parametrize("execute_method", ["execute", "execute_datafusion"]) -def test_variable_length_path(graph_env, execute_method): +def test_variable_length_path(graph_env): config, datasets, _ = graph_env query = CypherQuery( "MATCH (p1:Person)-[:FRIEND_OF*1..2]-(p2:Person) " "RETURN p1.person_id AS p1, p2.person_id AS p2" ).with_config(config) - _ = getattr(query, execute_method)( + + 
result = query.execute( { "Person": datasets["Person"], "FRIEND_OF": datasets["FRIEND_OF"], } ) + data = result.to_pydict() + got = set(zip(data["p1"], data["p2"])) + assert got == {(1, 2), (1, 3), (2, 4), (3, 4), (1, 4)} -@pytest.mark.parametrize("execute_method", ["execute", "execute_datafusion"]) -def test_distinct_clause(graph_env, execute_method): +def test_distinct_clause(graph_env): config, datasets, _ = graph_env query = CypherQuery( "MATCH (p:Person)-[:WORKS_FOR]->(c:Company) RETURN DISTINCT c.company_name" ).with_config(config) - result = getattr(query, execute_method)( + result = query.execute( { "Person": datasets["Person"], "Company": datasets["Company"], diff --git a/python/src/graph.rs b/python/src/graph.rs index 7914faee..d8563be9 100644 --- a/python/src/graph.rs +++ b/python/src/graph.rs @@ -22,7 +22,8 @@ use arrow::ffi_stream::ArrowArrayStreamReader; use arrow_array::{RecordBatch, RecordBatchReader}; use arrow_schema::Schema; use lance_graph::{ - CypherQuery as RustCypherQuery, GraphConfig as RustGraphConfig, GraphError as RustGraphError, + ExecutionStrategy as RustExecutionStrategy, CypherQuery as RustCypherQuery, + GraphConfig as RustGraphConfig, GraphError as RustGraphError, }; use pyo3::{ exceptions::{PyNotImplementedError, PyRuntimeError, PyValueError}, @@ -34,6 +35,28 @@ use serde_json::Value as JsonValue; use crate::RT; +/// Execution strategy for Cypher queries +#[pyclass(name = "ExecutionStrategy", module = "lance.graph")] +#[derive(Clone, Copy)] +pub enum ExecutionStrategy { + /// Use DataFusion query planner (default, full feature support) + DataFusion, + /// Use simple single-table executor (legacy, limited features) + Simple, + /// Use Lance native executor (not yet implemented) + LanceNative, +} + +impl From for RustExecutionStrategy { + fn from(strategy: ExecutionStrategy) -> Self { + match strategy { + ExecutionStrategy::DataFusion => RustExecutionStrategy::DataFusion, + ExecutionStrategy::Simple => RustExecutionStrategy::Simple, + 
ExecutionStrategy::LanceNative => RustExecutionStrategy::LanceNative, + } + } +} + /// Convert GraphError to PyErr fn graph_error_to_pyerr(err: RustGraphError) -> PyErr { match &err { @@ -267,6 +290,8 @@ impl CypherQuery { /// ---------- /// datasets : dict /// Dictionary mapping table names to Lance datasets + /// strategy : ExecutionStrategy, optional + /// Execution strategy to use (defaults to DataFusion) /// /// Returns /// ------- @@ -277,56 +302,40 @@ impl CypherQuery { /// ------ /// RuntimeError /// If query execution fails - fn execute(&self, py: Python, datasets: &Bound<'_, PyDict>) -> PyResult { - // Convert datasets to Arrow batches while holding the GIL - same as before - let arrow_datasets = python_datasets_to_batches(datasets)?; - - // Clone the inner query for use in the async block - let inner_query = self.inner.clone(); - - // Use RT.block_on with Some(py) like the scanner to_pyarrow method - let result_batch = RT - .block_on(Some(py), inner_query.execute(arrow_datasets))? - .map_err(graph_error_to_pyerr)?; - - record_batch_to_python_table(py, &result_batch) - } - - /// Execute query using the DataFusion planner with in-memory datasets - /// - /// Parameters - /// ---------- - /// datasets : dict - /// Dictionary mapping table names to in-memory tables (pyarrow.Table, LanceDataset, etc.) - /// Keys should match node labels and relationship types in the graph config. 
/// - /// Returns - /// ------- - /// pyarrow.Table - /// Query results as Arrow table + /// Examples + /// -------- + /// >>> # Default strategy (DataFusion) + /// >>> result = query.execute(datasets) /// - /// Raises - /// ------ - /// ValueError - /// If the query is invalid or datasets are missing - /// RuntimeError - /// If query execution fails - fn execute_datafusion(&self, py: Python, datasets: &Bound<'_, PyDict>) -> PyResult { - // Convert datasets to Arrow RecordBatch map + /// >>> # Explicit strategy + /// >>> from lance.graph import ExecutionStrategy + /// >>> result = query.execute(datasets, strategy=ExecutionStrategy.Simple) + #[pyo3(signature = (datasets, strategy=None))] + fn execute( + &self, + py: Python, + datasets: &Bound<'_, PyDict>, + strategy: Option, + ) -> PyResult { + // Convert datasets to Arrow batches while holding the GIL let arrow_datasets = python_datasets_to_batches(datasets)?; - // Clone for async move + // Convert Python strategy to Rust strategy + let rust_strategy = strategy.map(|s| s.into()); + + // Clone the inner query for use in the async block let inner_query = self.inner.clone(); - // Execute via runtime + // Use RT.block_on with Some(py) like the scanner to_pyarrow method let result_batch = RT - .block_on(Some(py), inner_query.execute_datafusion(arrow_datasets))? + .block_on(Some(py), inner_query.execute(arrow_datasets, rust_strategy))? 
.map_err(graph_error_to_pyerr)?; record_batch_to_python_table(py, &result_batch) } - /// Explain query uusing the DataFusion planner with in-memory datasets + /// Explain query using the DataFusion planner with in-memory datasets /// /// Parameters /// ---------- @@ -345,7 +354,7 @@ impl CypherQuery { /// If the query is invalid or datasets are missing /// RuntimeError /// If query explain fails - fn explain_datafusion(&self, py: Python, datasets: &Bound<'_, PyDict>) -> PyResult { + fn explain(&self, py: Python, datasets: &Bound<'_, PyDict>) -> PyResult { // Convert datasets to Arrow RecordBatch map let arrow_datasets = python_datasets_to_batches(datasets)?; @@ -354,7 +363,7 @@ impl CypherQuery { // Execute via runtime let plan = RT - .block_on(Some(py), inner_query.explain_datafusion(arrow_datasets))? + .block_on(Some(py), inner_query.explain(arrow_datasets))? .map_err(graph_error_to_pyerr)?; Ok(plan) @@ -562,6 +571,7 @@ fn record_batch_to_python_table( pub fn register_graph_module(py: Python, parent_module: &Bound<'_, PyModule>) -> PyResult<()> { let graph_module = PyModule::new(py, "graph")?; + graph_module.add_class::()?; graph_module.add_class::()?; graph_module.add_class::()?; graph_module.add_class::()?; diff --git a/rust/lance-graph/benches/graph_execution.rs b/rust/lance-graph/benches/graph_execution.rs index d2e9b0d4..68ab4687 100644 --- a/rust/lance-graph/benches/graph_execution.rs +++ b/rust/lance-graph/benches/graph_execution.rs @@ -22,7 +22,7 @@ use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; use futures::TryStreamExt; use lance::dataset::{Dataset, WriteMode, WriteParams}; -use lance_graph::{CypherQuery, GraphConfig}; +use lance_graph::{CypherQuery, ExecutionStrategy, GraphConfig}; use tempfile::TempDir; fn create_people_batch() -> RecordBatch { @@ -71,7 +71,11 @@ fn execute_cypher_query( q: &CypherQuery, datasets: HashMap, ) -> RecordBatch { - 
rt.block_on(async move { q.execute(datasets).await.unwrap() }) + rt.block_on(async move { + q.execute(datasets, Some(ExecutionStrategy::Simple)) + .await + .unwrap() + }) } fn make_people_batch(n: usize) -> RecordBatch { diff --git a/rust/lance-graph/src/lib.rs b/rust/lance-graph/src/lib.rs index c3516941..fcc5251c 100644 --- a/rust/lance-graph/src/lib.rs +++ b/rust/lance-graph/src/lib.rs @@ -53,4 +53,4 @@ pub const MAX_VARIABLE_LENGTH_HOPS: u32 = 20; pub use config::{GraphConfig, NodeMapping, RelationshipMapping}; pub use error::{GraphError, Result}; -pub use query::CypherQuery; +pub use query::{CypherQuery, ExecutionStrategy}; diff --git a/rust/lance-graph/src/query.rs b/rust/lance-graph/src/query.rs index 7306a3e8..41dba161 100644 --- a/rust/lance-graph/src/query.rs +++ b/rust/lance-graph/src/query.rs @@ -17,6 +17,18 @@ mod clauses; mod expr; mod simple_executor; +/// Execution strategy for Cypher queries +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum ExecutionStrategy { + /// Use DataFusion query planner (default, full feature support) + #[default] + DataFusion, + /// Use simple single-table executor (legacy, limited features) + Simple, + /// Use Lance native executor (not yet implemented) + LanceNative, +} + /// A Cypher query that can be executed against Lance datasets #[derive(Debug, Clone)] pub struct CypherQuery { @@ -92,22 +104,22 @@ impl CypherQuery { }) } - /// Execute using the DataFusion planner with in-memory datasets + /// Execute the query against provided in-memory datasets /// - /// # Overview - /// This convenience method creates both a catalog and session context from the provided - /// in-memory RecordBatches. It's ideal for testing and small datasets that fit in memory. - /// - /// For production use with external data sources (CSV, Parquet, databases), use - /// `execute_with_datafusion_context` instead, which automatically builds the catalog - /// from the SessionContext. 
+ /// This method uses the DataFusion planner by default for comprehensive query support + /// including joins, aggregations, and complex patterns. You can optionally specify + /// a different execution strategy. /// /// # Arguments /// * `datasets` - HashMap of table name to RecordBatch (nodes and relationships) + /// * `strategy` - Optional execution strategy (defaults to DataFusion) /// /// # Returns /// A single RecordBatch containing the query results /// + /// # Errors + /// Returns error if query parsing, planning, or execution fails + /// /// # Example /// ```ignore /// use std::collections::HashMap; @@ -122,12 +134,66 @@ impl CypherQuery { /// // Parse and execute query /// let query = CypherQuery::parse("MATCH (p:Person)-[:KNOWS]->(f) RETURN p.name, f.name")? /// .with_config(config); - /// let result = query.execute_datafusion(datasets).await?; + /// // Use the default DataFusion strategy + /// let result = query.execute(datasets.clone(), None).await?; + /// // Use the Simple strategy explicitly + /// let result = query.execute(datasets, Some(ExecutionStrategy::Simple)).await?; /// ``` - pub async fn execute_datafusion( + pub async fn execute( &self, datasets: HashMap, + strategy: Option, ) -> Result { + let strategy = strategy.unwrap_or_default(); + match strategy { + ExecutionStrategy::DataFusion => self.execute_datafusion(datasets).await, + ExecutionStrategy::Simple => self.execute_simple(datasets).await, + ExecutionStrategy::LanceNative => Err(GraphError::UnsupportedFeature { + feature: "Lance native execution strategy is not yet implemented".to_string(), + location: snafu::Location::new(file!(), line!(), column!()), + }), + } + } + + /// Explain the query execution plan using in-memory datasets + /// + /// Returns a formatted string showing the query execution plan at different stages: + /// - Graph Logical Plan (graph-specific operators) + /// - DataFusion Logical Plan (optimized relational plan) + /// - DataFusion Physical Plan (execution plan with 
optimizations) + /// + /// This is useful for understanding query performance, debugging, and optimization. + /// + /// # Arguments + /// * `datasets` - HashMap of table name to RecordBatch (nodes and relationships) + /// + /// # Returns + /// A formatted string containing the execution plan at multiple levels + /// + /// # Errors + /// Returns error if planning fails + /// + /// # Example + /// ```ignore + /// use std::collections::HashMap; + /// use arrow::record_batch::RecordBatch; + /// use lance_graph::query::CypherQuery; + /// + /// // Create in-memory datasets + /// let mut datasets = HashMap::new(); + /// datasets.insert("Person".to_string(), person_batch); + /// datasets.insert("KNOWS".to_string(), knows_batch); + /// + /// let query = CypherQuery::parse("MATCH (p:Person) WHERE p.age > 30 RETURN p.name")? + /// .with_config(config); + /// + /// let plan = query.explain(datasets).await?; + /// println!("{}", plan); + /// ``` + pub async fn explain( + &self, + datasets: HashMap, + ) -> Result { use std::sync::Arc; // Build catalog and context from datasets @@ -135,9 +201,8 @@ impl CypherQuery { .build_catalog_and_context_from_datasets(datasets) .await?; - // Delegate to common execution logic - self.execute_with_catalog_and_context(Arc::new(catalog), ctx) - .await + // Delegate to the internal explain method + self.explain_internal(Arc::new(catalog), ctx).await } /// Execute query with a DataFusion SessionContext, automatically building the catalog @@ -181,14 +246,14 @@ impl CypherQuery { /// // Step 3: Execute query (catalog is built automatically) /// let query = CypherQuery::parse("MATCH (p:Person)-[:KNOWS]->(f) RETURN p.name")? /// .with_config(config); - /// let result = query.execute_with_datafusion_context(ctx).await?; + /// let result = query.execute_with_context(ctx).await?; /// ``` /// /// # Note /// The catalog is built by querying the SessionContext for schemas of tables /// mentioned in the GraphConfig. 
Table names must match between GraphConfig /// (node labels/relationship types) and SessionContext (registered table names). - pub async fn execute_with_datafusion_context( + pub async fn execute_with_context( &self, ctx: datafusion::execution::context::SessionContext, ) -> Result { @@ -318,45 +383,25 @@ impl CypherQuery { }) } - /// Explain the query execution plan using in-memory datasets + /// Execute using the DataFusion planner with in-memory datasets /// - /// Returns a formatted string showing the query execution plan at different stages: - /// - Graph Logical Plan (graph-specific operators) - /// - DataFusion Logical Plan (optimized relational plan) - /// - DataFusion Physical Plan (execution plan with optimizations) + /// # Overview + /// This convenience method creates both a catalog and session context from the provided + /// in-memory RecordBatches. It's ideal for testing and small datasets that fit in memory. /// - /// This is useful for understanding query performance, debugging, and optimization. + /// For production use with external data sources (CSV, Parquet, databases), use + /// `execute_with_context` instead, which automatically builds the catalog + /// from the SessionContext. /// /// # Arguments /// * `datasets` - HashMap of table name to RecordBatch (nodes and relationships) /// /// # Returns - /// A formatted string containing the execution plan at multiple levels - /// - /// # Errors - /// Returns error if planning fails - /// - /// # Example - /// ```ignore - /// use std::collections::HashMap; - /// use arrow::record_batch::RecordBatch; - /// use lance_graph::query::CypherQuery; - /// - /// // Create in-memory datasets - /// let mut datasets = HashMap::new(); - /// datasets.insert("Person".to_string(), person_batch); - /// datasets.insert("KNOWS".to_string(), knows_batch); - /// - /// let query = CypherQuery::parse("MATCH (p:Person) WHERE p.age > 30 RETURN p.name")? 
- /// .with_config(config); - /// - /// let plan = query.explain_datafusion(datasets).await?; - /// println!("{}", plan); - /// ``` - pub async fn explain_datafusion( + /// A single RecordBatch containing the query results + async fn execute_datafusion( &self, datasets: HashMap, - ) -> Result { + ) -> Result { use std::sync::Arc; // Build catalog and context from datasets @@ -364,8 +409,9 @@ impl CypherQuery { .build_catalog_and_context_from_datasets(datasets) .await?; - // Delegate to the internal explain method - self.explain_internal(Arc::new(catalog), ctx).await + // Delegate to common execution logic + self.execute_with_catalog_and_context(Arc::new(catalog), ctx) + .await } /// Helper to build catalog and context from in-memory datasets @@ -601,30 +647,6 @@ impl CypherQuery { Ok(output) } - /// Execute the query against provided in-memory datasets using the DataFusion planner - /// - /// This is the primary execution method that uses the full DataFusion-based planner - /// for comprehensive query support including joins, aggregations, and complex patterns. - /// - /// For legacy single-table queries, use `execute_simple()` instead. - pub async fn execute( - &self, - datasets: HashMap, - ) -> Result { - self.execute_datafusion(datasets).await - } - - /// Explain the query execution plan using the DataFusion planner - /// - /// This method provides a high-level overview of the query execution plan - /// using the DataFusion planner, which is useful for debugging and optimization. - pub async fn explain( - &self, - datasets: HashMap, - ) -> Result { - self.explain_datafusion(datasets).await - } - /// Execute simple single-table queries (legacy implementation) /// /// This method supports basic projection/filter/limit workflows on a single table. 
@@ -1564,7 +1586,7 @@ mod tests { .with_config(cfg); // Execute with context (catalog built automatically) - let result = query.execute_with_datafusion_context(ctx).await.unwrap(); + let result = query.execute_with_context(ctx).await.unwrap(); // Verify results assert_eq!(result.num_rows(), 3); @@ -1621,7 +1643,7 @@ mod tests { .with_config(cfg); // Execute with context - let result = query.execute_with_datafusion_context(ctx).await.unwrap(); + let result = query.execute_with_context(ctx).await.unwrap(); // Verify: should return Bob (34) and David (42) assert_eq!(result.num_rows(), 2); @@ -1708,7 +1730,7 @@ mod tests { .with_config(cfg); // Execute with context - let result = query.execute_with_datafusion_context(ctx).await.unwrap(); + let result = query.execute_with_context(ctx).await.unwrap(); // Verify: should return 2 relationships (Alice->Bob, Bob->Carol) assert_eq!(result.num_rows(), 2); @@ -1781,7 +1803,7 @@ mod tests { .with_config(cfg); // Execute with context - let result = query.execute_with_datafusion_context(ctx).await.unwrap(); + let result = query.execute_with_context(ctx).await.unwrap(); // Verify: should return top 2 scores (David: 95, Bob: 92) assert_eq!(result.num_rows(), 2); diff --git a/rust/lance-graph/tests/test_datafusion_pipeline.rs b/rust/lance-graph/tests/test_datafusion_pipeline.rs index 9de2cbc3..1f6669f8 100644 --- a/rust/lance-graph/tests/test_datafusion_pipeline.rs +++ b/rust/lance-graph/tests/test_datafusion_pipeline.rs @@ -1,7 +1,7 @@ use arrow_array::{Array, Float64Array, Int64Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; use lance_graph::config::GraphConfig; -use lance_graph::query::CypherQuery; +use lance_graph::{CypherQuery, ExecutionStrategy}; use std::collections::HashMap; use std::sync::Arc; @@ -127,7 +127,10 @@ async fn execute_test_query(cypher: &str) -> RecordBatch { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - 
query.execute_datafusion(datasets).await.unwrap() + query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap() } // Helper function to extract string column values @@ -158,7 +161,10 @@ async fn test_datafusion_simple_node_scan() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return all 5 people assert_eq!(result.num_rows(), 5); @@ -192,7 +198,10 @@ async fn test_datafusion_node_filtering() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return 3 people (Bob:35, David:40, Charlie:30 is not > 30) assert_eq!(result.num_rows(), 2); @@ -235,7 +244,10 @@ async fn test_datafusion_multiple_conditions() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return people with age >= 30 // Bob:35, Charlie:30, David:40 @@ -275,7 +287,10 @@ async fn test_datafusion_relationship_traversal() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return source names for all relationships assert_eq!(result.num_rows(), 5); // 5 relationships in the dataset @@ -319,7 +334,10 @@ async fn test_datafusion_relationship_with_variable() { datasets.insert("Person".to_string(), person_batch); 
datasets.insert("KNOWS".to_string(), knows_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_columns(), 1); assert_eq!(result.num_rows(), 5); @@ -360,7 +378,10 @@ async fn test_datafusion_complex_filtering() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_columns(), 1); // Only Bob (35) -> Charlie (30), David doesn't connect to anyone age 30 @@ -388,7 +409,10 @@ async fn test_datafusion_projection_multiple_properties() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return people with age >= 28 (Bob:35, Charlie:30, Eve:28, David:40) assert_eq!(result.num_rows(), 4); @@ -425,7 +449,9 @@ async fn test_datafusion_error_handling_missing_config() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await; + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await; assert!(result.is_err()); let error_msg = format!("{:?}", result.unwrap_err()); @@ -442,7 +468,9 @@ async fn test_datafusion_error_handling_empty_datasets() { let datasets = HashMap::new(); // Empty datasets - let result = query.execute_datafusion(datasets).await; + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await; assert!(result.is_err()); let error_msg = format!("{:?}", result.unwrap_err()); @@ -483,7 +511,10 @@ async fn 
test_datafusion_performance_large_dataset() { datasets.insert("Person".to_string(), large_batch); let start = std::time::Instant::now(); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); let duration = start.elapsed(); // Should complete reasonably quickly (adjust threshold as needed) @@ -514,7 +545,10 @@ async fn test_datafusion_empty_result_set() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return empty result set assert_eq!(result.num_rows(), 0); @@ -536,7 +570,10 @@ async fn test_datafusion_all_columns_projection() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return Alice's data assert_eq!(result.num_rows(), 1); @@ -585,7 +622,10 @@ async fn test_datafusion_relationship_count() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return 5 relationships (as per create_knows_dataset) assert_eq!(result.num_rows(), 5); @@ -630,7 +670,10 @@ async fn test_datafusion_one_hop_source_names_strict() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_columns(), 1); 
assert_eq!(out.num_rows(), 5); @@ -667,7 +710,10 @@ async fn test_datafusion_one_hop_filtered_source_age_strict() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_columns(), 1); // Bob (35): 2->3, David (40): 4->5 assert_eq!(out.num_rows(), 2); @@ -704,7 +750,10 @@ async fn test_datafusion_one_hop_with_city_filter() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Only Eve has city = 'Seattle' and is reachable (David->Eve) assert_eq!(out.num_rows(), 1); @@ -735,7 +784,10 @@ async fn test_datafusion_one_hop_multiple_properties() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_columns(), 4); assert_eq!(out.num_rows(), 5); @@ -785,7 +837,10 @@ async fn test_datafusion_one_hop_return_relationship_properties() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return 3 columns: a.name, r.since_year, b.name assert_eq!(out.num_columns(), 3); @@ -880,7 +935,10 @@ async fn test_datafusion_two_hop_return_intermediate() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = 
query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_columns(), 1); assert_eq!(out.num_rows(), 4); @@ -918,7 +976,10 @@ async fn test_datafusion_two_hop_return_all_three() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_columns(), 3); assert_eq!(out.num_rows(), 4); @@ -988,7 +1049,10 @@ async fn test_datafusion_two_hop_with_filter() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Filter: b.age > 30 means b can be Bob(35), David(40) // Paths with Bob as intermediate: 1->2->3 (Alice->Bob->Charlie) @@ -1027,7 +1091,10 @@ async fn test_datafusion_two_hop_with_relationship_variable() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_columns(), 2); assert_eq!(out.num_rows(), 4); @@ -1107,7 +1174,10 @@ async fn test_datafusion_two_hop_with_multiple_filters() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // a.age < 30: Alice(25), Eve(28) // b.age >= 30: Bob(35), Charlie(30), David(40) @@ -1173,7 +1243,10 @@ async fn 
test_datafusion_two_hop_return_relationship_properties() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_columns(), 2); // Only Alice->Bob->Charlie (Alice-[2020]->Bob-[2019]->Charlie) assert_eq!(out.num_rows(), 1); @@ -1212,7 +1285,10 @@ async fn test_datafusion_two_hop_return_both_relationship_properties() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return 5 columns: a.name, r1.since_year, b.name, r2.since_year, c.name assert_eq!(out.num_columns(), 5); @@ -1284,7 +1360,10 @@ async fn test_datafusion_two_hop_with_limit() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return only 2 rows (limited from 4 total paths) assert_eq!(out.num_rows(), 2); @@ -1309,7 +1388,10 @@ async fn test_datafusion_complex_boolean_expression() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Matches: // - Bob(35)->Charlie(30): age > 30 AND age < 35 @@ -1357,7 +1439,10 @@ async fn test_datafusion_two_hop_same_intermediate_node() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = 
query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Paths through Charlie: Bob->Charlie->David, Alice->Charlie->David assert_eq!(out.num_rows(), 2); @@ -1435,7 +1520,10 @@ async fn test_datafusion_two_hop_count_paths_per_source() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Alice's two-hop paths: Alice->Bob->Charlie, Alice->Charlie->David assert_eq!(out.num_rows(), 2); @@ -1473,7 +1561,10 @@ async fn test_datafusion_filter_on_both_nodes_and_edges() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // a: age 25-30 = Alice(25), Charlie(30), Eve(28) // b: age > 30 = Bob(35), David(40) @@ -1518,7 +1609,10 @@ async fn test_datafusion_distinct_with_two_hop() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Sources with two-hop paths: Alice, Bob, Charlie assert_eq!(out.num_rows(), 3); @@ -1573,7 +1667,10 @@ async fn test_datafusion_order_by_single_column_asc() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_rows(), 5); @@ -1604,7 +1701,10 @@ async fn test_datafusion_order_by_single_column_desc() { let mut 
datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_rows(), 5); @@ -1642,7 +1742,10 @@ async fn test_datafusion_order_by_multiple_columns() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_rows(), 5); @@ -1675,7 +1778,10 @@ async fn test_datafusion_order_by_with_limit() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should only return 3 rows assert_eq!(out.num_rows(), 3); @@ -1710,7 +1816,10 @@ async fn test_datafusion_order_by_with_filter() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Age >= 30: Bob(35), Charlie(30), David(40) assert_eq!(out.num_rows(), 3); @@ -1744,7 +1853,10 @@ async fn test_datafusion_order_by_relationship_query() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_rows(), 5); @@ -1780,7 +1892,10 @@ async fn test_datafusion_order_by_two_hop_query() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = 
query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_rows(), 4); @@ -1831,7 +1946,10 @@ async fn test_datafusion_return_with_single_alias() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_rows(), 5); @@ -1862,7 +1980,10 @@ async fn test_datafusion_return_with_multiple_aliases() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Age > 30: Bob(35), Charlie(30 - excluded), David(40) assert_eq!(out.num_rows(), 2); @@ -1903,7 +2024,10 @@ async fn test_datafusion_return_mixed_with_and_without_alias() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_rows(), 3); @@ -1934,7 +2058,10 @@ async fn test_datafusion_return_alias_with_relationship() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_rows(), 3); @@ -1973,7 +2100,10 @@ async fn test_datafusion_return_alias_with_order_by() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, 
Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_rows(), 2); @@ -2011,7 +2141,10 @@ async fn test_datafusion_varlength_single_hop() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Same as single-hop: Alice→Bob, Alice→Charlie, Bob→Charlie, Charlie→David, David→Eve assert_eq!(out.num_rows(), 5); @@ -2048,7 +2181,10 @@ async fn test_datafusion_varlength_two_hops() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // 2-hop paths: Alice→Bob→Charlie, Alice→Charlie→David, Bob→Charlie→David, Charlie→David→Eve assert_eq!(out.num_rows(), 4); @@ -2098,7 +2234,10 @@ async fn test_datafusion_varlength_one_to_two_hops() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Alice 1-hop: Bob, Charlie // Alice 2-hop: Charlie (via Bob), David (via Charlie) @@ -2138,7 +2277,10 @@ async fn test_datafusion_varlength_with_filter() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Only paths ending at David (age 40) // Alice→Bob→David, Bob→David @@ -2168,7 +2310,10 @@ async fn test_datafusion_varlength_with_order_by() { datasets.insert("Person".to_string(), person_batch); 
datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_rows(), 4); @@ -2204,7 +2349,10 @@ async fn test_datafusion_varlength_with_limit() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should limit to 3 results assert_eq!(out.num_rows(), 3); @@ -2228,7 +2376,10 @@ async fn test_datafusion_varlength_with_distinct() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Alice reaches: Bob, Charlie, David (3 distinct people within 2 hops) assert_eq!(out.num_rows(), 3); @@ -2265,7 +2416,10 @@ async fn test_datafusion_varlength_three_hops() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Alice 3-hop: Alice→Bob→Charlie→David, Alice→Charlie→David→Eve assert_eq!(out.num_rows(), 2); @@ -2302,7 +2456,10 @@ async fn test_datafusion_varlength_no_results() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Eve has no outgoing KNOWS relationships assert_eq!(out.num_rows(), 0); @@ -2327,7 +2484,10 @@ async fn 
test_datafusion_varlength_with_source_filter() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); let sources = out .column(0) @@ -2361,7 +2521,10 @@ async fn test_datafusion_varlength_return_source_and_target() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // 2-hop paths: Alice→Bob→Charlie, Alice→Charlie→David, Bob→Charlie→David, Charlie→David→Eve assert_eq!(out.num_rows(), 4); @@ -2409,7 +2572,10 @@ async fn test_datafusion_varlength_count() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Alice can reach 4 people within 2 hops assert_eq!(out.num_rows(), 4); @@ -2434,7 +2600,10 @@ async fn test_count_star_all_nodes() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); let count_col = result @@ -2461,7 +2630,10 @@ async fn test_count_variable() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); let count_col = result @@ -2490,7 +2662,10 @@ async fn 
test_count_with_filter() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); let count_col = result @@ -2518,7 +2693,10 @@ async fn test_count_property() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); let count_col = result @@ -2546,7 +2724,10 @@ async fn test_count_with_grouping() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should have 4 groups: NULL (David), Chicago, New York, San Francisco, Seattle assert_eq!(result.num_rows(), 5); @@ -2600,7 +2781,10 @@ async fn test_count_without_alias_has_descriptive_name() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); // Should have column named "count(*)" not "expr" or "count" @@ -2627,7 +2811,10 @@ async fn test_count_property_without_alias_has_descriptive_name() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); // Should have column named "count(p.name)" not 
"expr" @@ -2654,7 +2841,10 @@ async fn test_sum_property() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); let sum_col = result @@ -2683,7 +2873,10 @@ async fn test_sum_with_filter() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); let sum_col = result @@ -2712,7 +2905,10 @@ async fn test_sum_with_grouping() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should have 5 groups: NULL, Chicago, New York, San Francisco, Seattle assert_eq!(result.num_rows(), 5); @@ -2763,7 +2959,10 @@ async fn test_sum_without_alias_has_descriptive_name() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); // Should have column named "sum(p.age)" not "expr" @@ -2790,7 +2989,10 @@ async fn test_avg_property() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); let avg_col = result @@ -2819,7 +3021,10 @@ async fn test_avg_with_filter() 
{ let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); let avg_col = result @@ -2849,7 +3054,10 @@ async fn test_avg_with_grouping() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should have 5 groups: NULL, Chicago, New York, San Francisco, Seattle assert_eq!(result.num_rows(), 5); @@ -2900,7 +3108,10 @@ async fn test_avg_without_alias_has_descriptive_name() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); // Should have column named "avg(p.age)" not "expr" @@ -2932,7 +3143,10 @@ async fn test_datafusion_disconnected_patterns_cross_join() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return Alice and Bob assert_eq!(result.num_rows(), 1); @@ -2969,7 +3183,10 @@ async fn test_datafusion_disconnected_patterns_multiple_results() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // a.age > 30: Bob(35), David(40) = 2 people // b.age < 30: Alice(25), Eve(28) = 2 people @@ 
-3021,7 +3238,10 @@ async fn test_datafusion_mixed_connected_and_disconnected() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // 5 KNOWS relationships * 1 person with age=25 (Alice) = 5 rows assert_eq!(result.num_rows(), 5); @@ -3054,7 +3274,10 @@ async fn test_datafusion_disconnected_with_distinct() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // IDs: 1,2,3,4,5 // Pairs where a.id < b.id: (1,2), (1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,4), (3,5), (4,5) @@ -3096,7 +3319,10 @@ async fn test_datafusion_shared_node_variable_join() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // This is a two-hop path query that should use join key inference on 'b' // Alice(1) -> Bob(2) -> Charlie(3) @@ -3152,7 +3378,10 @@ async fn test_datafusion_shared_variable_with_filter() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should successfully execute with join key inference + filters assert!(result.num_rows() > 0, "Should have results with filters"); @@ -3188,7 +3417,10 @@ async fn test_datafusion_multiple_shared_variables() { datasets.insert("Person".to_string(), person_batch); 
datasets.insert("KNOWS".to_string(), knows_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // This is a three-hop path query using join key inference on 'b' and 'c' // Should successfully execute (may have 0 or more results depending on data) @@ -3212,7 +3444,10 @@ async fn test_datafusion_shared_variable_distinct() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should return distinct intermediate nodes that have both incoming and outgoing KNOWS edges assert!(result.num_rows() > 0, "Should have intermediate nodes"); @@ -3247,7 +3482,10 @@ async fn test_datafusion_is_null_node_property() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); assert_eq!(result.num_columns(), 1); @@ -3272,7 +3510,10 @@ async fn test_datafusion_is_not_null_node_property() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 4); assert_eq!(result.num_columns(), 1); @@ -3311,7 +3552,10 @@ async fn test_datafusion_is_null_relationship_property() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, 
Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 1); assert_eq!(result.num_columns(), 2); @@ -3349,7 +3593,10 @@ async fn test_datafusion_is_not_null_relationship_property() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let result = query.execute_datafusion(datasets).await.unwrap(); + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(result.num_rows(), 4); diff --git a/rust/lance-graph/tests/test_datafusion_scenarios.rs b/rust/lance-graph/tests/test_datafusion_scenarios.rs index 41d7d8db..287054fb 100644 --- a/rust/lance-graph/tests/test_datafusion_scenarios.rs +++ b/rust/lance-graph/tests/test_datafusion_scenarios.rs @@ -1,7 +1,7 @@ use arrow_array::{BooleanArray, Int64Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; use lance_graph::config::GraphConfig; -use lance_graph::query::CypherQuery; +use lance_graph::{CypherQuery, ExecutionStrategy}; use std::collections::HashMap; use std::sync::Arc; @@ -14,7 +14,7 @@ async fn execute_query(graph: TestGraph, cypher: &str) -> RecordBatch { CypherQuery::new(cypher) .unwrap() .with_config(graph.config) - .execute_datafusion(graph.datasets) + .execute(graph.datasets, Some(ExecutionStrategy::DataFusion)) .await .unwrap() } diff --git a/rust/lance-graph/tests/test_datafusion_varlength_complex.rs b/rust/lance-graph/tests/test_datafusion_varlength_complex.rs index 79093e94..46c220a9 100644 --- a/rust/lance-graph/tests/test_datafusion_varlength_complex.rs +++ b/rust/lance-graph/tests/test_datafusion_varlength_complex.rs @@ -1,7 +1,7 @@ use arrow_array::{Int64Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; use lance_graph::config::GraphConfig; -use lance_graph::query::CypherQuery; +use lance_graph::{CypherQuery, ExecutionStrategy}; use std::collections::HashMap; use std::sync::Arc; @@ -161,7 +161,10 @@ async fn 
test_varlength_multiple_paths_to_target() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should find multiple paths to Jack assert!(out.num_rows() > 0, "Should find at least one path to Jack"); @@ -186,7 +189,10 @@ async fn test_varlength_shortest_path_length() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should find 3-hop paths assert!(out.num_rows() >= 1, "Should find at least one 3-hop path"); @@ -211,7 +217,10 @@ async fn test_varlength_with_cycle() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); let names = out .column(0) @@ -249,7 +258,10 @@ async fn test_varlength_reachability_analysis() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Alice can reach many people within 3 hops assert!( @@ -277,7 +289,10 @@ async fn test_varlength_diamond_pattern() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should find multiple 2-hop paths to Diana // Alice->Bob->Diana, 
Alice->Charlie->Diana, plus potentially others @@ -306,7 +321,7 @@ async fn test_varlength_with_and_without_distinct() { datasets1.insert("Person".to_string(), person_batch.clone()); datasets1.insert("KNOWS".to_string(), knows_batch.clone()); - let out_all = query_all_paths.execute_datafusion(datasets1).await.unwrap(); + let out_all = query_all_paths.execute(datasets1, None).await.unwrap(); // Query WITH DISTINCT - returns unique endpoints only let query_distinct = CypherQuery::new( @@ -320,7 +335,7 @@ async fn test_varlength_with_and_without_distinct() { datasets2.insert("Person".to_string(), person_batch); datasets2.insert("KNOWS".to_string(), knows_batch); - let out_distinct = query_distinct.execute_datafusion(datasets2).await.unwrap(); + let out_distinct = query_distinct.execute(datasets2, None).await.unwrap(); // Note: Due to how variable-length paths are implemented with UNION, // DISTINCT may not fully deduplicate across all branches if intermediate @@ -360,7 +375,10 @@ async fn test_varlength_distinct_reduces_duplicates() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should find multiple people reachable in 2 hops assert!( @@ -405,7 +423,7 @@ async fn test_varlength_count_paths_vs_endpoints() { datasets1.insert("Person".to_string(), person_batch.clone()); datasets1.insert("KNOWS".to_string(), knows_batch.clone()); - let out_paths = query_paths.execute_datafusion(datasets1).await.unwrap(); + let out_paths = query_paths.execute(datasets1, None).await.unwrap(); // Count unique endpoints (with DISTINCT) let query_endpoints = CypherQuery::new( @@ -419,7 +437,7 @@ async fn test_varlength_count_paths_vs_endpoints() { datasets2.insert("Person".to_string(), person_batch); datasets2.insert("KNOWS".to_string(), knows_batch); - let out_endpoints = 
query_endpoints.execute_datafusion(datasets2).await.unwrap(); + let out_endpoints = query_endpoints.execute(datasets2, None).await.unwrap(); // Total paths should be >= unique endpoints assert!( @@ -458,7 +476,10 @@ async fn test_varlength_same_department() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); let names = out .column(0) @@ -495,7 +516,10 @@ async fn test_varlength_cross_department_connections() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should find Marketing people reachable from Engineering assert!( @@ -524,7 +548,10 @@ async fn test_varlength_age_filter() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); let ages = out.column(1).as_any().downcast_ref::().unwrap(); @@ -554,7 +581,10 @@ async fn test_varlength_age_range() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); let ages = out.column(1).as_any().downcast_ref::().unwrap(); @@ -584,7 +614,10 @@ async fn test_varlength_convergence_to_hub() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + 
.execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Multiple people should reach Jack in 2 hops assert!( @@ -612,7 +645,10 @@ async fn test_varlength_divergence_from_source() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Bob knows multiple people directly assert!( @@ -645,7 +681,10 @@ async fn test_varlength_increasing_reach() { datasets.insert("Person".to_string(), person_batch.clone()); datasets.insert("KNOWS".to_string(), knows_batch.clone()); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); let current_count = out.num_rows(); // Each additional hop should reach at least as many people (monotonic increase) @@ -680,7 +719,10 @@ async fn test_varlength_combined_filters() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); let ages = out.column(1).as_any().downcast_ref::().unwrap(); let departments = out @@ -716,7 +758,10 @@ async fn test_varlength_with_limit_and_order() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); assert_eq!(out.num_rows(), 3, "Should return exactly 3 results"); @@ -749,7 +794,10 @@ async fn test_varlength_large_hop_count() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = 
query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Due to cycles, Alice can reach many people with 10 hops assert!(out.num_rows() >= 5, "Should reach many people with 10 hops"); @@ -775,7 +823,10 @@ async fn test_varlength_all_pairs_reachability() { datasets.insert("Person".to_string(), person_batch); datasets.insert("KNOWS".to_string(), knows_batch); - let out = query.execute_datafusion(datasets).await.unwrap(); + let out = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); // Should find many connected pairs assert!( diff --git a/rust/lance-graph/tests/test_datafusion_with_context.rs b/rust/lance-graph/tests/test_datafusion_with_context.rs index a9516016..0e234c74 100644 --- a/rust/lance-graph/tests/test_datafusion_with_context.rs +++ b/rust/lance-graph/tests/test_datafusion_with_context.rs @@ -63,10 +63,7 @@ async fn test_execute_with_context_csv_simple() { .unwrap() .with_config(config.clone()); - let result1 = query1 - .execute_with_datafusion_context(ctx.clone()) - .await - .unwrap(); + let result1 = query1.execute_with_context(ctx.clone()).await.unwrap(); // Should return Bob (34) and David (42) assert_eq!(result1.num_rows(), 2); @@ -102,7 +99,7 @@ async fn test_execute_with_context_csv_simple() { .unwrap() .with_config(config); - let result2 = query2.execute_with_datafusion_context(ctx).await.unwrap(); + let result2 = query2.execute_with_context(ctx).await.unwrap(); // Should return 3 relationships: Alice->Bob, Alice->Carol, Bob->Carol assert_eq!(result2.num_rows(), 3); @@ -226,7 +223,7 @@ async fn test_execute_with_context_complex_query() { .unwrap() .with_config(config); - let result = query.execute_with_datafusion_context(ctx).await.unwrap(); + let result = query.execute_with_context(ctx).await.unwrap(); // Should return David (95000) and Bob (85000) from Engineering assert_eq!(result.num_rows(), 2); @@ -278,7 +275,7 @@ 
async fn test_execute_with_context_missing_table() { .unwrap() .with_config(config); - let result = query.execute_with_datafusion_context(ctx).await; + let result = query.execute_with_context(ctx).await; // Should error because Person table is not registered assert!(result.is_err()); @@ -324,7 +321,7 @@ async fn test_execute_with_context_aliases() { .unwrap() .with_config(config); - let result = query.execute_with_datafusion_context(ctx).await.unwrap(); + let result = query.execute_with_context(ctx).await.unwrap(); assert_eq!(result.num_rows(), 2); diff --git a/rust/lance-graph/tests/test_explain_output.rs b/rust/lance-graph/tests/test_explain_output.rs index a112927d..27d9bfca 100644 --- a/rust/lance-graph/tests/test_explain_output.rs +++ b/rust/lance-graph/tests/test_explain_output.rs @@ -103,7 +103,7 @@ async fn test_explain_simple_node_scan() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), create_person_dataset()); - let plan = query.explain_datafusion(datasets).await.unwrap(); + let plan = query.explain(datasets).await.unwrap(); println!("{}", plan); assert!(plan.contains("Cypher Query:")); @@ -134,7 +134,7 @@ async fn test_explain_one_hop_relationship() { datasets.insert("Person".to_string(), create_person_dataset()); datasets.insert("KNOWS".to_string(), create_knows_dataset()); - let plan = query.explain_datafusion(datasets).await.unwrap(); + let plan = query.explain(datasets).await.unwrap(); println!("{}", plan); assert!(plan.contains("| graph_logical_plan")); @@ -166,7 +166,7 @@ async fn test_explain_two_hop_path() { datasets.insert("Person".to_string(), create_person_dataset()); datasets.insert("KNOWS".to_string(), create_knows_dataset()); - let plan = query.explain_datafusion(datasets).await.unwrap(); + let plan = query.explain(datasets).await.unwrap(); println!("{}", plan); assert!(plan.contains("| physical_plan")); @@ -204,7 +204,7 @@ async fn test_explain_multi_relationship_types() { datasets.insert("Company".to_string(), 
create_company_dataset()); datasets.insert("WORKS_AT".to_string(), create_works_at_dataset()); - let plan = query.explain_datafusion(datasets).await.unwrap(); + let plan = query.explain(datasets).await.unwrap(); println!("{}", plan); assert!(plan.contains("Company")); @@ -232,7 +232,7 @@ async fn test_explain_distinct() { datasets.insert("Person".to_string(), create_person_dataset()); datasets.insert("KNOWS".to_string(), create_knows_dataset()); - let plan = query.explain_datafusion(datasets).await.unwrap(); + let plan = query.explain(datasets).await.unwrap(); println!("{}", plan); assert!(plan.contains("Distinct") || plan.contains("DISTINCT")); @@ -258,7 +258,7 @@ async fn test_explain_complex_filter() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), create_person_dataset()); - let plan = query.explain_datafusion(datasets).await.unwrap(); + let plan = query.explain(datasets).await.unwrap(); println!("{}", plan); assert!(plan.contains("Filter")); @@ -287,7 +287,7 @@ async fn test_explain_with_skip_and_limit() { let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), create_person_dataset()); - let plan = query.explain_datafusion(datasets).await.unwrap(); + let plan = query.explain(datasets).await.unwrap(); println!("{}", plan); assert!(plan.contains("Limit") || plan.contains("LIMIT")); @@ -316,7 +316,7 @@ async fn test_explain_relationship_properties() { datasets.insert("Person".to_string(), create_person_dataset()); datasets.insert("KNOWS".to_string(), create_knows_dataset()); - let plan = query.explain_datafusion(datasets).await.unwrap(); + let plan = query.explain(datasets).await.unwrap(); println!("{}", plan); assert!(plan.contains("since") || plan.contains("Filter"));