diff --git a/rust/lance-graph/CONTRIBUTING.md b/rust/lance-graph/CONTRIBUTING.md new file mode 100644 index 00000000..30e03a7e --- /dev/null +++ b/rust/lance-graph/CONTRIBUTING.md @@ -0,0 +1,14 @@ +Contributing (Rust crate) +========================= + +Before pushing or opening a PR, please verify Rust code formatting matches CI: + +```bash +cargo fmt --manifest-path rust/lance-graph/Cargo.toml -- --check +``` + +If the check fails, format the code locally: + +```bash +cargo fmt --manifest-path rust/lance-graph/Cargo.toml +``` diff --git a/rust/lance-graph/src/query.rs b/rust/lance-graph/src/query.rs index c1ed99ce..404811df 100644 --- a/rust/lance-graph/src/query.rs +++ b/rust/lance-graph/src/query.rs @@ -7,9 +7,15 @@ use crate::ast::CypherQuery as CypherAST; use crate::config::GraphConfig; use crate::error::{GraphError, Result}; use crate::parser::parse_cypher_query; -use datafusion::logical_expr::JoinType; use std::collections::HashMap; +mod path_executor; +use self::path_executor::PathExecutor; +mod aliases; +mod clauses; +mod expr; +use crate::query::expr::{to_df_boolean_expr_with_vars, to_df_literal}; + /// A Cypher query that can be executed against Lance datasets #[derive(Debug, Clone)] pub struct CypherQuery { @@ -22,352 +28,6 @@ pub struct CypherQuery { /// Query parameters parameters: HashMap, } - -// Internal helper that plans and executes a single path by chaining joins. -struct PathExecutor<'a> { - ctx: &'a datafusion::prelude::SessionContext, - path: &'a crate::ast::PathPattern, - start_label: &'a str, - start_alias: String, - segs: Vec>, - node_maps: std::collections::HashMap, - rel_maps: std::collections::HashMap, -} - -#[derive(Clone)] -struct SegMeta<'a> { - rel_type: &'a str, - end_label: &'a str, - dir: crate::ast::RelationshipDirection, - rel_alias: String, - end_alias: String, -} - -impl<'a> PathExecutor<'a> { - fn new( - ctx: &'a datafusion::prelude::SessionContext, - cfg: &'a crate::config::GraphConfig, - path: &'a crate::ast::PathPattern, - ) -> Result { - use std::collections::{HashMap, HashSet}; - let mut used: HashSet = HashSet::new(); - let mut uniq = |desired: &str| -> String { - if used.insert(desired.to_string()) { - return desired.to_string(); - } - let mut i = 2usize; - loop { - let cand = format!("{}_{}", desired, i); - if used.insert(cand.clone()) { - break cand; - } - i += 1; - } - }; - - let start_label = path - .start_node - .labels - .first() - .map(|s| s.as_str()) - .ok_or_else(|| GraphError::PlanError { - message: "Start node must have a label".to_string(), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - let start_alias = uniq( - &path - .start_node - .variable - .as_deref() - .unwrap_or(start_label) - .to_lowercase(), - ); - - let mut segs: Vec = Vec::with_capacity(path.segments.len()); - for seg in &path.segments { - let rel_type = seg - .relationship - .types - .first() - .map(|s| s.as_str()) - .ok_or_else(|| GraphError::PlanError { - message: "Relationship must have a type".to_string(), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - let end_label = seg - .end_node - .labels - .first() - .map(|s| s.as_str()) - .ok_or_else(|| GraphError::PlanError { - message: "End node must have a label".to_string(), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - let rel_alias = uniq( - &seg.relationship - .variable - .as_deref() - .unwrap_or(rel_type) - .to_lowercase(), - ); - let end_alias = uniq( - &seg.end_node - .variable - .as_deref() - .unwrap_or(end_label) - .to_lowercase(), - ); - segs.push(SegMeta { - rel_type, - end_label, - dir: seg.relationship.direction.clone(), - rel_alias, - end_alias, - }); - } - - let mut node_maps: HashMap = HashMap::new(); - let mut rel_maps: HashMap = HashMap::new(); - node_maps.insert( - start_alias.clone(), - cfg.get_node_mapping(start_label) - .ok_or_else(|| GraphError::PlanError { - message: format!("No node mapping for '{}'", start_label), - location: snafu::Location::new(file!(), line!(), column!()), - })?, - ); - for seg in &segs { - node_maps.insert( - seg.end_alias.clone(), - cfg.get_node_mapping(seg.end_label) - .ok_or_else(|| GraphError::PlanError { - message: format!("No node mapping for '{}'", seg.end_label), - location: snafu::Location::new(file!(), line!(), column!()), - })?, - ); - rel_maps.insert( - seg.rel_alias.clone(), - cfg.get_relationship_mapping(seg.rel_type).ok_or_else(|| { - GraphError::PlanError { - message: format!("No relationship mapping for '{}'", seg.rel_type), - location: snafu::Location::new(file!(), line!(), column!()), - } - })?, - ); - } - - Ok(Self { - ctx, - path, - start_label, - start_alias, - segs, - node_maps, - rel_maps, - }) - } - - async fn open_aliased( - &self, - table: &str, - alias: &str, - ) -> Result { - let df = self - .ctx - .table(table) - .await - .map_err(|e| GraphError::PlanError { - message: format!("Failed to read table '{}': {}", table, e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - let schema = df.schema(); - let proj: Vec = schema - .fields() - .iter() - .map(|f| { - datafusion::logical_expr::col(f.name()).alias(format!("{}__{}", alias, f.name())) - }) - .collect(); - df.alias(alias)? - .select(proj) - .map_err(|e| GraphError::PlanError { - message: format!("Failed to alias/select '{}': {}", table, e), - location: snafu::Location::new(file!(), line!(), column!()), - }) - } - - async fn build_chain(&self) -> Result { - // Start node - let mut df = self - .open_aliased(self.start_label, &self.start_alias) - .await?; - // Inline property filters on start node - for (k, v) in &self.path.start_node.properties { - let expr = to_df_literal(v); - df = df - .filter(datafusion::logical_expr::Expr::BinaryExpr( - datafusion::logical_expr::BinaryExpr { - left: Box::new(datafusion::logical_expr::col(format!( - "{}__{}", - self.start_alias, k - ))), - op: datafusion::logical_expr::Operator::Eq, - right: Box::new(expr), - }, - )) - .map_err(|e| GraphError::PlanError { - message: format!("Failed to apply filter: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - - // Chain joins for each hop - let mut current_node_alias = self.start_alias.as_str(); - for s in &self.segs { - let rel_df = self.open_aliased(s.rel_type, &s.rel_alias).await?; - let node_map = self.node_maps.get(current_node_alias).unwrap(); - let rel_map = self.rel_maps.get(&s.rel_alias).unwrap(); - let (left_key, right_key) = match s.dir { - crate::ast::RelationshipDirection::Outgoing - | crate::ast::RelationshipDirection::Undirected => ( - format!("{}__{}", current_node_alias, node_map.id_field), - format!("{}__{}", s.rel_alias, rel_map.source_id_field), - ), - crate::ast::RelationshipDirection::Incoming => ( - format!("{}__{}", current_node_alias, node_map.id_field), - format!("{}__{}", s.rel_alias, rel_map.target_id_field), - ), - }; - df = df - .join( - rel_df, - JoinType::Inner, - &[left_key.as_str()], - &[right_key.as_str()], - None, - ) - .map_err(|e| GraphError::PlanError { - message: format!("Join failed (node->rel): {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - - let end_df = self.open_aliased(s.end_label, &s.end_alias).await?; - let (left_key2, right_key2) = match s.dir { - crate::ast::RelationshipDirection::Outgoing - | crate::ast::RelationshipDirection::Undirected => ( - format!("{}__{}", s.rel_alias, rel_map.target_id_field), - format!( - "{}__{}", - s.end_alias, - self.node_maps.get(&s.end_alias).unwrap().id_field - ), - ), - crate::ast::RelationshipDirection::Incoming => ( - format!("{}__{}", s.rel_alias, rel_map.source_id_field), - format!( - "{}__{}", - s.end_alias, - self.node_maps.get(&s.end_alias).unwrap().id_field - ), - ), - }; - df = df - .join( - end_df, - JoinType::Inner, - &[left_key2.as_str()], - &[right_key2.as_str()], - None, - ) - .map_err(|e| GraphError::PlanError { - message: format!("Join failed (rel->node): {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - current_node_alias = &s.end_alias; - } - - Ok(df) - } - - fn resolve_var_alias<'b>(&'b self, var: &str) -> Option<&'b str> { - if Some(var) == self.path.start_node.variable.as_deref() { - return Some(self.start_alias.as_str()); - } - for (i, seg) in self.path.segments.iter().enumerate() { - if Some(var) == seg.relationship.variable.as_deref() { - return Some(self.segs[i].rel_alias.as_str()); - } - if Some(var) == seg.end_node.variable.as_deref() { - return Some(self.segs[i].end_alias.as_str()); - } - } - None - } - - fn apply_where( - &self, - mut df: datafusion::dataframe::DataFrame, - ast: &crate::ast::CypherQuery, - ) -> Result { - if let Some(where_clause) = &ast.where_clause { - if let Some(expr) = - to_df_boolean_expr_with_vars(&where_clause.expression, &|var, prop| { - let alias = self.resolve_var_alias(var).unwrap_or(var); - format!("{}__{}", alias, prop) - }) - { - df = df.filter(expr).map_err(|e| GraphError::PlanError { - message: format!("Failed to apply WHERE: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - } - Ok(df) - } - - fn apply_return( - &self, - mut df: datafusion::dataframe::DataFrame, - ast: &crate::ast::CypherQuery, - ) -> Result { - use datafusion::logical_expr::Expr; - let mut proj: Vec = Vec::new(); - for item in &ast.return_clause.items { - if let crate::ast::ValueExpression::Property(prop) = &item.expression { - let alias = self - .resolve_var_alias(&prop.variable) - .unwrap_or(&prop.variable); - let mut e = datafusion::logical_expr::col(format!("{}__{}", alias, prop.property)); - if let Some(a) = &item.alias { - e = e.alias(a); - } - proj.push(e); - } - } - if !proj.is_empty() { - df = df.select(proj).map_err(|e| GraphError::PlanError { - message: format!("Failed to project: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - if ast.return_clause.distinct { - df = df.distinct().map_err(|e| GraphError::PlanError { - message: format!("Failed to apply DISTINCT: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - if let Some(limit) = ast.limit { - df = df - .limit(0, Some(limit as usize)) - .map_err(|e| GraphError::PlanError { - message: format!("Failed to apply LIMIT: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - Ok(df) - } -} - impl CypherQuery { /// Create a new Cypher query from a query string pub fn new(query: &str) -> Result { @@ -1353,66 +1013,6 @@ impl CypherQuery { } } -fn to_df_boolean_expr_with_vars( - expr: &crate::ast::BooleanExpression, - qualify: &F, -) -> Option -where - F: Fn(&str, &str) -> String, -{ - use crate::ast::{BooleanExpression as BE, ComparisonOperator as CO, ValueExpression as VE}; - use datafusion::logical_expr::{col, Expr, Operator}; - match expr { - BE::Comparison { - left, - operator, - right, - } => { - let (var, prop, lit_expr) = match (left, right) { - (VE::Property(p), VE::Literal(val)) => { - (p.variable.as_str(), p.property.as_str(), to_df_literal(val)) - } - (VE::Literal(val), VE::Property(p)) => { - (p.variable.as_str(), p.property.as_str(), to_df_literal(val)) - } - _ => return None, - }; - let qualified = qualify(var, prop); - let op = match operator { - CO::Equal => Operator::Eq, - CO::NotEqual => Operator::NotEq, - CO::LessThan => Operator::Lt, - CO::LessThanOrEqual => Operator::LtEq, - CO::GreaterThan => Operator::Gt, - CO::GreaterThanOrEqual => Operator::GtEq, - }; - Some(Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr { - left: Box::new(col(&qualified)), - op, - right: Box::new(lit_expr), - })) - } - BE::And(l, r) => Some(datafusion::logical_expr::Expr::BinaryExpr( - datafusion::logical_expr::BinaryExpr { - left: Box::new(to_df_boolean_expr_with_vars(l, qualify)?), - op: Operator::And, - right: Box::new(to_df_boolean_expr_with_vars(r, qualify)?), - }, - )), - BE::Or(l, r) => Some(datafusion::logical_expr::Expr::BinaryExpr( - datafusion::logical_expr::BinaryExpr { - left: Box::new(to_df_boolean_expr_with_vars(l, qualify)?), - op: Operator::Or, - right: Box::new(to_df_boolean_expr_with_vars(r, qualify)?), - }, - )), - BE::Not(inner) => Some(datafusion::logical_expr::Expr::Not(Box::new( - to_df_boolean_expr_with_vars(inner, qualify)?, - ))), - _ => None, - } -} - /// Builder for constructing Cypher queries programmatically #[derive(Debug, Default)] pub struct CypherQueryBuilder { @@ -1596,26 +1196,11 @@ fn to_df_value_expr_simple(expr: &crate::ast::ValueExpression) -> datafusion::lo match expr { VE::Property(prop) => col(&prop.property), VE::Variable(v) => col(v), - VE::Literal(v) => to_df_literal(v), + VE::Literal(v) => crate::query::expr::to_df_literal(v), VE::Function { .. } | VE::Arithmetic { .. } => lit(0), } } -fn to_df_literal(val: &crate::ast::PropertyValue) -> datafusion::logical_expr::Expr { - use datafusion::logical_expr::lit; - match val { - crate::ast::PropertyValue::String(s) => lit(s.clone()), - crate::ast::PropertyValue::Integer(i) => lit(*i), - crate::ast::PropertyValue::Float(f) => lit(*f), - crate::ast::PropertyValue::Boolean(b) => lit(*b), - crate::ast::PropertyValue::Null => { - datafusion::logical_expr::Expr::Literal(datafusion::scalar::ScalarValue::Null, None) - } - crate::ast::PropertyValue::Parameter(_) => lit(0), - crate::ast::PropertyValue::Property(prop) => datafusion::logical_expr::col(&prop.property), - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/rust/lance-graph/src/query/aliases.rs b/rust/lance-graph/src/query/aliases.rs new file mode 100644 index 00000000..785cea23 --- /dev/null +++ b/rust/lance-graph/src/query/aliases.rs @@ -0,0 +1,3 @@ +pub(super) fn qualify_alias_property(alias: &str, property: &str) -> String { + format!("{}__{}", alias, property) +} diff --git a/rust/lance-graph/src/query/clauses.rs b/rust/lance-graph/src/query/clauses.rs new file mode 100644 index 00000000..79d76953 --- /dev/null +++ b/rust/lance-graph/src/query/clauses.rs @@ -0,0 +1,62 @@ +use crate::error::Result; + +pub(super) fn apply_where_with_qualifier( + mut df: datafusion::dataframe::DataFrame, + ast: &crate::ast::CypherQuery, + qualify: &dyn Fn(&str, &str) -> String, +) -> Result { + use crate::error::GraphError; + use crate::query::expr::to_df_boolean_expr_with_vars; + if let Some(where_clause) = &ast.where_clause { + if let Some(expr) = + to_df_boolean_expr_with_vars(&where_clause.expression, &|v, p| qualify(v, p)) + { + df = df.filter(expr).map_err(|e| GraphError::PlanError { + message: format!("Failed to apply WHERE: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + } + } + Ok(df) +} + +pub(super) fn apply_return_with_qualifier( + mut df: datafusion::dataframe::DataFrame, + ast: &crate::ast::CypherQuery, + qualify: &dyn Fn(&str, &str) -> String, +) -> Result { + use crate::error::GraphError; + use datafusion::logical_expr::Expr; + let mut proj: Vec = Vec::new(); + for item in &ast.return_clause.items { + if let crate::ast::ValueExpression::Property(prop) = &item.expression { + let col_name = qualify(&prop.variable, &prop.property); + let mut e = datafusion::logical_expr::col(col_name); + if let Some(a) = &item.alias { + e = e.alias(a); + } + proj.push(e); + } + } + if !proj.is_empty() { + df = df.select(proj).map_err(|e| GraphError::PlanError { + message: format!("Failed to project: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + } + if ast.return_clause.distinct { + df = df.distinct().map_err(|e| GraphError::PlanError { + message: format!("Failed to apply DISTINCT: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + } + if let Some(limit) = ast.limit { + df = df + .limit(0, Some(limit as usize)) + .map_err(|e| GraphError::PlanError { + message: format!("Failed to apply LIMIT: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + } + Ok(df) +} diff --git a/rust/lance-graph/src/query/expr.rs b/rust/lance-graph/src/query/expr.rs new file mode 100644 index 00000000..d7b9f3f8 --- /dev/null +++ b/rust/lance-graph/src/query/expr.rs @@ -0,0 +1,74 @@ +pub(super) fn to_df_boolean_expr_with_vars( + expr: &crate::ast::BooleanExpression, + qualify: &F, +) -> Option +where + F: Fn(&str, &str) -> String, +{ + use crate::ast::{BooleanExpression as BE, ComparisonOperator as CO, ValueExpression as VE}; + use datafusion::logical_expr::{col, Expr, Operator}; + match expr { + BE::Comparison { + left, + operator, + right, + } => { + let (var, prop, lit_expr) = match (left, right) { + (VE::Property(p), VE::Literal(val)) => { + (p.variable.as_str(), p.property.as_str(), to_df_literal(val)) + } + (VE::Literal(val), VE::Property(p)) => { + (p.variable.as_str(), p.property.as_str(), to_df_literal(val)) + } + _ => return None, + }; + let qualified = qualify(var, prop); + let op = match operator { + CO::Equal => Operator::Eq, + CO::NotEqual => Operator::NotEq, + CO::LessThan => Operator::Lt, + CO::LessThanOrEqual => Operator::LtEq, + CO::GreaterThan => Operator::Gt, + CO::GreaterThanOrEqual => Operator::GtEq, + }; + Some(Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr { + left: Box::new(col(&qualified)), + op, + right: Box::new(lit_expr), + })) + } + BE::And(l, r) => Some(datafusion::logical_expr::Expr::BinaryExpr( + datafusion::logical_expr::BinaryExpr { + left: Box::new(to_df_boolean_expr_with_vars(l, qualify)?), + op: Operator::And, + right: Box::new(to_df_boolean_expr_with_vars(r, qualify)?), + }, + )), + BE::Or(l, r) => Some(datafusion::logical_expr::Expr::BinaryExpr( + datafusion::logical_expr::BinaryExpr { + left: Box::new(to_df_boolean_expr_with_vars(l, qualify)?), + op: Operator::Or, + right: Box::new(to_df_boolean_expr_with_vars(r, qualify)?), + }, + )), + BE::Not(inner) => Some(datafusion::logical_expr::Expr::Not(Box::new( + to_df_boolean_expr_with_vars(inner, qualify)?, + ))), + _ => None, + } +} + +pub(super) fn to_df_literal(val: &crate::ast::PropertyValue) -> datafusion::logical_expr::Expr { + use datafusion::logical_expr::lit; + match val { + crate::ast::PropertyValue::String(s) => lit(s.clone()), + crate::ast::PropertyValue::Integer(i) => lit(*i), + crate::ast::PropertyValue::Float(f) => lit(*f), + crate::ast::PropertyValue::Boolean(b) => lit(*b), + crate::ast::PropertyValue::Null => { + datafusion::logical_expr::Expr::Literal(datafusion::scalar::ScalarValue::Null, None) + } + crate::ast::PropertyValue::Parameter(_) => lit(0), + crate::ast::PropertyValue::Property(prop) => datafusion::logical_expr::col(&prop.property), + } +} diff --git a/rust/lance-graph/src/query/path_executor.rs b/rust/lance-graph/src/query/path_executor.rs new file mode 100644 index 00000000..21e30008 --- /dev/null +++ b/rust/lance-graph/src/query/path_executor.rs @@ -0,0 +1,306 @@ +use crate::error::{GraphError, Result}; +use datafusion::logical_expr::JoinType; + +// Internal helper that plans and executes a single path by chaining joins. +pub(super) struct PathExecutor<'a> { + pub(super) ctx: &'a datafusion::prelude::SessionContext, + pub(super) path: &'a crate::ast::PathPattern, + pub(super) start_label: &'a str, + pub(super) start_alias: String, + segs: Vec>, + node_maps: std::collections::HashMap, + rel_maps: std::collections::HashMap, +} + +#[derive(Clone)] +struct SegMeta<'a> { + rel_type: &'a str, + end_label: &'a str, + dir: crate::ast::RelationshipDirection, + rel_alias: String, + end_alias: String, +} + +impl<'a> PathExecutor<'a> { + pub(super) fn new( + ctx: &'a datafusion::prelude::SessionContext, + cfg: &'a crate::config::GraphConfig, + path: &'a crate::ast::PathPattern, + ) -> Result { + use std::collections::{HashMap, HashSet}; + let mut used: HashSet = HashSet::new(); + let mut uniq = |desired: &str| -> String { + if used.insert(desired.to_string()) { + return desired.to_string(); + } + let mut i = 2usize; + loop { + let cand = format!("{}_{}", desired, i); + if used.insert(cand.clone()) { + break cand; + } + i += 1; + } + }; + + let start_label = path + .start_node + .labels + .first() + .map(|s| s.as_str()) + .ok_or_else(|| GraphError::PlanError { + message: "Start node must have a label".to_string(), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + let start_alias = uniq( + &path + .start_node + .variable + .as_deref() + .unwrap_or(start_label) + .to_lowercase(), + ); + + let mut segs: Vec = Vec::with_capacity(path.segments.len()); + for seg in &path.segments { + let rel_type = seg + .relationship + .types + .first() + .map(|s| s.as_str()) + .ok_or_else(|| GraphError::PlanError { + message: "Relationship must have a type".to_string(), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + let end_label = seg + .end_node + .labels + .first() + .map(|s| s.as_str()) + .ok_or_else(|| GraphError::PlanError { + message: "End node must have a label".to_string(), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + let rel_alias = uniq( + &seg.relationship + .variable + .as_deref() + .unwrap_or(rel_type) + .to_lowercase(), + ); + let end_alias = uniq( + &seg.end_node + .variable + .as_deref() + .unwrap_or(end_label) + .to_lowercase(), + ); + segs.push(SegMeta { + rel_type, + end_label, + dir: seg.relationship.direction.clone(), + rel_alias, + end_alias, + }); + } + + let mut node_maps: HashMap = HashMap::new(); + let mut rel_maps: HashMap = HashMap::new(); + node_maps.insert( + start_alias.clone(), + cfg.get_node_mapping(start_label) + .ok_or_else(|| GraphError::PlanError { + message: format!("No node mapping for '{}'", start_label), + location: snafu::Location::new(file!(), line!(), column!()), + })?, + ); + for seg in &segs { + node_maps.insert( + seg.end_alias.clone(), + cfg.get_node_mapping(seg.end_label) + .ok_or_else(|| GraphError::PlanError { + message: format!("No node mapping for '{}'", seg.end_label), + location: snafu::Location::new(file!(), line!(), column!()), + })?, + ); + rel_maps.insert( + seg.rel_alias.clone(), + cfg.get_relationship_mapping(seg.rel_type).ok_or_else(|| { + GraphError::PlanError { + message: format!("No relationship mapping for '{}'", seg.rel_type), + location: snafu::Location::new(file!(), line!(), column!()), + } + })?, + ); + } + + Ok(Self { + ctx, + path, + start_label, + start_alias, + segs, + node_maps, + rel_maps, + }) + } + + async fn open_aliased( + &self, + table: &str, + alias: &str, + ) -> Result { + let df = self + .ctx + .table(table) + .await + .map_err(|e| GraphError::PlanError { + message: format!("Failed to read table '{}': {}", table, e), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + let schema = df.schema(); + let proj: Vec = schema + .fields() + .iter() + .map(|f| { + datafusion::logical_expr::col(f.name()).alias(format!("{}__{}", alias, f.name())) + }) + .collect(); + df.alias(alias)? + .select(proj) + .map_err(|e| GraphError::PlanError { + message: format!("Failed to alias/select '{}': {}", table, e), + location: snafu::Location::new(file!(), line!(), column!()), + }) + } + + pub(super) async fn build_chain(&self) -> Result { + // Start node + let mut df = self + .open_aliased(self.start_label, &self.start_alias) + .await?; + // Inline property filters on start node + for (k, v) in &self.path.start_node.properties { + let expr = super::expr::to_df_literal(v); + df = df + .filter(datafusion::logical_expr::Expr::BinaryExpr( + datafusion::logical_expr::BinaryExpr { + left: Box::new(datafusion::logical_expr::col(format!( + "{}__{}", + self.start_alias, k + ))), + op: datafusion::logical_expr::Operator::Eq, + right: Box::new(expr), + }, + )) + .map_err(|e| GraphError::PlanError { + message: format!("Failed to apply filter: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + } + + // Chain joins for each hop + let mut current_node_alias = self.start_alias.as_str(); + for s in &self.segs { + let rel_df = self.open_aliased(s.rel_type, &s.rel_alias).await?; + let node_map = self.node_maps.get(current_node_alias).unwrap(); + let rel_map = self.rel_maps.get(&s.rel_alias).unwrap(); + let (left_key, right_key) = match s.dir { + crate::ast::RelationshipDirection::Outgoing + | crate::ast::RelationshipDirection::Undirected => ( + format!("{}__{}", current_node_alias, node_map.id_field), + format!("{}__{}", s.rel_alias, rel_map.source_id_field), + ), + crate::ast::RelationshipDirection::Incoming => ( + format!("{}__{}", current_node_alias, node_map.id_field), + format!("{}__{}", s.rel_alias, rel_map.target_id_field), + ), + }; + df = df + .join( + rel_df, + JoinType::Inner, + &[left_key.as_str()], + &[right_key.as_str()], + None, + ) + .map_err(|e| GraphError::PlanError { + message: format!("Join failed (node->rel): {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + let end_df = self.open_aliased(s.end_label, &s.end_alias).await?; + let (left_key2, right_key2) = match s.dir { + crate::ast::RelationshipDirection::Outgoing + | crate::ast::RelationshipDirection::Undirected => ( + format!("{}__{}", s.rel_alias, rel_map.target_id_field), + format!( + "{}__{}", + s.end_alias, + self.node_maps.get(&s.end_alias).unwrap().id_field + ), + ), + crate::ast::RelationshipDirection::Incoming => ( + format!("{}__{}", s.rel_alias, rel_map.source_id_field), + format!( + "{}__{}", + s.end_alias, + self.node_maps.get(&s.end_alias).unwrap().id_field + ), + ), + }; + df = df + .join( + end_df, + JoinType::Inner, + &[left_key2.as_str()], + &[right_key2.as_str()], + None, + ) + .map_err(|e| GraphError::PlanError { + message: format!("Join failed (rel->node): {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + current_node_alias = &s.end_alias; + } + + Ok(df) + } + + fn resolve_var_alias<'b>(&'b self, var: &str) -> Option<&'b str> { + if Some(var) == self.path.start_node.variable.as_deref() { + return Some(self.start_alias.as_str()); + } + for (i, seg) in self.path.segments.iter().enumerate() { + if Some(var) == seg.relationship.variable.as_deref() { + return Some(self.segs[i].rel_alias.as_str()); + } + if Some(var) == seg.end_node.variable.as_deref() { + return Some(self.segs[i].end_alias.as_str()); + } + } + None + } + + pub(super) fn apply_where( + &self, + df: datafusion::dataframe::DataFrame, + ast: &crate::ast::CypherQuery, + ) -> Result { + super::clauses::apply_where_with_qualifier(df, ast, &|var, prop| { + let alias = self.resolve_var_alias(var).unwrap_or(var); + super::aliases::qualify_alias_property(alias, prop) + }) + } + + pub(super) fn apply_return( + &self, + df: datafusion::dataframe::DataFrame, + ast: &crate::ast::CypherQuery, + ) -> Result { + super::clauses::apply_return_with_qualifier(df, ast, &|var, prop| { + let alias = self.resolve_var_alias(var).unwrap_or(var); + super::aliases::qualify_alias_property(alias, prop) + }) + } +}