Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::logical_plan::consumer::SubstraitConsumer;
use crate::logical_plan::consumer::SubstraitHints;
use crate::logical_plan::consumer::from_substrait_literal;
use crate::logical_plan::consumer::from_substrait_named_struct;
use crate::logical_plan::consumer::utils::ensure_schema_compatibility;
Expand All @@ -40,12 +41,46 @@ pub async fn from_read_rel(
consumer: &impl SubstraitConsumer,
read: &ReadRel,
) -> datafusion::common::Result<LogicalPlan> {
// proto3 defaults numeric fields to 0, so treat 0.0/negative/non-finite
// and values that would overflow usize as "not provided".
// Zero-row tables will lose their stats — accepted proto3 limitation.
let hints = {
let stats = read
.common
.as_ref()
.and_then(|c| c.hint.as_ref())
.and_then(|h| h.stats.as_ref());
let max_hint: f64 = usize::MAX as f64;
let row_count = stats.and_then(|s| {
if s.row_count > 0.0 && s.row_count.is_finite() && s.row_count < max_hint {
Some(s.row_count)
} else {
None
}
});
let record_size = stats.and_then(|s| {
if s.record_size > 0.0
&& s.record_size.is_finite()
&& s.record_size < max_hint
{
Some(s.record_size)
} else {
None
}
});
SubstraitHints {
row_count,
record_size,
}
};

async fn read_with_schema(
consumer: &impl SubstraitConsumer,
table_ref: TableReference,
schema: DFSchema,
projection: &Option<MaskExpression>,
filter: &Option<Box<Expression>>,
hints: SubstraitHints,
) -> datafusion::common::Result<LogicalPlan> {
let schema = schema.replace_qualifier(table_ref.clone());

Expand All @@ -57,14 +92,14 @@ pub async fn from_read_rel(
};

let plan = {
let provider = match consumer.resolve_table_ref(&table_ref).await? {
Some(ref provider) => Arc::clone(provider),
let provider = match consumer.resolve_table_ref(&table_ref, hints).await? {
Some(provider) => provider,
_ => return plan_err!("No table named '{table_ref}'"),
};

LogicalPlanBuilder::scan_with_filters(
table_ref,
provider_as_source(Arc::clone(&provider)),
provider_as_source(provider),
None,
filters,
)?
Expand Down Expand Up @@ -110,6 +145,7 @@ pub async fn from_read_rel(
substrait_schema,
&read.projection,
&read.filter,
hints,
)
.await
}
Expand Down Expand Up @@ -213,6 +249,7 @@ pub async fn from_read_rel(
substrait_schema,
&read.projection,
&read.filter,
hints,
)
.await
}
Expand Down
186 changes: 180 additions & 6 deletions datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ use super::{
};
use crate::extensions::Extensions;
use async_trait::async_trait;
use datafusion::arrow::datatypes::DataType;
use datafusion::catalog::TableProvider;
use datafusion::arrow::datatypes::{DataType, SchemaRef};
use datafusion::catalog::{Session, TableProvider};
use datafusion::common::stats::Precision;
use datafusion::common::{
DFSchema, ScalarValue, TableReference, not_impl_err, substrait_err,
DFSchema, ScalarValue, Statistics, TableReference, not_impl_err, substrait_err,
};
use datafusion::execution::{FunctionRegistry, SessionState};
use datafusion::logical_expr::TableType;
use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
use datafusion::physical_plan::ExecutionPlan;
use std::sync::{Arc, RwLock};
use substrait::proto;
use substrait::proto::expression as substrait_expression;
Expand All @@ -44,6 +47,26 @@ use substrait::proto::{
FilterRel, JoinRel, ProjectRel, ReadRel, Rel, SetRel, SortRel, r#type,
};

/// Advisory hints extracted from a Substrait `RelCommon.hint.stats` message,
/// passed to [`SubstraitConsumer::resolve_table_ref`] so that implementors can
/// incorporate them into the returned [`TableProvider`].
///
/// The struct is `#[non_exhaustive]` so that new fields can be added in future
/// versions without breaking existing implementations.
#[non_exhaustive]
#[derive(Debug, Clone, Default)]
pub struct SubstraitHints {
/// Estimated number of rows, from `hint.stats.row_count`.
///
/// `None` means the hint was absent or could not be reliably interpreted
/// (e.g. proto3 default-zero or a non-finite value).
pub row_count: Option<f64>,
/// Estimated average byte size per record, from `hint.stats.record_size`.
///
/// `None` means the hint was absent or non-positive / non-finite.
pub record_size: Option<f64>,
}

#[async_trait]
/// This trait is used to consume Substrait plans, converting them into DataFusion Logical Plans.
/// It can be implemented by users to allow for custom handling of relations, expressions, etc.
Expand All @@ -67,7 +90,7 @@ use substrait::proto::{
/// # use datafusion::logical_expr::expr::ScalarFunction;
/// # use datafusion_substrait::extensions::Extensions;
/// # use datafusion_substrait::logical_plan::consumer::{
/// # from_project_rel, from_substrait_rel, from_substrait_rex, SubstraitConsumer
/// # from_project_rel, from_substrait_rel, from_substrait_rex, SubstraitConsumer, SubstraitHints
/// # };
///
/// struct CustomSubstraitConsumer {
Expand All @@ -80,6 +103,7 @@ use substrait::proto::{
/// async fn resolve_table_ref(
/// &self,
/// table_ref: &TableReference,
/// _hints: SubstraitHints,
/// ) -> Result<Option<Arc<dyn TableProvider>>> {
/// let table = table_ref.table().to_string();
/// let schema = self.state.schema_for_ref(table_ref.clone())?;
Expand Down Expand Up @@ -162,6 +186,7 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
async fn resolve_table_ref(
&self,
table_ref: &TableReference,
hints: SubstraitHints,
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>>;

// TODO: Remove these two methods
Expand Down Expand Up @@ -471,6 +496,94 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
}
}

/// Wraps an inner [`TableProvider`] and overrides its `statistics()` return value.
///
/// Used by [`DefaultSubstraitConsumer`] to inject a row-count hint carried in a
/// Substrait `RelCommon.hint.stats` when the resolved provider has no statistics.
///
/// # Note on `as_any()` behaviour
///
/// `as_any()` intentionally delegates to the inner provider so that callers can
/// still downcast to the concrete inner type (e.g. `MemTable`) through this
/// wrapper. As a consequence, downcasting to `StatisticsOverrideTableProvider`
/// itself via `as_any()` will not work — but since this struct is private,
/// external code should never need to do that.
#[derive(Debug)]
struct StatisticsOverrideTableProvider {
inner: Arc<dyn TableProvider>,
statistics: Statistics,
}

#[async_trait]
impl TableProvider for StatisticsOverrideTableProvider {
fn as_any(&self) -> &dyn std::any::Any {
// Delegate to the inner provider so that downcasting to the concrete
// inner type works transparently through this wrapper.
self.inner.as_any()
}

fn schema(&self) -> SchemaRef {
self.inner.schema()
}

fn constraints(&self) -> Option<&datafusion::common::Constraints> {
self.inner.constraints()
}

fn table_type(&self) -> TableType {
self.inner.table_type()
}

fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> datafusion::common::Result<
Vec<datafusion::logical_expr::TableProviderFilterPushDown>,
> {
self.inner.supports_filters_pushdown(filters)
}

fn statistics(&self) -> Option<Statistics> {
Some(self.statistics.clone())
}

async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
self.inner.scan(state, projection, filters, limit).await
}

async fn insert_into(
&self,
state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
insert_op: datafusion::logical_expr::dml::InsertOp,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
self.inner.insert_into(state, input, insert_op).await
}

async fn delete_from(
&self,
state: &dyn Session,
filters: Vec<Expr>,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
self.inner.delete_from(state, filters).await
}

async fn update(
&self,
state: &dyn Session,
assignments: Vec<(String, Expr)>,
filters: Vec<Expr>,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
self.inner.update(state, assignments, filters).await
}
}

/// Default SubstraitConsumer for converting standard Substrait without user-defined extensions.
///
/// Used as the consumer in [crate::logical_plan::consumer::from_substrait_plan]
Expand All @@ -495,11 +608,72 @@ impl SubstraitConsumer for DefaultSubstraitConsumer<'_> {
async fn resolve_table_ref(
&self,
table_ref: &TableReference,
hints: SubstraitHints,
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
let table = table_ref.table().to_string();
let schema = self.state.schema_for_ref(table_ref.clone())?;
let table_provider = schema.table(&table).await?;
Ok(table_provider)
let provider = schema.table(&table).await?;
// Wrap the provider to inject hint statistics only for fields the
// provider doesn't already have (checked individually, not as a whole).
let has_hints = hints.row_count.is_some() || hints.record_size.is_some();
let provider = match provider {
Some(provider) if has_hints => {
let existing = provider.statistics();
let row_count_absent = existing
.as_ref()
.is_none_or(|s| matches!(s.num_rows, Precision::Absent));
let byte_size_absent = existing
.as_ref()
.is_none_or(|s| matches!(s.total_byte_size, Precision::Absent));
let inject_row_count = hints.row_count.is_some() && row_count_absent;
// Both hints required: total_byte_size = row_count * record_size.
let inject_byte_size = hints.row_count.is_some()
&& hints.record_size.is_some()
&& byte_size_absent;
if inject_row_count || inject_byte_size {
let num_rows = if inject_row_count {
Precision::Inexact(hints.row_count.unwrap().round() as usize)
} else {
existing.as_ref().map_or(Precision::Absent, |s| s.num_rows)
};
let total_byte_size = if inject_byte_size {
// Prefer the provider's own row count for consistency.
let effective_rows = match &num_rows {
Precision::Exact(n) | Precision::Inexact(n) => *n as f64,
Precision::Absent => hints.row_count.unwrap(),
};
let byte_size = effective_rows * hints.record_size.unwrap();
// The product of two sub-usize::MAX values can still overflow.
if byte_size.is_finite() && byte_size < usize::MAX as f64 {
Precision::Inexact(byte_size.round() as usize)
} else {
Precision::Absent
}
} else {
existing
.as_ref()
.map_or(Precision::Absent, |s| s.total_byte_size)
};
let column_statistics =
existing.map(|s| s.column_statistics).unwrap_or_else(|| {
Statistics::unknown_column(&provider.schema())
});
let statistics = Statistics {
num_rows,
total_byte_size,
column_statistics,
};
Some(Arc::new(StatisticsOverrideTableProvider {
inner: provider,
statistics,
}) as Arc<dyn TableProvider>)
} else {
Some(provider)
}
}
provider => provider,
};
Ok(provider)
}

fn get_extensions(&self) -> &Extensions {
Expand Down
43 changes: 41 additions & 2 deletions datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
use crate::logical_plan::producer::{
SubstraitProducer, to_substrait_literal, to_substrait_named_struct,
};
use datafusion::common::stats::Precision;
use datafusion::common::{DFSchema, ToDFSchema, substrait_datafusion_err};
use datafusion::datasource::source_as_provider;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::{EmptyRelation, Expr, TableScan, Values};
use datafusion::scalar::ScalarValue;
Expand All @@ -29,7 +31,7 @@ use substrait::proto::expression::mask_expression::{StructItem, StructSelect};
use substrait::proto::expression::nested::Struct as NestedStruct;
use substrait::proto::read_rel::{NamedTable, ReadType, VirtualTable};
use substrait::proto::rel::RelType;
use substrait::proto::{ReadRel, Rel};
use substrait::proto::{ReadRel, Rel, RelCommon, rel_common};

/// Converts rows of literal expressions into Substrait literal structs.
///
Expand Down Expand Up @@ -131,9 +133,46 @@ pub fn from_table_scan(
Some(Box::new(filter_expr))
};

// Statistics are serialised as f64 hints. Exact becomes Inexact after a
// round-trip since RelCommon.hint.stats is advisory. Zero-row tables lose
// their stats due to proto3 default-zero — both limitations are accepted.
let common = source_as_provider(&scan.source)
.ok()
.and_then(|provider| provider.statistics())
.and_then(|stats| {
let row_count = match stats.num_rows {
Precision::Exact(n) | Precision::Inexact(n) => Some(n as f64),
Precision::Absent => None,
};
// record_size = total_byte_size / num_rows (both must be present)
let record_size = match (stats.total_byte_size, row_count) {
(Precision::Exact(b) | Precision::Inexact(b), Some(rows))
if rows > 0.0 =>
{
Some(b as f64 / rows)
}
_ => None,
};
if row_count.is_none() && record_size.is_none() {
return None;
}
Some(RelCommon {
emit_kind: None,
hint: Some(rel_common::Hint {
stats: Some(rel_common::hint::Stats {
row_count: row_count.unwrap_or(0.0),
record_size: record_size.unwrap_or(0.0),
..Default::default()
}),
..Default::default()
}),
advanced_extension: None,
})
});

Ok(Box::new(Rel {
rel_type: Some(RelType::Read(Box::new(ReadRel {
common: None,
common,
base_schema: Some(base_schema),
filter: filter_option,
best_effort_filter: None,
Expand Down
Loading