diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs index 832110e11131c..5b1a5478b227c 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs @@ -16,6 +16,7 @@ // under the License. use crate::logical_plan::consumer::SubstraitConsumer; +use crate::logical_plan::consumer::SubstraitHints; use crate::logical_plan::consumer::from_substrait_literal; use crate::logical_plan::consumer::from_substrait_named_struct; use crate::logical_plan::consumer::utils::ensure_schema_compatibility; @@ -40,12 +41,46 @@ pub async fn from_read_rel( consumer: &impl SubstraitConsumer, read: &ReadRel, ) -> datafusion::common::Result { + // proto3 defaults numeric fields to 0, so treat 0.0/negative/non-finite + // and values that would overflow usize as "not provided". + // Zero-row tables will lose their stats — accepted proto3 limitation. + let hints = { + let stats = read + .common + .as_ref() + .and_then(|c| c.hint.as_ref()) + .and_then(|h| h.stats.as_ref()); + let max_hint: f64 = usize::MAX as f64; + let row_count = stats.and_then(|s| { + if s.row_count > 0.0 && s.row_count.is_finite() && s.row_count < max_hint { + Some(s.row_count) + } else { + None + } + }); + let record_size = stats.and_then(|s| { + if s.record_size > 0.0 + && s.record_size.is_finite() + && s.record_size < max_hint + { + Some(s.record_size) + } else { + None + } + }); + SubstraitHints { + row_count, + record_size, + } + }; + async fn read_with_schema( consumer: &impl SubstraitConsumer, table_ref: TableReference, schema: DFSchema, projection: &Option, filter: &Option>, + hints: SubstraitHints, ) -> datafusion::common::Result { let schema = schema.replace_qualifier(table_ref.clone()); @@ -57,14 +92,14 @@ pub async fn from_read_rel( }; let plan = { - let provider = match consumer.resolve_table_ref(&table_ref).await? 
{ - Some(ref provider) => Arc::clone(provider), + let provider = match consumer.resolve_table_ref(&table_ref, hints).await? { + Some(provider) => provider, _ => return plan_err!("No table named '{table_ref}'"), }; LogicalPlanBuilder::scan_with_filters( table_ref, - provider_as_source(Arc::clone(&provider)), + provider_as_source(provider), None, filters, )? @@ -110,6 +145,7 @@ pub async fn from_read_rel( substrait_schema, &read.projection, &read.filter, + hints, ) .await } @@ -213,6 +249,7 @@ pub async fn from_read_rel( substrait_schema, &read.projection, &read.filter, + hints, ) .await } diff --git a/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs b/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs index 93f0a494d888c..314f80eeb65ab 100644 --- a/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs @@ -24,13 +24,16 @@ use super::{ }; use crate::extensions::Extensions; use async_trait::async_trait; -use datafusion::arrow::datatypes::DataType; -use datafusion::catalog::TableProvider; +use datafusion::arrow::datatypes::{DataType, SchemaRef}; +use datafusion::catalog::{Session, TableProvider}; +use datafusion::common::stats::Precision; use datafusion::common::{ - DFSchema, ScalarValue, TableReference, not_impl_err, substrait_err, + DFSchema, ScalarValue, Statistics, TableReference, not_impl_err, substrait_err, }; use datafusion::execution::{FunctionRegistry, SessionState}; +use datafusion::logical_expr::TableType; use datafusion::logical_expr::{Expr, Extension, LogicalPlan}; +use datafusion::physical_plan::ExecutionPlan; use std::sync::{Arc, RwLock}; use substrait::proto; use substrait::proto::expression as substrait_expression; @@ -44,6 +47,26 @@ use substrait::proto::{ FilterRel, JoinRel, ProjectRel, ReadRel, Rel, SetRel, SortRel, r#type, }; +/// Advisory hints extracted from a Substrait `RelCommon.hint.stats` message, +/// passed to 
[`SubstraitConsumer::resolve_table_ref`] so that implementors can +/// incorporate them into the returned [`TableProvider`]. +/// +/// The struct is `#[non_exhaustive]` so that new fields can be added in future +/// versions without breaking existing implementations. +#[non_exhaustive] +#[derive(Debug, Clone, Default)] +pub struct SubstraitHints { + /// Estimated number of rows, from `hint.stats.row_count`. + /// + /// `None` means the hint was absent or could not be reliably interpreted + /// (e.g. proto3 default-zero or a non-finite value). + pub row_count: Option, + /// Estimated average byte size per record, from `hint.stats.record_size`. + /// + /// `None` means the hint was absent or non-positive / non-finite. + pub record_size: Option, +} + #[async_trait] /// This trait is used to consume Substrait plans, converting them into DataFusion Logical Plans. /// It can be implemented by users to allow for custom handling of relations, expressions, etc. @@ -67,7 +90,7 @@ use substrait::proto::{ /// # use datafusion::logical_expr::expr::ScalarFunction; /// # use datafusion_substrait::extensions::Extensions; /// # use datafusion_substrait::logical_plan::consumer::{ -/// # from_project_rel, from_substrait_rel, from_substrait_rex, SubstraitConsumer +/// # from_project_rel, from_substrait_rel, from_substrait_rex, SubstraitConsumer, SubstraitHints /// # }; /// /// struct CustomSubstraitConsumer { @@ -80,6 +103,7 @@ use substrait::proto::{ /// async fn resolve_table_ref( /// &self, /// table_ref: &TableReference, +/// _hints: SubstraitHints, /// ) -> Result>> { /// let table = table_ref.table().to_string(); /// let schema = self.state.schema_for_ref(table_ref.clone())?; @@ -162,6 +186,7 @@ pub trait SubstraitConsumer: Send + Sync + Sized { async fn resolve_table_ref( &self, table_ref: &TableReference, + hints: SubstraitHints, ) -> datafusion::common::Result>>; // TODO: Remove these two methods @@ -471,6 +496,94 @@ pub trait SubstraitConsumer: Send + Sync + Sized { } } +/// 
Wraps an inner [`TableProvider`] and overrides its `statistics()` return value. +/// +/// Used by [`DefaultSubstraitConsumer`] to inject a row-count hint carried in a +/// Substrait `RelCommon.hint.stats` when the resolved provider has no statistics. +/// +/// # Note on `as_any()` behaviour +/// +/// `as_any()` intentionally delegates to the inner provider so that callers can +/// still downcast to the concrete inner type (e.g. `MemTable`) through this +/// wrapper. As a consequence, downcasting to `StatisticsOverrideTableProvider` +/// itself via `as_any()` will not work — but since this struct is private, +/// external code should never need to do that. +#[derive(Debug)] +struct StatisticsOverrideTableProvider { + inner: Arc, + statistics: Statistics, +} + +#[async_trait] +impl TableProvider for StatisticsOverrideTableProvider { + fn as_any(&self) -> &dyn std::any::Any { + // Delegate to the inner provider so that downcasting to the concrete + // inner type works transparently through this wrapper. 
+ self.inner.as_any() + } + + fn schema(&self) -> SchemaRef { + self.inner.schema() + } + + fn constraints(&self) -> Option<&datafusion::common::Constraints> { + self.inner.constraints() + } + + fn table_type(&self) -> TableType { + self.inner.table_type() + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion::common::Result< + Vec, + > { + self.inner.supports_filters_pushdown(filters) + } + + fn statistics(&self) -> Option { + Some(self.statistics.clone()) + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> datafusion::common::Result> { + self.inner.scan(state, projection, filters, limit).await + } + + async fn insert_into( + &self, + state: &dyn Session, + input: Arc, + insert_op: datafusion::logical_expr::dml::InsertOp, + ) -> datafusion::common::Result> { + self.inner.insert_into(state, input, insert_op).await + } + + async fn delete_from( + &self, + state: &dyn Session, + filters: Vec, + ) -> datafusion::common::Result> { + self.inner.delete_from(state, filters).await + } + + async fn update( + &self, + state: &dyn Session, + assignments: Vec<(String, Expr)>, + filters: Vec, + ) -> datafusion::common::Result> { + self.inner.update(state, assignments, filters).await + } +} + /// Default SubstraitConsumer for converting standard Substrait without user-defined extensions. 
/// /// Used as the consumer in [crate::logical_plan::consumer::from_substrait_plan] @@ -495,11 +608,72 @@ impl SubstraitConsumer for DefaultSubstraitConsumer<'_> { async fn resolve_table_ref( &self, table_ref: &TableReference, + hints: SubstraitHints, ) -> datafusion::common::Result>> { let table = table_ref.table().to_string(); let schema = self.state.schema_for_ref(table_ref.clone())?; - let table_provider = schema.table(&table).await?; - Ok(table_provider) + let provider = schema.table(&table).await?; + // Wrap the provider to inject hint statistics only for fields the + // provider doesn't already have (checked individually, not as a whole). + let has_hints = hints.row_count.is_some() || hints.record_size.is_some(); + let provider = match provider { + Some(provider) if has_hints => { + let existing = provider.statistics(); + let row_count_absent = existing + .as_ref() + .is_none_or(|s| matches!(s.num_rows, Precision::Absent)); + let byte_size_absent = existing + .as_ref() + .is_none_or(|s| matches!(s.total_byte_size, Precision::Absent)); + let inject_row_count = hints.row_count.is_some() && row_count_absent; + // Both hints required: total_byte_size = row_count * record_size. + let inject_byte_size = hints.row_count.is_some() + && hints.record_size.is_some() + && byte_size_absent; + if inject_row_count || inject_byte_size { + let num_rows = if inject_row_count { + Precision::Inexact(hints.row_count.unwrap().round() as usize) + } else { + existing.as_ref().map_or(Precision::Absent, |s| s.num_rows) + }; + let total_byte_size = if inject_byte_size { + // Prefer the provider's own row count for consistency. + let effective_rows = match &num_rows { + Precision::Exact(n) | Precision::Inexact(n) => *n as f64, + Precision::Absent => hints.row_count.unwrap(), + }; + let byte_size = effective_rows * hints.record_size.unwrap(); + // The product of two sub-usize::MAX values can still overflow. 
+ if byte_size.is_finite() && byte_size < usize::MAX as f64 { + Precision::Inexact(byte_size.round() as usize) + } else { + Precision::Absent + } + } else { + existing + .as_ref() + .map_or(Precision::Absent, |s| s.total_byte_size) + }; + let column_statistics = + existing.map(|s| s.column_statistics).unwrap_or_else(|| { + Statistics::unknown_column(&provider.schema()) + }); + let statistics = Statistics { + num_rows, + total_byte_size, + column_statistics, + }; + Some(Arc::new(StatisticsOverrideTableProvider { + inner: provider, + statistics, + }) as Arc) + } else { + Some(provider) + } + } + provider => provider, + }; + Ok(provider) } fn get_extensions(&self) -> &Extensions { diff --git a/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs index 8dfbb36d3767d..12065b361f1bc 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs @@ -18,7 +18,9 @@ use crate::logical_plan::producer::{ SubstraitProducer, to_substrait_literal, to_substrait_named_struct, }; +use datafusion::common::stats::Precision; use datafusion::common::{DFSchema, ToDFSchema, substrait_datafusion_err}; +use datafusion::datasource::source_as_provider; use datafusion::logical_expr::utils::conjunction; use datafusion::logical_expr::{EmptyRelation, Expr, TableScan, Values}; use datafusion::scalar::ScalarValue; @@ -29,7 +31,7 @@ use substrait::proto::expression::mask_expression::{StructItem, StructSelect}; use substrait::proto::expression::nested::Struct as NestedStruct; use substrait::proto::read_rel::{NamedTable, ReadType, VirtualTable}; use substrait::proto::rel::RelType; -use substrait::proto::{ReadRel, Rel}; +use substrait::proto::{ReadRel, Rel, RelCommon, rel_common}; /// Converts rows of literal expressions into Substrait literal structs. 
/// @@ -131,9 +133,46 @@ pub fn from_table_scan( Some(Box::new(filter_expr)) }; + // Statistics are serialised as f64 hints. Exact becomes Inexact after a + // round-trip since RelCommon.hint.stats is advisory. Zero-row tables lose + // their stats due to proto3 default-zero — both limitations are accepted. + let common = source_as_provider(&scan.source) + .ok() + .and_then(|provider| provider.statistics()) + .and_then(|stats| { + let row_count = match stats.num_rows { + Precision::Exact(n) | Precision::Inexact(n) => Some(n as f64), + Precision::Absent => None, + }; + // record_size = total_byte_size / num_rows (both must be present) + let record_size = match (stats.total_byte_size, row_count) { + (Precision::Exact(b) | Precision::Inexact(b), Some(rows)) + if rows > 0.0 => + { + Some(b as f64 / rows) + } + _ => None, + }; + if row_count.is_none() && record_size.is_none() { + return None; + } + Some(RelCommon { + emit_kind: None, + hint: Some(rel_common::Hint { + stats: Some(rel_common::hint::Stats { + row_count: row_count.unwrap_or(0.0), + record_size: record_size.unwrap_or(0.0), + ..Default::default() + }), + ..Default::default() + }), + advanced_extension: None, + }) + }); + Ok(Box::new(Rel { rel_type: Some(RelType::Read(Box::new(ReadRel { - common: None, + common, base_schema: Some(base_schema), filter: filter_option, best_effort_filter: None, diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 5dd4aa4e2be91..5997809d515dc 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -17,13 +17,20 @@ use crate::utils::test::read_json; use datafusion::arrow::array::ArrayRef; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::catalog::{Session, TableProvider}; +use datafusion::common::Statistics; +use datafusion::common::stats::Precision; +use datafusion::datasource::source_as_provider; use 
datafusion::functions_nested::map::map; use datafusion::logical_expr::LogicalPlanBuilder; -use datafusion::physical_plan::Accumulator; +use datafusion::logical_expr::TableType; +use datafusion::physical_plan::{Accumulator, ExecutionPlan}; use datafusion::scalar::ScalarValue; use datafusion_substrait::logical_plan::{ consumer::from_substrait_plan, producer::to_substrait_plan, }; +use std::any::Any; use std::cmp::Ordering; use std::mem::size_of_val; @@ -37,7 +44,7 @@ use datafusion::execution::session_state::SessionStateBuilder; use datafusion::logical_expr::expr::{Exists, SetComparison, SetQuantifier}; use datafusion::logical_expr::{ EmptyRelation, Extension, InvariantLevel, LogicalPlan, Operator, PartitionEvaluator, - Repartition, Subquery, UserDefinedLogicalNode, Values, Volatility, + Repartition, Subquery, TableScan, UserDefinedLogicalNode, Values, Volatility, }; use datafusion::optimizer::simplify_expressions::expr_simplifier::THRESHOLD_INLINE_INLIST; use datafusion::prelude::*; @@ -46,7 +53,195 @@ use std::hash::Hash; use std::sync::Arc; use substrait::proto::extensions::simple_extension_declaration::MappingType; use substrait::proto::rel::RelType; -use substrait::proto::{Plan, Rel, plan_rel}; +use substrait::proto::{Plan, ReadRel, Rel, plan_rel}; + +/// A minimal [`TableProvider`] that exposes configurable statistics. 
+#[derive(Debug)] +struct TableWithStatistics { + schema: Arc, + row_count: usize, + total_byte_size: Option, +} + +#[async_trait::async_trait] +impl TableProvider for TableWithStatistics { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + fn statistics(&self) -> Option { + let total_byte_size = match self.total_byte_size { + Some(b) => Precision::Exact(b), + None => Precision::Absent, + }; + Some(Statistics { + num_rows: Precision::Exact(self.row_count), + total_byte_size, + column_statistics: Statistics::unknown_column(&self.schema), + }) + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + use datafusion::physical_plan::empty::EmptyExec; + let projected_schema: SchemaRef = match projection { + Some(indices) => Arc::new(Schema::new( + indices + .iter() + .map(|i| self.schema.field(*i).clone()) + .collect::>(), + )), + None => Arc::clone(&self.schema), + }; + Ok(Arc::new(EmptyExec::new(projected_schema))) + } +} + +/// A [`TableProvider`] that returns `Some(Statistics)` with all fields +/// `Absent` — distinct from `None` — to verify hint injection for providers +/// that report statistics but have no useful values. 
+#[derive(Debug)] +struct TableWithAbsentStatistics { + schema: Arc, +} + +#[async_trait::async_trait] +impl TableProvider for TableWithAbsentStatistics { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + fn statistics(&self) -> Option { + Some(Statistics { + num_rows: Precision::Absent, + total_byte_size: Precision::Absent, + column_statistics: Statistics::unknown_column(&self.schema), + }) + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + use datafusion::physical_plan::empty::EmptyExec; + let projected_schema: SchemaRef = match projection { + Some(indices) => Arc::new(Schema::new( + indices + .iter() + .map(|i| self.schema.field(*i).clone()) + .collect::>(), + )), + None => Arc::clone(&self.schema), + }; + Ok(Arc::new(EmptyExec::new(projected_schema))) + } +} + +/// Navigates a Substrait [`Plan`] to the first [`ReadRel`], unwrapping an +/// optional top-level `Project` node on the way. 
+fn extract_read_rel(proto: &Plan) -> &ReadRel { + let root_rel = proto + .relations + .first() + .expect("plan has a relation") + .rel_type + .as_ref() + .expect("relation type is set"); + let input_rel = match root_rel { + plan_rel::RelType::Root(root) => root.input.as_ref().expect("root has input"), + plan_rel::RelType::Rel(rel) => rel, + }; + match input_rel.rel_type.as_ref().expect("rel_type is set") { + RelType::Read(r) => r.as_ref(), + RelType::Project(p) => match p + .input + .as_ref() + .expect("project has input") + .rel_type + .as_ref() + .expect("rel_type is set") + { + RelType::Read(r) => r.as_ref(), + other => panic!("expected Read inside Project, got {other:?}"), + }, + other => panic!("expected Read or Project at root, got {other:?}"), + } +} + +/// Extracts a [`TableScan`] from a [`LogicalPlan`], unwrapping an optional +/// top-level `Projection` node on the way. +fn extract_table_scan(plan: &LogicalPlan) -> &TableScan { + match plan { + LogicalPlan::TableScan(scan) => scan, + LogicalPlan::Projection(proj) => match proj.input.as_ref() { + LogicalPlan::TableScan(scan) => scan, + other => panic!("expected TableScan inside Projection, got {other:?}"), + }, + other => panic!("expected TableScan or Projection, got {other:?}"), + } +} + +fn test_schema() -> Arc { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])) +} + +/// Serializes `SELECT a FROM stats_table` to Substrait using a provider +/// with the given statistics. +async fn make_stats_plan( + row_count: usize, + total_byte_size: Option, +) -> Result> { + let provider = Arc::new(TableWithStatistics { + schema: test_schema(), + row_count, + total_byte_size, + }); + let ctx = SessionContext::new(); + ctx.register_table("stats_table", provider)?; + let plan = ctx + .sql("SELECT a FROM stats_table") + .await? 
+ .into_optimized_plan()?; + to_substrait_plan(&plan, &ctx.state()) +} + +/// Deserializes `proto` against `consumer_provider` as `stats_table` and +/// returns the statistics visible on the resolved provider after hint injection. +async fn consumer_statistics( + proto: &Plan, + consumer_provider: Arc, +) -> Result> { + let ctx = SessionContext::new(); + ctx.register_table("stats_table", consumer_provider)?; + let plan = from_substrait_plan(&ctx.state(), proto).await?; + let table_scan = extract_table_scan(&plan); + let provider = source_as_provider(&table_scan.source)?; + Ok(provider.statistics()) +} #[derive(Debug)] struct MockSerializerRegistry; @@ -102,7 +297,7 @@ impl PartialOrd for MockUserDefinedLogicalPlan { } impl UserDefinedLogicalNode for MockUserDefinedLogicalPlan { - fn as_any(&self) -> &dyn std::any::Any { + fn as_any(&self) -> &dyn Any { self } @@ -212,6 +407,162 @@ async fn roundtrip_empty_relation_no_rows() -> Result<()> { Ok(()) } +#[tokio::test] +async fn producer_sets_stats_hints() -> Result<()> { + // row_count only: record_size absent → proto3 default 0.0 + let proto = make_stats_plan(100, None).await?; + let stats = extract_read_rel(&proto) + .common + .as_ref() + .and_then(|c| c.hint.as_ref()) + .and_then(|h| h.stats.as_ref()) + .expect("hint.stats should be set"); + assert_eq!(stats.row_count, 100.0); + assert_eq!(stats.record_size, 0.0); + + // row_count + total_byte_size: record_size = bytes / rows + let proto = make_stats_plan(100, Some(1000)).await?; + let stats = extract_read_rel(&proto) + .common + .as_ref() + .and_then(|c| c.hint.as_ref()) + .and_then(|h| h.stats.as_ref()) + .expect("hint.stats should be set"); + assert_eq!(stats.row_count, 100.0); + assert_eq!(stats.record_size, 10.0); + + Ok(()) +} + +#[tokio::test] +async fn consumer_injects_row_count_hint() -> Result<()> { + use datafusion::datasource::MemTable; + let schema = test_schema(); + let proto = make_stats_plan(42, None).await?; + let batch = + 
datafusion::arrow::record_batch::RecordBatch::new_empty(Arc::clone(&schema)); + let provider = Arc::new(MemTable::try_new(schema, vec![vec![batch]])?); + // Exact(42) from producer becomes Inexact after round-trip (hint.stats is advisory) + let stats = consumer_statistics(&proto, provider) + .await? + .expect("stats injected"); + assert_eq!(stats.num_rows, Precision::Inexact(42)); + Ok(()) +} + +#[tokio::test] +async fn consumer_preserves_provider_statistics_over_hint() -> Result<()> { + let proto = make_stats_plan(42, None).await?; + let provider = Arc::new(TableWithStatistics { + schema: test_schema(), + row_count: 999, + total_byte_size: None, + }); + let stats = consumer_statistics(&proto, provider) + .await? + .expect("stats present"); + assert_eq!(stats.num_rows, Precision::Exact(999)); + Ok(()) +} + +#[tokio::test] +async fn consumer_injects_record_size_hint() -> Result<()> { + use datafusion::datasource::MemTable; + let schema = test_schema(); + let proto = make_stats_plan(200, Some(2000)).await?; // record_size hint = 10.0 + let batch = + datafusion::arrow::record_batch::RecordBatch::new_empty(Arc::clone(&schema)); + let provider = Arc::new(MemTable::try_new(schema, vec![vec![batch]])?); + let stats = consumer_statistics(&proto, provider) + .await? + .expect("stats injected"); + assert_eq!(stats.num_rows, Precision::Inexact(200)); + assert_eq!(stats.total_byte_size, Precision::Inexact(2000)); + Ok(()) +} + +#[tokio::test] +async fn consumer_injects_hint_into_absent_statistics() -> Result<()> { + let proto = make_stats_plan(77, None).await?; + let provider = Arc::new(TableWithAbsentStatistics { + schema: test_schema(), + }); + let stats = consumer_statistics(&proto, provider) + .await? 
+ .expect("stats injected"); + assert_eq!(stats.num_rows, Precision::Inexact(77)); + Ok(()) +} + +#[tokio::test] +async fn consumer_injects_byte_size_using_provider_row_count() -> Result<()> { + // Provider has its own row count (200); byte size must use that, not the hint's (100). + let proto = make_stats_plan(100, Some(1000)).await?; // record_size hint = 10.0 + let provider = Arc::new(TableWithStatistics { + schema: test_schema(), + row_count: 200, + total_byte_size: None, + }); + let stats = consumer_statistics(&proto, provider) + .await? + .expect("stats present"); + assert_eq!(stats.num_rows, Precision::Exact(200)); + assert_eq!(stats.total_byte_size, Precision::Inexact(2000)); // 200 * 10.0 + Ok(()) +} + +/// Zeroes out `hint.stats.row_count` in the plan's ReadRel, making it appear +/// absent to the consumer (proto3 encodes unset numeric fields as 0). +fn clear_row_count_hint(proto: &mut Plan) { + fn visit(rel: &mut Rel) { + match rel.rel_type.as_mut() { + Some(RelType::Read(r)) => { + if let Some(s) = r + .common + .as_mut() + .and_then(|c| c.hint.as_mut()) + .and_then(|h| h.stats.as_mut()) + { + s.row_count = 0.0; + } + } + Some(RelType::Project(p)) => { + if let Some(input) = p.input.as_mut() { + visit(input); + } + } + _ => {} + } + } + for relation in &mut proto.relations { + if let Some(plan_rel::RelType::Root(root)) = relation.rel_type.as_mut() { + if let Some(input) = root.input.as_mut() { + visit(input); + } + } + } +} + +#[tokio::test] +async fn consumer_skips_byte_size_when_row_count_hint_absent() -> Result<()> { + // When the plan carries only record_size (no row_count), total_byte_size + // cannot be reconstructed and remains Absent even when the provider has + // its own num_rows. This can arise from non-DataFusion Substrait producers. 
+ let mut proto = make_stats_plan(100, Some(1000)).await?; // record_size hint = 10.0 + clear_row_count_hint(&mut proto); // row_count = 0.0 → treated as absent by consumer + let provider = Arc::new(TableWithStatistics { + schema: test_schema(), + row_count: 50, + total_byte_size: None, + }); + let stats = consumer_statistics(&proto, provider) + .await? + .expect("stats present"); + assert_eq!(stats.num_rows, Precision::Exact(50)); + assert_eq!(stats.total_byte_size, Precision::Absent); + Ok(()) +} + #[tokio::test] async fn roundtrip_subquery_with_empty_relation() -> Result<()> { // Test EmptyRelation in the context of scalar subqueries. diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 77b4fb6f71a35..e77cad5113e37 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -122,3 +122,44 @@ stats.column_statistics[0].min_value = ...; let mut stats = Arc::unwrap_or_clone(plan.partition_statistics(None)?); stats.column_statistics[0].min_value = ...; ``` + +### `SubstraitConsumer::resolve_table_ref` gains a `hints` parameter + +`SubstraitConsumer::resolve_table_ref` now takes an additional `hints: SubstraitHints` argument. `SubstraitHints` carries advisory statistics extracted from `RelCommon.hint.stats` (currently `row_count` and `record_size`), allowing implementors to incorporate them into the returned `TableProvider`. + +**Who is affected:** + +Users who implement the `SubstraitConsumer` trait with a custom `resolve_table_ref`. + +**Migration guide:** + +Add the `hints` parameter to your implementation. If you do not need the hints, prefix it with `_` to suppress the unused-variable warning: + +**Before:** + +```rust,ignore +async fn resolve_table_ref( + &self, + table_ref: &TableReference, +) -> Result<Option<Arc<dyn TableProvider>>> { + // ... 
+} +``` + +**After:** + +```rust,ignore +use datafusion_substrait::logical_plan::consumer::SubstraitHints; + +async fn resolve_table_ref( + &self, + table_ref: &TableReference, + _hints: SubstraitHints, +) -> Result<Option<Arc<dyn TableProvider>>> { + // ... +} +``` + +`SubstraitHints` derives `Default` (all fields `None` — a no-op) and is `#[non_exhaustive]`, so new fields can be added in future versions without further breaking changes. + +**Known limitation:** Tables with exactly 0 rows will have their row-count (and record-size) hint silently dropped. This is a consequence of proto3 encoding all unset numeric fields as `0`, making it impossible to distinguish "zero rows" from "hint not provided". Affected statistics will appear as `Precision::Absent` rather than `Precision::Inexact(0)` after a Substrait round-trip.