Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/ffi/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub struct ForeignLibraryModule {

pub create_placement_udf: extern "C" fn() -> FFI_ScalarUDF,

pub create_lex_ordering_udf: extern "C" fn() -> FFI_ScalarUDF,

pub create_table_function:
extern "C" fn(FFI_LogicalExtensionCodec) -> FFI_TableFunction,

Expand Down Expand Up @@ -254,6 +256,7 @@ pub extern "C" fn datafusion_ffi_get_module() -> ForeignLibraryModule {
create_nullary_udf: create_ffi_random_func,
create_timezone_udf: udf_udaf_udwf::create_timezone_func,
create_placement_udf: udf_udaf_udwf::create_placement_func,
create_lex_ordering_udf: udf_udaf_udwf::create_lex_ordering_func,
create_table_function: create_ffi_table_func,
create_sum_udaf: create_ffi_sum_func,
create_stddev_udaf: create_ffi_stddev_func,
Expand Down
45 changes: 45 additions & 0 deletions datafusion/ffi/src/tests/udf_udaf_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use arrow_schema::DataType;
use datafusion_catalog::TableFunctionImpl;
use datafusion_common::ScalarValue;
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::{
AggregateUDF, ColumnarValue, ExpressionPlacement, ScalarFunctionArgs, ScalarUDF,
ScalarUDFImpl, Signature, Volatility, WindowUDF,
Expand Down Expand Up @@ -162,6 +163,50 @@ pub(crate) extern "C" fn create_placement_func() -> FFI_ScalarUDF {
udf.into()
}

#[derive(Debug, PartialEq, Eq, Hash)]
struct LexOrderingUDF {
signature: Signature,
}

impl ScalarUDFImpl for LexOrderingUDF {
fn name(&self) -> &str {
"lex_ordering_udf"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(
&self,
_arg_types: &[DataType],
) -> datafusion_common::Result<DataType> {
Ok(DataType::Int64)
}

fn invoke_with_args(
&self,
_args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
datafusion_common::internal_err!("lex_ordering_udf is not meant to be invoked")
}

fn preserves_lex_ordering(
&self,
inputs: &[ExprProperties],
) -> datafusion_common::Result<bool> {
Ok(inputs.iter().all(|input| input.preserves_lex_ordering))
}
}

pub(crate) extern "C" fn create_lex_ordering_func() -> FFI_ScalarUDF {
let udf: Arc<ScalarUDF> = Arc::new(ScalarUDF::from(LexOrderingUDF {
signature: Signature::uniform(1, vec![DataType::Int64], Volatility::Immutable),
}));

udf.into()
}

pub(crate) extern "C" fn create_ffi_table_func(
codec: FFI_LogicalExtensionCodec,
) -> FFI_TableFunction {
Expand Down
88 changes: 88 additions & 0 deletions datafusion/ffi/src/udf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use arrow::ffi::{FFI_ArrowSchema, from_ffi, to_ffi};
use arrow_schema::FieldRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result, internal_err};
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::type_coercion::functions::fields_with_udf;
use datafusion_expr::{
ColumnarValue, ExpressionPlacement, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF,
Expand All @@ -41,6 +42,7 @@ use stabby::vec::Vec as SVec;
use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
use crate::config::FFI_ConfigOptions;
use crate::expr::columnar_value::FFI_ColumnarValue;
use crate::expr::expr_properties::FFI_ExprProperties;
use crate::placement::FFI_ExpressionPlacement;
use crate::util::{
FFI_Result, rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped,
Expand Down Expand Up @@ -100,6 +102,12 @@ pub struct FFI_ScalarUDF {
args: SVec<FFI_ExpressionPlacement>,
) -> FFI_ExpressionPlacement,

/// FFI equivalent to [`ScalarUDFImpl::preserves_lex_ordering`].
pub preserves_lex_ordering: unsafe extern "C" fn(
udf: &Self,
inputs: SVec<FFI_ExprProperties>,
) -> FFI_Result<bool>,

/// Used to create a clone on the provider of the udf. This should
/// only need to be called by the receiver of the udf.
pub clone: unsafe extern "C" fn(udf: &Self) -> Self,
Expand Down Expand Up @@ -178,6 +186,20 @@ unsafe extern "C" fn placement_fn_wrapper(
udf.inner().placement(&args).into()
}

unsafe extern "C" fn preserves_lex_ordering_fn_wrapper(
udf: &FFI_ScalarUDF,
inputs: SVec<FFI_ExprProperties>,
) -> FFI_Result<bool> {
let inputs = sresult_return!(
inputs
.into_iter()
.map(ExprProperties::try_from)
.collect::<Result<Vec<_>>>()
);

sresult!(udf.inner().preserves_lex_ordering(&inputs))
}

unsafe extern "C" fn invoke_with_args_fn_wrapper(
udf: &FFI_ScalarUDF,
args: SVec<WrappedArray>,
Expand Down Expand Up @@ -272,6 +294,7 @@ impl From<Arc<ScalarUDF>> for FFI_ScalarUDF {
return_field_from_args: return_field_from_args_fn_wrapper,
coerce_types: coerce_types_fn_wrapper,
placement: placement_fn_wrapper,
preserves_lex_ordering: preserves_lex_ordering_fn_wrapper,
clone: clone_fn_wrapper,
release: release_fn_wrapper,
private_data: Box::into_raw(private_data) as *mut c_void,
Expand Down Expand Up @@ -460,6 +483,17 @@ impl ScalarUDFImpl for ForeignScalarUDF {

result.into()
}

fn preserves_lex_ordering(&self, inputs: &[ExprProperties]) -> Result<bool> {
let inputs = inputs
.iter()
.map(FFI_ExprProperties::try_from)
.collect::<Result<SVec<_>>>()?;

let result = unsafe { (self.udf.preserves_lex_ordering)(&self.udf, inputs) };

df_result!(result)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -502,6 +536,33 @@ mod tests {
}
}

#[derive(Debug, PartialEq, Eq, Hash)]
struct LexOrderingUDF {
signature: Signature,
}

impl ScalarUDFImpl for LexOrderingUDF {
fn name(&self) -> &str {
"lex_ordering_udf"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Int64)
}

fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
internal_err!("LexOrderingUDF is not meant to be invoked")
}

fn preserves_lex_ordering(&self, inputs: &[ExprProperties]) -> Result<bool> {
Ok(inputs.iter().all(|input| input.preserves_lex_ordering))
}
}

#[test]
fn test_round_trip_scalar_udf() -> Result<()> {
let original_udf = datafusion::functions::math::abs::AbsFunc::new();
Expand Down Expand Up @@ -574,4 +635,31 @@ mod tests {

Ok(())
}

#[test]
fn test_ffi_udf_preserves_lex_ordering_round_trip() -> Result<()> {
use datafusion_expr::Volatility;

let original_udf = Arc::new(ScalarUDF::from(LexOrderingUDF {
signature: Signature::uniform(
1,
vec![DataType::Int64],
Volatility::Immutable,
),
}));

let mut ffi_udf = FFI_ScalarUDF::from(original_udf);
ffi_udf.library_marker_id = crate::mock_foreign_marker_id;

let foreign_udf: Arc<dyn ScalarUDFImpl> = (&ffi_udf).into();
assert!(foreign_udf.is::<ForeignScalarUDF>());

let preserves = ExprProperties::new_unknown().with_preserves_lex_ordering(true);
let does_not_preserve = ExprProperties::new_unknown();

assert!(foreign_udf.preserves_lex_ordering(std::slice::from_ref(&preserves))?);
assert!(!foreign_udf.preserves_lex_ordering(&[preserves, does_not_preserve])?);

Ok(())
}
}
19 changes: 19 additions & 0 deletions datafusion/ffi/tests/ffi_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod tests {
use datafusion::prelude::{SessionContext, col};
use datafusion_execution::config::SessionConfig;
use datafusion_expr::lit;
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_ffi::tests::create_record_batch;
use datafusion_ffi::tests::utils::get_module;
use std::sync::Arc;
Expand Down Expand Up @@ -116,6 +117,24 @@ mod tests {
Ok(())
}

/// This test validates that a producer's `preserves_lex_ordering` override
/// survives the real dynamic-library FFI boundary.
#[tokio::test]
async fn test_scalar_udf_preserves_lex_ordering() -> Result<()> {
let module = get_module()?;

let ffi_lex_ordering_func = (module.create_lex_ordering_udf)();
let foreign_func: Arc<dyn ScalarUDFImpl> = (&ffi_lex_ordering_func).into();

let preserves = ExprProperties::new_unknown().with_preserves_lex_ordering(true);
let does_not_preserve = ExprProperties::new_unknown();

assert!(foreign_func.preserves_lex_ordering(std::slice::from_ref(&preserves))?);
assert!(!foreign_func.preserves_lex_ordering(&[preserves, does_not_preserve])?);

Ok(())
}

#[tokio::test]
async fn test_config_on_scalar_udf() -> Result<()> {
let module = get_module()?;
Expand Down
Loading