Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rust-version = "1.82.0"
readme = "README.md"

[dependencies]
arrow = { version = "54.0.0" }
arrow = { workspace = true }
async-trait = "0.1.73"
aws-config = "1.5.5"
# begin pin aws-sdk crates otherwise CI MSRV check fails
Expand Down Expand Up @@ -57,7 +57,7 @@ futures = "0.3"
mimalloc = { version = "0.1", default-features = false }
object_store = { version = "0.11.0", features = ["aws", "gcp", "http"] }
parking_lot = { version = "0.12" }
parquet = { version = "54.0.0", default-features = false }
parquet = { workspace = true, default-features = false }
regex = "1.8"
rustyline = "14.0"
tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot", "signal"] }
Expand Down
40 changes: 24 additions & 16 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,15 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

let skip = match skip {
Some(skip_expr) => {
let expr = self.sql_to_expr(
skip_expr.value,
input.schema(),
&mut PlannerContext::new(),
)?;
let n = get_constant_result(&expr, "OFFSET")?;
convert_usize_with_check(n, "OFFSET")
}
Some(skip_expr) => self.get_constant_usize_result(skip_expr.value, input.schema(), "OFFSET"),
_ => Ok(0),
}?;

let fetch = match fetch {
Some(limit_expr)
if limit_expr != sqlparser::ast::Expr::Value(Value::Null) =>
{
let expr = self.sql_to_expr(
limit_expr,
input.schema(),
&mut PlannerContext::new(),
)?;
let n = get_constant_result(&expr, "LIMIT")?;
Some(convert_usize_with_check(n, "LIMIT")?)
Some(self.get_constant_usize_result(limit_expr, input.schema(), "LIMIT")?)
}
_ => None,
};
Expand Down Expand Up @@ -156,6 +142,28 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
_ => Ok(plan),
}
}

/// Retrieves the constant usize result of an SQL expression, evaluating it if possible.
///
/// This function takes an SQL expression, a related scheme and an argument name as input and returns
/// a `Result<usize>` indicating either the constant result of the expression or an
/// error if the expression cannot be evaluated.
///
/// # Arguments
///
/// * `expr` - An `SQLExpr` representing the expression to evaluate.
/// * `schema` - A related DataFusion schema to apply while converting an `expr` into a logical expression.
/// * `arg_name` - The name of the argument for error messages.
///
/// # Returns
///
/// * `Result<usize>` - An `Ok` variant containing the constant result if evaluation is successful,
/// or an `Err` variant containing an error message if evaluation fails.
pub(super) fn get_constant_usize_result(&self, expr: SQLExpr, schema: &datafusion_common::DFSchema, arg_name: &str) -> Result<usize> {
let expr = self.sql_to_expr(expr, schema, &mut PlannerContext::new())?;
let value = get_constant_result(&expr, arg_name)?;
convert_usize_with_check(value, arg_name)
}
}

/// Retrieves the constant result of an expression, evaluating it if possible.
Expand Down
17 changes: 11 additions & 6 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,12 +568,8 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
plan_err!("Delete-order-by clause not yet supported")?;
}

if limit.is_some() {
plan_err!("Delete-limit clause not yet supported")?;
}

let table_name = self.get_delete_target(from)?;
self.delete_to_plan(table_name, selection)
self.delete_to_plan(table_name, selection, limit)
}

Statement::StartTransaction {
Expand Down Expand Up @@ -1223,6 +1219,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
&self,
table_name: ObjectName,
predicate_expr: Option<SQLExpr>,
limit: Option<SQLExpr>
) -> Result<LogicalPlan> {
// Do a table lookup to verify the table exists
let table_ref = self.object_name_to_table_reference(table_name.clone())?;
Expand All @@ -1237,7 +1234,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
.build()?;
let mut planner_context = PlannerContext::new();

let source = match predicate_expr {
let mut source = match predicate_expr {
None => scan,
Some(predicate_expr) => {
let filter_expr =
Expand All @@ -1254,6 +1251,14 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
};

if let Some(limit_expr) = limit {
let limit = (limit_expr != SQLExpr::Value(Value::Null))
.then(|| self.get_constant_usize_result(limit_expr, source.schema(), "LIMIT"))
.transpose()?;

source = LogicalPlanBuilder::from(source).limit(0, limit)?.build()?
}

let plan = LogicalPlan::Dml(DmlStatement::new(
table_ref,
table_source,
Expand Down
12 changes: 12 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,18 @@ Dml: op=[Delete] table=[person]
quick_test(sql, plan);
}

#[test]
fn plan_delete_limited() {
let sql = "delete from person limit 2";
let plan = r#"
Dml: op=[Delete] table=[person]
Limit: skip=0, fetch=2
TableScan: person
"#
.trim();
quick_test(sql, plan);
}

#[test]
fn select_column_does_not_exist() {
let sql = "SELECT doesnotexist FROM person";
Expand Down
Loading