22//!
33//!
44//! [DataFusion]: https://datafusion.apache.org/
5- use std:: { any:: Any , collections:: BTreeMap , hash:: Hash , ops:: DerefMut , sync:: Arc , time:: Duration } ;
5+ use std:: {
6+ any:: Any ,
7+ collections:: { BTreeMap , HashSet } ,
8+ hash:: Hash ,
9+ ops:: DerefMut ,
10+ sync:: Arc ,
11+ time:: Duration ,
12+ } ;
613
714use :: http:: HeaderName ;
815use arrow:: datatypes:: DataType ;
@@ -33,7 +40,7 @@ use wasmtime_wasi_http::{
3340
3441use crate :: {
3542 bindings:: exports:: datafusion_udf_wasm:: udf:: types as wit_types,
36- conversion:: limits:: { CheckedInto , TrustedDataLimits } ,
43+ conversion:: limits:: { CheckedInto , ComplexityToken , TrustedDataLimits } ,
3744 error:: { DataFusionResultExt , WasmToDataFusionResultExt , WitDataFusionResultExt } ,
3845 http:: { HttpRequestValidator , RejectAllHttpRequests } ,
3946 limiter:: { Limiter , StaticResourceLimits } ,
@@ -267,6 +274,9 @@ pub struct WasmPermissions {
267274 /// Trusted data limits.
268275 trusted_data_limits : TrustedDataLimits ,
269276
277+ /// Maximum number of UDFs.
278+ max_udfs : usize ,
279+
270280 /// Environment variables.
271281 envs : BTreeMap < String , String > ,
272282}
@@ -293,6 +303,7 @@ impl Default for WasmPermissions {
293303 stderr_bytes : 1024 , // 1KB
294304 resource_limits : StaticResourceLimits :: default ( ) ,
295305 trusted_data_limits : TrustedDataLimits :: default ( ) ,
306+ max_udfs : 20 ,
296307 envs : BTreeMap :: default ( ) ,
297308 }
298309 }
@@ -371,6 +382,19 @@ impl WasmPermissions {
371382 }
372383 }
373384
385+ /// Get the maximum number of UDFs that a payload/guest can produce.
386+ pub fn max_udfs ( & self ) -> usize {
387+ self . max_udfs
388+ }
389+
390+ /// Set the maximum number of UDFs that a payload/guest can produce.
391+ pub fn with_max_udfs ( self , limit : usize ) -> Self {
392+ Self {
393+ max_udfs : limit,
394+ ..self
395+ }
396+ }
397+
374398 /// Add environment variable.
375399 pub fn with_env ( mut self , key : String , value : String ) -> Self {
376400 self . envs . insert ( key, value) ;
@@ -571,10 +595,18 @@ impl WasmScalarUdf {
571595 ) ?
572596 . convert_err ( permissions. trusted_data_limits . clone ( ) )
573597 . context ( "scalar_udfs" ) ?;
598+ if udf_resources. len ( ) > permissions. max_udfs {
599+ return Err ( DataFusionError :: ResourcesExhausted ( format ! (
600+ "guest returned too many UDFs: got={}, limit={}" ,
601+ udf_resources. len( ) ,
602+ permissions. max_udfs,
603+ ) ) ) ;
604+ }
574605
575606 let store = Arc :: new ( Mutex :: new ( store) ) ;
576607
577608 let mut udfs = Vec :: with_capacity ( udf_resources. len ( ) ) ;
609+ let mut names_seen = HashSet :: with_capacity ( udf_resources. len ( ) ) ;
578610 for resource in udf_resources {
579611 let mut store_guard = store. lock ( ) . await ;
580612 let store2: & mut Store < WasmStateImpl > = & mut store_guard;
@@ -587,6 +619,13 @@ impl WasmScalarUdf {
587619 "call ScalarUdf::name" ,
588620 Some ( & store_guard. data ( ) . stderr . contents ( ) ) ,
589621 ) ?;
622+ ComplexityToken :: new ( permissions. trusted_data_limits . clone ( ) ) ?
623+ . check_identifier ( & name) ?;
624+ if !names_seen. insert ( name. clone ( ) ) {
625+ return Err ( DataFusionError :: External (
626+ format ! ( "non-unique UDF name: '{name}'" ) . into ( ) ,
627+ ) ) ;
628+ }
590629
591630 let store2: & mut Store < WasmStateImpl > = & mut store_guard;
592631 let signature: Signature = bindings
0 commit comments