diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs index dcd0910ecb4e9..3841409fbd255 100644 --- a/datafusion/ffi/src/tests/mod.rs +++ b/datafusion/ffi/src/tests/mod.rs @@ -92,6 +92,8 @@ pub struct ForeignLibraryModule { pub create_placement_udf: extern "C" fn() -> FFI_ScalarUDF, + pub create_lex_ordering_udf: extern "C" fn() -> FFI_ScalarUDF, + pub create_table_function: extern "C" fn(FFI_LogicalExtensionCodec) -> FFI_TableFunction, @@ -254,6 +256,7 @@ pub extern "C" fn datafusion_ffi_get_module() -> ForeignLibraryModule { create_nullary_udf: create_ffi_random_func, create_timezone_udf: udf_udaf_udwf::create_timezone_func, create_placement_udf: udf_udaf_udwf::create_placement_func, + create_lex_ordering_udf: udf_udaf_udwf::create_lex_ordering_func, create_table_function: create_ffi_table_func, create_sum_udaf: create_ffi_sum_func, create_stddev_udaf: create_ffi_stddev_func, diff --git a/datafusion/ffi/src/tests/udf_udaf_udwf.rs b/datafusion/ffi/src/tests/udf_udaf_udwf.rs index b393f5db3a506..503f830312413 100644 --- a/datafusion/ffi/src/tests/udf_udaf_udwf.rs +++ b/datafusion/ffi/src/tests/udf_udaf_udwf.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use arrow_schema::DataType; use datafusion_catalog::TableFunctionImpl; use datafusion_common::ScalarValue; +use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::{ AggregateUDF, ColumnarValue, ExpressionPlacement, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility, WindowUDF, @@ -162,6 +163,50 @@ pub(crate) extern "C" fn create_placement_func() -> FFI_ScalarUDF { udf.into() } +#[derive(Debug, PartialEq, Eq, Hash)] +struct LexOrderingUDF { + signature: Signature, +} + +impl ScalarUDFImpl for LexOrderingUDF { + fn name(&self) -> &str { + "lex_ordering_udf" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type( + &self, + _arg_types: &[DataType], + ) -> datafusion_common::Result { + Ok(DataType::Int64) + } + + fn invoke_with_args( + &self, + _args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + datafusion_common::internal_err!("lex_ordering_udf is not meant to be invoked") + } + + fn preserves_lex_ordering( + &self, + inputs: &[ExprProperties], + ) -> datafusion_common::Result { + Ok(inputs.iter().all(|input| input.preserves_lex_ordering)) + } +} + +pub(crate) extern "C" fn create_lex_ordering_func() -> FFI_ScalarUDF { + let udf: Arc = Arc::new(ScalarUDF::from(LexOrderingUDF { + signature: Signature::uniform(1, vec![DataType::Int64], Volatility::Immutable), + })); + + udf.into() +} + pub(crate) extern "C" fn create_ffi_table_func( codec: FFI_LogicalExtensionCodec, ) -> FFI_TableFunction { diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs index 4fc22e859f9fb..feb1287553af7 100644 --- a/datafusion/ffi/src/udf/mod.rs +++ b/datafusion/ffi/src/udf/mod.rs @@ -26,6 +26,7 @@ use arrow::ffi::{FFI_ArrowSchema, from_ffi, to_ffi}; use arrow_schema::FieldRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{DataFusionError, Result, internal_err}; +use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::type_coercion::functions::fields_with_udf; use datafusion_expr::{ ColumnarValue, ExpressionPlacement, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, @@ -41,6 +42,7 @@ use stabby::vec::Vec as SVec; use crate::arrow_wrappers::{WrappedArray, WrappedSchema}; use crate::config::FFI_ConfigOptions; use crate::expr::columnar_value::FFI_ColumnarValue; +use crate::expr::expr_properties::FFI_ExprProperties; use crate::placement::FFI_ExpressionPlacement; use crate::util::{ FFI_Result, rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped, @@ -100,6 +102,12 @@ pub struct FFI_ScalarUDF { args: SVec, ) -> FFI_ExpressionPlacement, + /// FFI equivalent to [`ScalarUDFImpl::preserves_lex_ordering`]. + pub preserves_lex_ordering: unsafe extern "C" fn( + udf: &Self, + inputs: SVec, + ) -> FFI_Result, + /// Used to create a clone on the provider of the udf. This should /// only need to be called by the receiver of the udf. pub clone: unsafe extern "C" fn(udf: &Self) -> Self, @@ -178,6 +186,20 @@ unsafe extern "C" fn placement_fn_wrapper( udf.inner().placement(&args).into() } +unsafe extern "C" fn preserves_lex_ordering_fn_wrapper( + udf: &FFI_ScalarUDF, + inputs: SVec, +) -> FFI_Result { + let inputs = sresult_return!( + inputs + .into_iter() + .map(ExprProperties::try_from) + .collect::>>() + ); + + sresult!(udf.inner().preserves_lex_ordering(&inputs)) +} + unsafe extern "C" fn invoke_with_args_fn_wrapper( udf: &FFI_ScalarUDF, args: SVec, @@ -272,6 +294,7 @@ impl From> for FFI_ScalarUDF { return_field_from_args: return_field_from_args_fn_wrapper, coerce_types: coerce_types_fn_wrapper, placement: placement_fn_wrapper, + preserves_lex_ordering: preserves_lex_ordering_fn_wrapper, clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, @@ -460,6 +483,17 @@ impl ScalarUDFImpl for ForeignScalarUDF { result.into() } + + fn preserves_lex_ordering(&self, inputs: &[ExprProperties]) -> Result { + let inputs = inputs + .iter() + .map(FFI_ExprProperties::try_from) + .collect::>>()?; + + let result = unsafe { (self.udf.preserves_lex_ordering)(&self.udf, inputs) }; + + df_result!(result) + } } #[cfg(test)] @@ -502,6 +536,33 @@ mod tests { } } + #[derive(Debug, PartialEq, Eq, Hash)] + struct LexOrderingUDF { + signature: Signature, + } + + impl ScalarUDFImpl for LexOrderingUDF { + fn name(&self) -> &str { + "lex_ordering_udf" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Int64) + } + + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + internal_err!("LexOrderingUDF is not meant to be invoked") + } + + fn preserves_lex_ordering(&self, inputs: &[ExprProperties]) -> Result { + Ok(inputs.iter().all(|input| input.preserves_lex_ordering)) + } + } + #[test] fn test_round_trip_scalar_udf() -> Result<()> { let original_udf = datafusion::functions::math::abs::AbsFunc::new(); @@ -574,4 +635,31 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_udf_preserves_lex_ordering_round_trip() -> Result<()> { + use datafusion_expr::Volatility; + + let original_udf = Arc::new(ScalarUDF::from(LexOrderingUDF { + signature: Signature::uniform( + 1, + vec![DataType::Int64], + Volatility::Immutable, + ), + })); + + let mut ffi_udf = FFI_ScalarUDF::from(original_udf); + ffi_udf.library_marker_id = crate::mock_foreign_marker_id; + + let foreign_udf: Arc = (&ffi_udf).into(); + assert!(foreign_udf.is::()); + + let preserves = ExprProperties::new_unknown().with_preserves_lex_ordering(true); + let does_not_preserve = ExprProperties::new_unknown(); + + assert!(foreign_udf.preserves_lex_ordering(std::slice::from_ref(&preserves))?); + assert!(!foreign_udf.preserves_lex_ordering(&[preserves, does_not_preserve])?); + + Ok(()) + } } diff --git a/datafusion/ffi/tests/ffi_udf.rs b/datafusion/ffi/tests/ffi_udf.rs index dffaf83c479b1..965dc0674e3e1 100644 --- a/datafusion/ffi/tests/ffi_udf.rs +++ b/datafusion/ffi/tests/ffi_udf.rs @@ -27,6 +27,7 @@ mod tests { use datafusion::prelude::{SessionContext, col}; use datafusion_execution::config::SessionConfig; use datafusion_expr::lit; + use datafusion_expr::sort_properties::ExprProperties; use datafusion_ffi::tests::create_record_batch; use datafusion_ffi::tests::utils::get_module; use std::sync::Arc; @@ -116,6 +117,24 @@ mod tests { Ok(()) } + /// This test validates that a producer's `preserves_lex_ordering` override + /// survives the real dynamic-library FFI boundary. + #[tokio::test] + async fn test_scalar_udf_preserves_lex_ordering() -> Result<()> { + let module = get_module()?; + + let ffi_lex_ordering_func = (module.create_lex_ordering_udf)(); + let foreign_func: Arc = (&ffi_lex_ordering_func).into(); + + let preserves = ExprProperties::new_unknown().with_preserves_lex_ordering(true); + let does_not_preserve = ExprProperties::new_unknown(); + + assert!(foreign_func.preserves_lex_ordering(std::slice::from_ref(&preserves))?); + assert!(!foreign_func.preserves_lex_ordering(&[preserves, does_not_preserve])?); + + Ok(()) + } + #[tokio::test] async fn test_config_on_scalar_udf() -> Result<()> { let module = get_module()?;