From 931f2bcff846d54fc7aa062af84e2d698375d80d Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 8 Apr 2026 17:37:36 -0400 Subject: [PATCH 1/2] Start testing what will be DataFusion 54.0. --- native/Cargo.lock | 221 +++++++++--------- native/Cargo.toml | 8 +- native/core/Cargo.toml | 2 +- native/core/src/execution/operators/expand.rs | 18 +- .../src/execution/operators/iceberg_scan.rs | 9 +- .../src/execution/operators/parquet_writer.rs | 17 +- native/core/src/execution/operators/scan.rs | 9 +- .../src/execution/operators/shuffle_scan.rs | 9 +- native/core/src/execution/planner.rs | 2 +- native/shuffle/src/shuffle_writer.rs | 16 +- native/spark-expr/src/agg_funcs/avg.rs | 9 +- .../spark-expr/src/agg_funcs/avg_decimal.rs | 7 +- .../spark-expr/src/agg_funcs/correlation.rs | 7 +- native/spark-expr/src/agg_funcs/covariance.rs | 6 - native/spark-expr/src/agg_funcs/stddev.rs | 7 +- .../spark-expr/src/agg_funcs/sum_decimal.rs | 6 +- native/spark-expr/src/agg_funcs/sum_int.rs | 6 +- native/spark-expr/src/agg_funcs/variance.rs | 6 - .../src/array_funcs/array_compact.rs | 5 - native/spark-expr/src/array_funcs/size.rs | 5 - .../src/bloom_filter/bloom_filter_agg.rs | 6 +- .../bloom_filter_might_contain.rs | 5 - native/spark-expr/src/comet_scalar_funcs.rs | 5 - .../src/datetime_funcs/date_diff.rs | 5 - .../src/datetime_funcs/date_trunc.rs | 5 - .../src/datetime_funcs/extract_date_part.rs | 6 +- .../src/datetime_funcs/make_date.rs | 5 - .../src/datetime_funcs/unix_timestamp.rs | 6 +- .../spark-expr/src/string_funcs/contains.rs | 5 - 29 files changed, 184 insertions(+), 239 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 0c0c9e97b7..3dbaed308b 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -23,7 +23,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" dependencies = [ - "crypto-common", + "crypto-common 0.1.7", "generic-array", ] @@ -184,7 +184,7 @@ checksum = "36fa98bc79671c7981272d91a8753a928ff6a1cd8e4f20a44c45bd5d313840bf" dependencies = [ "bigdecimal", "bon", - "digest", + "digest 0.10.7", "log", "miniz_oxide", "num-bigint", @@ -646,7 +646,7 @@ dependencies = [ "fastrand", "hex", "http 1.4.0", - "sha1", + "sha1 0.10.6", "time", "tokio", "tracing", @@ -1108,7 +1108,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" dependencies = [ - "digest", + "digest 0.10.7", ] [[package]] @@ -1134,6 +1134,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-buffer" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdd35008169921d80bc60d3d0ab416eecb028c4cd653352907921d95084790be" +dependencies = [ + "hybrid-array", +] + [[package]] name = "block-padding" version = "0.3.3" @@ -1377,7 +1386,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" dependencies = [ - "crypto-common", + "crypto-common 0.1.7", "inout", ] @@ -1503,6 +1512,12 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "const-oid" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" + [[package]] name = "const-random" version = "0.1.18" @@ -1676,6 +1691,15 @@ dependencies = [ "typenum", ] +[[package]] +name = "crypto-common" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77727bb15fa921304124b128af125e7e3b968275d1b108b379190264f4423710" +dependencies = [ + "hybrid-array", +] + [[package]] name = "csv" version = "1.4.0" @@ -1808,13 +1832,11 @@ dependencies = [ [[package]] name = "datafusion" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de9f8117889ba9503440f1dd79ebab32ba52ccf1720bb83cd718a29d4edc0d16" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "arrow-schema", "async-trait", - "bytes", "chrono", "datafusion-catalog", "datafusion-catalog-listing", @@ -1847,8 +1869,6 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand 0.9.2", - "regex", "sqlparser", "tempfile", "tokio", @@ -1859,8 +1879,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be893b73a13671f310ffcc8da2c546b81efcc54c22e0382c0a28aa3537017137" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "async-trait", @@ -1884,8 +1903,7 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830487b51ed83807d6b32d6325f349c3144ae0c9bf772cf2a712db180c31d5e6" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "async-trait", @@ -2078,13 +2096,13 @@ dependencies = [ [[package]] name = "datafusion-common" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d7663f3af955292f8004e74bcaf8f7ea3d66cc38438749615bb84815b61a293" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ - "ahash", "arrow", "arrow-ipc", + "arrow-schema", "chrono", + "foldhash 0.2.0", "half", "hashbrown 0.16.1", "hex", @@ -2094,17 +2112,16 @@ dependencies = [ "log", "object_store", "parquet", - "paste", "sqlparser", "tokio", + "uuid", "web-time", ] [[package]] name = "datafusion-common-runtime" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f590205c7e32fe1fea48dd53ffb406e56ae0e7a062213a3ac848db8771641bd" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "futures", "log", @@ -2114,8 +2131,7 @@ dependencies = [ [[package]] name = "datafusion-datasource" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde1e030a9dc87b743c806fbd631f5ecfa2ccaa4ffb61fa19144a07fea406b79" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "async-compression", @@ -2149,8 +2165,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-arrow" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "331ebae7055dc108f9b54994b93dff91f3a17445539efe5b74e89264f7b36e15" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "arrow-ipc", @@ -2173,8 +2188,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-csv" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e0d475088325e2986876aa27bb30d0574f72a22955a527d202f454681d55c5c" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "async-trait", @@ -2196,8 +2210,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-json" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea1520d81f31770f3ad6ee98b391e75e87a68a5bb90de70064ace5e0a7182fe8" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "async-trait", @@ -2212,7 +2225,6 @@ dependencies = [ "datafusion-session", "futures", "object_store", - "serde_json", "tokio", "tokio-stream", ] @@ -2220,8 +2232,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-parquet" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95be805d0742ab129720f4c51ad9242cd872599cdb076098b03f061fcdc7f946" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "async-trait", @@ -2231,6 +2242,7 @@ dependencies = [ "datafusion-datasource", "datafusion-execution", "datafusion-expr", + "datafusion-functions", "datafusion-functions-aggregate-common", "datafusion-physical-expr", "datafusion-physical-expr-adapter", @@ -2250,19 +2262,16 @@ dependencies = [ [[package]] name = "datafusion-doc" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c93ad9e37730d2c7196e68616f3f2dd3b04c892e03acd3a8eeca6e177f3c06a" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" [[package]] name = "datafusion-execution" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9437d3cd5d363f9319f8122182d4d233427de79c7eb748f23054c9aaa0fdd8df" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "arrow-buffer", "async-trait", - "chrono", "dashmap", "datafusion-common", "datafusion-expr", @@ -2280,10 +2289,10 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67164333342b86521d6d93fa54081ee39839894fb10f7a700c099af96d7552cf" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", + "arrow-schema", "async-trait", "chrono", "datafusion-common", @@ -2294,7 +2303,6 @@ dependencies = [ "datafusion-physical-expr-common", "indexmap 2.13.1", "itertools 0.14.0", - "paste", "serde_json", "sqlparser", ] @@ -2302,21 +2310,18 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab05fdd00e05d5a6ee362882546d29d6d3df43a6c55355164a7fbee12d163bc9" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "datafusion-common", "indexmap 2.13.1", "itertools 0.14.0", - "paste", ] [[package]] name = "datafusion-functions" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04fb863482d987cf938db2079e07ab0d3bb64595f28907a6c2f8671ad71cca7e" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "arrow-buffer", @@ -2347,10 +2352,8 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "829856f4e14275fb376c104f27cbf3c3b57a9cfe24885d98677525f5e43ce8d6" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ - "ahash", "arrow", "datafusion-common", "datafusion-doc", @@ -2360,19 +2363,17 @@ dependencies = [ "datafusion-macros", "datafusion-physical-expr", "datafusion-physical-expr-common", + "foldhash 0.2.0", "half", "log", "num-traits", - "paste", ] [[package]] name = "datafusion-functions-aggregate-common" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08af79cc3d2aa874a362fb97decfcbd73d687190cb096f16a6c85a7780cce311" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ - "ahash", "arrow", "datafusion-common", "datafusion-expr-common", @@ -2382,8 +2383,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "465ae3368146d49c2eda3e2c0ef114424c87e8a6b509ab34c1026ace6497e790" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "arrow-ord", @@ -2401,14 +2401,13 @@ dependencies = [ "itertools 0.14.0", "itoa", "log", - "paste", + "memchr", ] [[package]] name = "datafusion-functions-table" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6156e6b22fcf1784112fc0173f3ae6e78c8fdb4d3ed0eace9543873b437e2af6" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "async-trait", @@ -2417,14 +2416,12 @@ dependencies = [ "datafusion-expr", "datafusion-physical-plan", "parking_lot", - "paste", ] [[package]] name = "datafusion-functions-window" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca7baec14f866729012efb89011a6973f3a346dc8090c567bfcd328deff551c1" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "datafusion-common", @@ -2435,14 +2432,12 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "log", - "paste", ] [[package]] name = "datafusion-functions-window-common" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "159228c3280d342658466bb556dc24de30047fe1d7e559dc5d16ccc5324166f9" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -2451,8 +2446,7 @@ dependencies = [ [[package]] name = "datafusion-macros" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5427e5da5edca4d21ea1c7f50e1c9421775fe33d7d5726e5641a833566e7578" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "datafusion-doc", "quote", @@ -2462,8 +2456,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89099eefcd5b223ec685c36a41d35c69239236310d71d339f2af0fa4383f3f46" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "chrono", @@ -2481,10 +2474,8 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f222df5195d605d79098ef37bdd5323bff0131c9d877a24da6ec98dfca9fe36" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ - "ahash", "arrow", "datafusion-common", "datafusion-expr", @@ -2496,7 +2487,6 @@ dependencies = [ "indexmap 2.13.1", "itertools 0.14.0", "parking_lot", - "paste", "petgraph", "tokio", ] @@ -2504,8 +2494,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-adapter" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40838625d63d9c12549d81979db3dd675d159055eb9135009ba272ab0e8d0f64" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "datafusion-common", @@ -2519,10 +2508,8 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eacbcc4cfd502558184ed58fa3c72e775ec65bf077eef5fd2b3453db676f893c" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ - "ahash", "arrow", "chrono", "datafusion-common", @@ -2536,8 +2523,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d501d0e1d0910f015677121601ac177ec59272ef5c9324d1147b394988f40941" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "datafusion-common", @@ -2554,10 +2540,8 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "463c88ad6f1ecab1810f4c9f046898bee035b370137eb79b2b2db925e270631d" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ - "ahash", "arrow", "arrow-ord", "arrow-schema", @@ -2586,8 +2570,7 @@ dependencies = [ [[package]] name = "datafusion-pruning" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2857618a0ecbd8cd0cf29826889edd3a25774ec26b2995fc3862095c95d88fc6" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "datafusion-common", @@ -2596,15 +2579,13 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "datafusion-physical-plan", - "itertools 0.14.0", "log", ] [[package]] name = "datafusion-session" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef8637e35022c5c775003b3ab1debc6b4a8f0eb41b069bdd5475dd3aa93f6eba" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "async-trait", "datafusion-common", @@ -2617,8 +2598,7 @@ dependencies = [ [[package]] name = "datafusion-spark" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "923a8b871962a9d860f036f743a20af50ff04729f1da2468ed220dab4f61c97d" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "bigdecimal", @@ -2633,10 +2613,11 @@ dependencies = [ "datafusion-functions-aggregate", "datafusion-functions-nested", "log", + "num-traits", "percent-encoding", "rand 0.9.2", "serde_json", - "sha1", + "sha1 0.11.0", "sha2", "url", ] @@ -2644,8 +2625,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "53.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12d9e9f16a1692a11c94bcc418191fa15fd2b4d72a0c1a0c607db93c0b84dd81" +source = "git+https://github.com/mbutrovich/datafusion.git?branch=dyncomparator#e0d06eea91e4649d853305c0224350dd2503d5f1" dependencies = [ "arrow", "bigdecimal", @@ -2674,7 +2654,7 @@ version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" dependencies = [ - "const-oid", + "const-oid 0.9.6", "pem-rfc7468", "zeroize", ] @@ -2754,12 +2734,23 @@ version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer", - "const-oid", - "crypto-common", + "block-buffer 0.10.4", + "const-oid 0.9.6", + "crypto-common 0.1.7", "subtle", ] +[[package]] +name = "digest" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4850db49bf08e663084f7fb5c87d202ef91a3907271aff24a94eb97ff039153c" +dependencies = [ + "block-buffer 0.12.0", + "const-oid 0.10.2", + "crypto-common 0.2.1", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -3310,7 +3301,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest", + "digest 0.10.7", ] [[package]] @@ -3389,6 +3380,15 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" +[[package]] +name = "hybrid-array" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3944cf8cf766b40e2a1a333ee5e9b563f854d5fa49d6a8ca2764e97c6eddb214" +dependencies = [ + "typenum", +] + [[package]] name = "hyper" version = "1.9.0" @@ -4191,7 +4191,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" dependencies = [ "cfg-if", - "digest", + "digest 0.10.7", ] [[package]] @@ -4798,7 +4798,7 @@ version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" dependencies = [ - "digest", + "digest 0.10.7", "hmac", ] @@ -5467,7 +5467,7 @@ dependencies = [ "rust-ini", "serde", "serde_json", - "sha1", + "sha1 0.10.6", "sha2", "tokio", ] @@ -5489,7 +5489,7 @@ dependencies = [ "jiff", "log", "percent-encoding", - "sha1", + "sha1 0.10.6", "sha2", "windows-sys 0.61.2", ] @@ -5576,8 +5576,8 @@ version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8573f03f5883dcaebdfcf4725caa1ecb9c15b2ef50c43a07b816e06799bb12d" dependencies = [ - "const-oid", - "digest", + "const-oid 0.9.6", + "digest 0.10.7", "num-bigint-dig", "num-integer", "num-traits", @@ -5965,7 +5965,18 @@ checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", "cpufeatures 0.2.17", - "digest", + "digest 0.10.7", +] + +[[package]] +name = "sha1" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aacc4cc499359472b4abe1bf11d0b12e688af9a805fa5e3016f9a386dc2d0214" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "digest 0.11.2", ] [[package]] @@ -5976,7 +5987,7 @@ checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", "cpufeatures 0.2.17", - "digest", + "digest 0.10.7", ] [[package]] @@ -6001,7 +6012,7 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ - "digest", + "digest 0.10.7", "rand_core 0.6.4", ] @@ -6664,7 +6675,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" dependencies = [ - "crypto-common", + "crypto-common 0.1.7", "subtle", ] diff --git a/native/Cargo.toml b/native/Cargo.toml index b71bc0c73c..6e66056b55 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -38,10 +38,10 @@ arrow = { version = "58.1.0", features = ["prettyprint", "ffi", "chrono-tz"] } async-trait = { version = "0.1" } bytes = { version = "1.11.1" } parquet = { version = "58.1.0", default-features = false, features = ["experimental"] } -datafusion = { version = "53.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] } -datafusion-datasource = { version = "53.0.0" } -datafusion-physical-expr-adapter = { version = "53.0.0" } -datafusion-spark = { version = "53.0.0", features = ["core"] } +datafusion = { git = "https://github.com/mbutrovich/datafusion.git", branch = "dyncomparator", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] } +datafusion-datasource = { git = "https://github.com/mbutrovich/datafusion.git", branch = "dyncomparator" } +datafusion-physical-expr-adapter = { git = "https://github.com/mbutrovich/datafusion.git", branch = "dyncomparator" } +datafusion-spark = { git = "https://github.com/mbutrovich/datafusion.git", branch = "dyncomparator", features = ["core"] } datafusion-comet-spark-expr = { path = "spark-expr" } datafusion-comet-common = { path = "common" } datafusion-comet-jni-bridge = { path = "jni-bridge" } diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 872260c02f..c922493d20 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -91,7 +91,7 @@ jni = { version = "0.22.4", features = ["invocation"] } lazy_static = "1.4" assertables = "9" hex = "0.4.3" -datafusion-functions-nested = { version = "53.0.0" } +datafusion-functions-nested = { git = "https://github.com/mbutrovich/datafusion.git", branch = "dyncomparator" } [features] backtrace = ["datafusion/backtrace"] diff --git a/native/core/src/execution/operators/expand.rs b/native/core/src/execution/operators/expand.rs index e06fab23ec..c2c9c671d5 100644 --- a/native/core/src/execution/operators/expand.rs +++ b/native/core/src/execution/operators/expand.rs @@ -17,6 +17,7 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::SchemaRef; +use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::common::DataFusionError; use datafusion::physical_expr::{EquivalenceProperties, PhysicalExpr}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -29,7 +30,6 @@ use datafusion::{ }; use futures::{Stream, StreamExt}; use std::{ - any::Any, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -91,8 +91,20 @@ impl DisplayAs for ExpandExec { } impl ExecutionPlan for ExpandExec { - fn as_any(&self) -> &dyn Any { - self + fn apply_expressions( + &self, + f: &mut dyn FnMut(&dyn PhysicalExpr) -> datafusion::common::Result, + ) -> datafusion::common::Result { + for projection in &self.projections { + for expr in projection { + match f(expr.as_ref())? { + TreeNodeRecursion::Continue => {} + TreeNodeRecursion::Jump => {} + TreeNodeRecursion::Stop => return Ok(TreeNodeRecursion::Stop), + } + } + } + Ok(TreeNodeRecursion::Continue) } fn schema(&self) -> SchemaRef { diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs index d217ebc34b..a2dae49742 100644 --- a/native/core/src/execution/operators/iceberg_scan.rs +++ b/native/core/src/execution/operators/iceberg_scan.rs @@ -17,7 +17,6 @@ //! Native Iceberg table scan operator using iceberg-rust -use std::any::Any; use std::collections::HashMap; use std::fmt; use std::pin::Pin; @@ -26,6 +25,7 @@ use std::task::{Context, Poll}; use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions}; use arrow::datatypes::SchemaRef; +use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::common::{DataFusionError, Result as DFResult}; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::expressions::Column; @@ -108,8 +108,11 @@ impl ExecutionPlan for IcebergScanExec { "IcebergScanExec" } - fn as_any(&self) -> &dyn Any { - self + fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> DFResult, + ) -> DFResult { + Ok(TreeNodeRecursion::Continue) } fn schema(&self) -> SchemaRef { diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 8ba79098d4..f66bc00186 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -18,7 +18,6 @@ //! Parquet writer operator for writing RecordBatches to Parquet files use std::{ - any::Any, collections::HashMap, fmt, fmt::{Debug, Formatter}, @@ -38,6 +37,7 @@ use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::{ error::{DataFusionError, Result}, execution::context::TaskContext, @@ -46,8 +46,8 @@ use datafusion::{ execution_plan::{Boundedness, EmissionType}, metrics::{ExecutionPlanMetricsSet, MetricsSet}, stream::RecordBatchStreamAdapter, - DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, - SendableRecordBatchStream, + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, + PlanProperties, SendableRecordBatchStream, }, }; use futures::TryStreamExt; @@ -404,14 +404,17 @@ impl DisplayAs for ParquetWriterExec { #[async_trait] impl ExecutionPlan for ParquetWriterExec { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "ParquetWriterExec" } + fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, + ) -> Result { + Ok(TreeNodeRecursion::Continue) + } + fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 90bb741b5e..9ac9e4586a 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -28,6 +28,7 @@ use arrow::compute::{cast_with_options, take, CastOptions}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::ffi::FFI_ArrowArray; use arrow::ffi::FFI_ArrowSchema; +use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::metrics::{ @@ -43,7 +44,6 @@ use itertools::Itertools; use jni::objects::{Global, JObject, JValue}; use std::rc::Rc; use std::{ - any::Any, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll}, @@ -383,8 +383,11 @@ fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef { } impl ExecutionPlan for ScanExec { - fn as_any(&self) -> &dyn Any { - self + fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> DataFusionResult, + ) -> DataFusionResult { + Ok(TreeNodeRecursion::Continue) } fn schema(&self) -> SchemaRef { diff --git a/native/core/src/execution/operators/shuffle_scan.rs b/native/core/src/execution/operators/shuffle_scan.rs index 92c4dc8780..18ae134484 100644 --- a/native/core/src/execution/operators/shuffle_scan.rs +++ b/native/core/src/execution/operators/shuffle_scan.rs @@ -24,6 +24,7 @@ use crate::{ }; use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::common::{arrow_datafusion_err, Result as DataFusionResult}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::metrics::{ @@ -37,7 +38,6 @@ use datafusion::{ use futures::Stream; use jni::objects::{Global, JByteBuffer, JObject}; use std::{ - any::Any, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll}, @@ -221,8 +221,11 @@ fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef { } impl ExecutionPlan for ShuffleScanExec { - fn as_any(&self) -> &dyn Any { - self + fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> DataFusionResult, + ) -> DataFusionResult { + Ok(TreeNodeRecursion::Continue) } fn schema(&self) -> SchemaRef { diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index ac35925ace..7275470db1 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1704,7 +1704,7 @@ impl PhysicalPlanner { hash_join.as_ref().swap_inputs(PartitionMode::Partitioned)?; let mut additional_native_plans = vec![]; - if swapped_hash_join.as_any().is::() { + if swapped_hash_join.is::() { // a projection was added to the hash join additional_native_plans.push(Arc::clone(swapped_hash_join.children()[0])); } diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index 4ac4fc287b..3cc46704be 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -25,7 +25,8 @@ use crate::partitioners::{ use crate::{CometPartitioning, CompressionCodec}; use async_trait::async_trait; use datafusion::common::exec_datafusion_err; -use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion::common::tree_node::TreeNodeRecursion; +use datafusion::physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::EmptyRecordBatchStream; use datafusion::{ @@ -41,7 +42,6 @@ use datafusion::{ use datafusion_comet_common::tracing::with_trace_async; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use std::{ - any::Any, fmt, fmt::{Debug, Formatter}, sync::Arc, @@ -120,15 +120,17 @@ impl DisplayAs for ShuffleWriterExec { #[async_trait] impl ExecutionPlan for ShuffleWriterExec { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "ShuffleWriterExec" } + fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, + ) -> Result { + Ok(TreeNodeRecursion::Continue) + } + fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/native/spark-expr/src/agg_funcs/avg.rs b/native/spark-expr/src/agg_funcs/avg.rs index 3760b42504..24a9a30991 100644 --- a/native/spark-expr/src/agg_funcs/avg.rs +++ b/native/spark-expr/src/agg_funcs/avg.rs @@ -28,7 +28,7 @@ use datafusion::logical_expr::{ Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature, }; use datafusion::physical_expr::expressions::format_state_name; -use std::{any::Any, sync::Arc}; +use std::sync::Arc; use arrow::array::ArrowNativeTypeOp; use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs}; @@ -67,11 +67,6 @@ impl Avg { } impl AggregateUDFImpl for Avg { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result> { // All numeric types use Float64 accumulation after casting match (&self.input_data_type, &self.result_data_type) { @@ -239,7 +234,7 @@ where impl GroupsAccumulator for AvgGroupsAccumulator where T: ArrowNumericType + Send, - F: Fn(T::Native, i64) -> Result + Send, + F: Fn(T::Native, i64) -> Result + Send + 'static, { fn update_batch( &mut self, diff --git a/native/spark-expr/src/agg_funcs/avg_decimal.rs b/native/spark-expr/src/agg_funcs/avg_decimal.rs index 9e8a31afa5..2722add556 100644 --- a/native/spark-expr/src/agg_funcs/avg_decimal.rs +++ b/native/spark-expr/src/agg_funcs/avg_decimal.rs @@ -28,7 +28,7 @@ use datafusion::logical_expr::{ Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature, }; use datafusion::physical_expr::expressions::format_state_name; -use std::{any::Any, sync::Arc}; +use std::sync::Arc; use crate::utils::{build_bool_state, is_valid_decimal_precision, unlikely}; use crate::{decimal_sum_overflow_error, EvalMode, SparkErrorWithContext}; @@ -108,11 +108,6 @@ impl AvgDecimal { } impl AggregateUDFImpl for AvgDecimal { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result> { match (&self.sum_data_type, &self.result_data_type) { (Decimal128(sum_precision, sum_scale), Decimal128(target_precision, target_scale)) => { diff --git a/native/spark-expr/src/agg_funcs/correlation.rs b/native/spark-expr/src/agg_funcs/correlation.rs index 9803855e35..aebf322620 100644 --- a/native/spark-expr/src/agg_funcs/correlation.rs +++ b/native/spark-expr/src/agg_funcs/correlation.rs @@ -17,7 +17,7 @@ use arrow::compute::{and, filter, is_not_null}; -use std::{any::Any, sync::Arc}; +use std::sync::Arc; use crate::agg_funcs::covariance::CovarianceAccumulator; use crate::agg_funcs::stddev::StddevAccumulator; @@ -58,11 +58,6 @@ impl Correlation { } impl AggregateUDFImpl for Correlation { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { &self.name } diff --git a/native/spark-expr/src/agg_funcs/covariance.rs b/native/spark-expr/src/agg_funcs/covariance.rs index 15759eb155..a2256da525 100644 --- a/native/spark-expr/src/agg_funcs/covariance.rs +++ b/native/spark-expr/src/agg_funcs/covariance.rs @@ -29,7 +29,6 @@ use datafusion::logical_expr::type_coercion::aggregates::NUMERICS; use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; use datafusion::physical_expr::expressions::format_state_name; use datafusion::physical_expr::expressions::StatsType; -use std::any::Any; use std::sync::Arc; /// COVAR_SAMP and COVAR_POP aggregate expression @@ -73,11 +72,6 @@ impl Covariance { } impl AggregateUDFImpl for Covariance { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { &self.name } diff --git a/native/spark-expr/src/agg_funcs/stddev.rs b/native/spark-expr/src/agg_funcs/stddev.rs index b231b8afa7..2b09339dc1 100644 --- a/native/spark-expr/src/agg_funcs/stddev.rs +++ b/native/spark-expr/src/agg_funcs/stddev.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{any::Any, sync::Arc}; +use std::sync::Arc; use crate::agg_funcs::variance::VarianceAccumulator; use arrow::datatypes::FieldRef; @@ -78,11 +78,6 @@ impl Stddev { } impl AggregateUDFImpl for Stddev { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { &self.name } diff --git a/native/spark-expr/src/agg_funcs/sum_decimal.rs b/native/spark-expr/src/agg_funcs/sum_decimal.rs index 46db7f36b3..faa51252f8 100644 --- a/native/spark-expr/src/agg_funcs/sum_decimal.rs +++ b/native/spark-expr/src/agg_funcs/sum_decimal.rs @@ -27,7 +27,7 @@ use datafusion::logical_expr::Volatility::Immutable; use datafusion::logical_expr::{ Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature, }; -use std::{any::Any, sync::Arc}; +use std::sync::Arc; #[derive(Debug)] pub struct SumDecimal { @@ -99,10 +99,6 @@ impl SumDecimal { } impl AggregateUDFImpl for SumDecimal { - fn as_any(&self) -> &dyn Any { - self - } - fn accumulator(&self, _args: AccumulatorArgs) -> DFResult> { Ok(Box::new(SumDecimalAccumulator::new( self.precision, diff --git a/native/spark-expr/src/agg_funcs/sum_int.rs b/native/spark-expr/src/agg_funcs/sum_int.rs index 781528521b..7d1df1568b 100644 --- a/native/spark-expr/src/agg_funcs/sum_int.rs +++ b/native/spark-expr/src/agg_funcs/sum_int.rs @@ -29,7 +29,7 @@ use datafusion::logical_expr::Volatility::Immutable; use datafusion::logical_expr::{ Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature, }; -use std::{any::Any, sync::Arc}; +use std::sync::Arc; #[derive(Debug, PartialEq, Eq, Hash)] pub struct SumInteger { @@ -52,10 +52,6 @@ impl SumInteger { } impl AggregateUDFImpl for SumInteger { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "sum" } diff --git a/native/spark-expr/src/agg_funcs/variance.rs b/native/spark-expr/src/agg_funcs/variance.rs index c97e664dd6..5f2c249da7 100644 --- a/native/spark-expr/src/agg_funcs/variance.rs +++ b/native/spark-expr/src/agg_funcs/variance.rs @@ -26,7 +26,6 @@ use datafusion::logical_expr::Volatility::Immutable; use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature}; use datafusion::physical_expr::expressions::format_state_name; use datafusion::physical_expr::expressions::StatsType; -use std::any::Any; use std::sync::Arc; /// VAR_SAMP and VAR_POP aggregate expression @@ -71,11 +70,6 @@ impl Variance { } impl AggregateUDFImpl for Variance { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { &self.name } diff --git a/native/spark-expr/src/array_funcs/array_compact.rs b/native/spark-expr/src/array_funcs/array_compact.rs index 4653f966a5..d481242705 100644 --- a/native/spark-expr/src/array_funcs/array_compact.rs +++ b/native/spark-expr/src/array_funcs/array_compact.rs @@ -33,7 +33,6 @@ use datafusion::common::{exec_err, utils::take_function_args, Result}; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; -use std::any::Any; use std::sync::Arc; #[derive(Debug, PartialEq, Eq, Hash)] @@ -56,10 +55,6 @@ impl SparkArrayCompact { } impl ScalarUDFImpl for SparkArrayCompact { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "spark_array_compact" } diff --git a/native/spark-expr/src/array_funcs/size.rs b/native/spark-expr/src/array_funcs/size.rs index 9777553341..f206b299d6 100644 --- a/native/spark-expr/src/array_funcs/size.rs +++ b/native/spark-expr/src/array_funcs/size.rs @@ -21,7 +21,6 @@ use datafusion::common::{exec_err, DataFusionError, Result as DataFusionResult, use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; -use std::any::Any; use std::sync::Arc; /// Spark size() function that returns the size of arrays or maps. @@ -73,10 +72,6 @@ impl SparkSizeFunc { } impl ScalarUDFImpl for SparkSizeFunc { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "size" } diff --git a/native/spark-expr/src/bloom_filter/bloom_filter_agg.rs b/native/spark-expr/src/bloom_filter/bloom_filter_agg.rs index 3436b29201..3e9e5f00ad 100644 --- a/native/spark-expr/src/bloom_filter/bloom_filter_agg.rs +++ b/native/spark-expr/src/bloom_filter/bloom_filter_agg.rs @@ -17,7 +17,7 @@ use arrow::datatypes::{Field, FieldRef}; use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; -use std::{any::Any, sync::Arc}; +use std::sync::Arc; use crate::bloom_filter::spark_bloom_filter; use crate::bloom_filter::spark_bloom_filter::SparkBloomFilter; @@ -75,10 +75,6 @@ impl BloomFilterAgg { } impl AggregateUDFImpl for BloomFilterAgg { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "bloom_filter_agg" } diff --git a/native/spark-expr/src/bloom_filter/bloom_filter_might_contain.rs b/native/spark-expr/src/bloom_filter/bloom_filter_might_contain.rs index ea246dfb25..66168444d9 100644 --- a/native/spark-expr/src/bloom_filter/bloom_filter_might_contain.rs +++ b/native/spark-expr/src/bloom_filter/bloom_filter_might_contain.rs @@ -22,7 +22,6 @@ use datafusion::error::DataFusionError; use datafusion::logical_expr::{ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility}; use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_plan::ColumnarValue; -use std::any::Any; use std::sync::Arc; use crate::bloom_filter::spark_bloom_filter::SparkBloomFilter; @@ -63,10 +62,6 @@ fn evaluate_bloom_filter( } impl ScalarUDFImpl for BloomFilterMightContain { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "might_contain" } diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 9c91bb69c9..ac59dafa76 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -34,7 +34,6 @@ use datafusion::logical_expr::{ Volatility, }; use datafusion::physical_plan::ColumnarValue; -use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -269,10 +268,6 @@ impl CometScalarFunction { } impl ScalarUDFImpl for CometScalarFunction { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { self.name.as_str() } diff --git a/native/spark-expr/src/datetime_funcs/date_diff.rs b/native/spark-expr/src/datetime_funcs/date_diff.rs index ca148c103a..be3c1d3552 100644 --- a/native/spark-expr/src/datetime_funcs/date_diff.rs +++ b/native/spark-expr/src/datetime_funcs/date_diff.rs @@ -22,7 +22,6 @@ use datafusion::common::{utils::take_function_args, DataFusionError, Result}; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; -use std::any::Any; use std::sync::Arc; /// Spark-compatible date_diff function. @@ -52,10 +51,6 @@ impl Default for SparkDateDiff { } impl ScalarUDFImpl for SparkDateDiff { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "date_diff" } diff --git a/native/spark-expr/src/datetime_funcs/date_trunc.rs b/native/spark-expr/src/datetime_funcs/date_trunc.rs index aeae18e36f..7ceb5234e1 100644 --- a/native/spark-expr/src/datetime_funcs/date_trunc.rs +++ b/native/spark-expr/src/datetime_funcs/date_trunc.rs @@ -22,7 +22,6 @@ use datafusion::common::{ use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; -use std::any::Any; use crate::kernels::temporal::{date_trunc_array_fmt_dyn, date_trunc_dyn}; @@ -51,10 +50,6 @@ impl Default for SparkDateTrunc { } impl ScalarUDFImpl for SparkDateTrunc { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "date_trunc" } diff --git a/native/spark-expr/src/datetime_funcs/extract_date_part.rs b/native/spark-expr/src/datetime_funcs/extract_date_part.rs index acb7d2266e..7344a3953a 100644 --- a/native/spark-expr/src/datetime_funcs/extract_date_part.rs +++ b/native/spark-expr/src/datetime_funcs/extract_date_part.rs @@ -22,7 +22,7 @@ use datafusion::common::{internal_datafusion_err, DataFusionError}; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; -use std::{any::Any, fmt::Debug}; +use std::fmt::Debug; macro_rules! extract_date_part { ($struct_name:ident, $fn_name:expr, $date_part_variant:ident) => { @@ -44,10 +44,6 @@ macro_rules! extract_date_part { } impl ScalarUDFImpl for $struct_name { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { $fn_name } diff --git a/native/spark-expr/src/datetime_funcs/make_date.rs b/native/spark-expr/src/datetime_funcs/make_date.rs index 58e4108580..ef29431703 100644 --- a/native/spark-expr/src/datetime_funcs/make_date.rs +++ b/native/spark-expr/src/datetime_funcs/make_date.rs @@ -23,7 +23,6 @@ use datafusion::common::{utils::take_function_args, DataFusionError, Result}; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; -use std::any::Any; use std::sync::Arc; /// Spark-compatible make_date function. @@ -75,10 +74,6 @@ fn make_date(year: i32, month: i32, day: i32) -> Option { } impl ScalarUDFImpl for SparkMakeDate { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "make_date" } diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs index c4f1576293..4b7df90559 100644 --- a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs +++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs @@ -24,7 +24,7 @@ use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; use num::integer::div_floor; -use std::{any::Any, fmt::Debug, sync::Arc}; +use std::{fmt::Debug, sync::Arc}; const MICROS_PER_SECOND: i64 = 1_000_000; @@ -46,10 +46,6 @@ impl SparkUnixTimestamp { } impl ScalarUDFImpl for SparkUnixTimestamp { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "unix_timestamp" } diff --git a/native/spark-expr/src/string_funcs/contains.rs b/native/spark-expr/src/string_funcs/contains.rs index bc34ce9cba..537227efdf 100644 --- a/native/spark-expr/src/string_funcs/contains.rs +++ b/native/spark-expr/src/string_funcs/contains.rs @@ -27,7 +27,6 @@ use datafusion::common::{exec_err, Result, ScalarValue}; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; -use std::any::Any; use std::sync::Arc; /// Spark-optimized contains function. @@ -53,10 +52,6 @@ impl SparkContains { } impl ScalarUDFImpl for SparkContains { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "contains" } From 8d922b5df0fa063fa17ed8c7c3697e8b6bb44164 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 8 Apr 2026 17:41:01 -0400 Subject: [PATCH 2/2] fix expandexec --- native/core/src/execution/operators/expand.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/native/core/src/execution/operators/expand.rs b/native/core/src/execution/operators/expand.rs index c2c9c671d5..e2677d226d 100644 --- a/native/core/src/execution/operators/expand.rs +++ b/native/core/src/execution/operators/expand.rs @@ -95,16 +95,13 @@ impl ExecutionPlan for ExpandExec { &self, f: &mut dyn FnMut(&dyn PhysicalExpr) -> datafusion::common::Result, ) -> datafusion::common::Result { + let mut tnr = TreeNodeRecursion::Continue; for projection in &self.projections { for expr in projection { - match f(expr.as_ref())? { - TreeNodeRecursion::Continue => {} - TreeNodeRecursion::Jump => {} - TreeNodeRecursion::Stop => return Ok(TreeNodeRecursion::Stop), - } + tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; } } - Ok(TreeNodeRecursion::Continue) + Ok(tnr) } fn schema(&self) -> SchemaRef {