From d5f203cbc7198991ece32fa3ff263c3bf5630e10 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Mon, 22 Jun 2026 12:00:48 +0200 Subject: [PATCH 1/2] feat: Support float 32 and float 64 in approx_distinct --- .../benches/approx_distinct.rs | 62 +++++++++++++++- .../src/approx_distinct.rs | 73 ++++++++++++++++++- .../sqllogictest/test_files/aggregate.slt | 31 +++++++- 3 files changed, 160 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index 44b45431e3eb1..f2fcf3d9231f5 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -19,8 +19,8 @@ use std::hint::black_box; use std::sync::Arc; use arrow::array::{ - ArrayRef, Int8Array, Int16Array, Int64Array, StringArray, StringViewArray, - UInt8Array, UInt16Array, + ArrayRef, Float32Array, Float64Array, Int8Array, Int16Array, Int64Array, StringArray, + StringViewArray, UInt8Array, UInt16Array, }; use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; @@ -61,6 +61,24 @@ fn prepare_accumulator(data_type: DataType) -> Box { ApproxDistinct::new().accumulator(accumulator_args).unwrap() } +/// Creates a `Float32Array` values from a pool of `n_distinct` floats. +fn create_f32_array(n_distinct: usize) -> Float32Array { + let mut rng = StdRng::seed_from_u64(42); + let pool: Vec = (0..n_distinct).map(|i| i as f32 * 0.5).collect(); + (0..BATCH_SIZE) + .map(|_| Some(pool[rng.random_range(0..pool.len())])) + .collect() +} + +/// Creates a `Float64Array` values from a pool of `n_distinct` floats. +fn create_f64_array(n_distinct: usize) -> Float64Array { + let mut rng = StdRng::seed_from_u64(42); + let pool: Vec = (0..n_distinct).map(|i| i as f64 * 0.5).collect(); + (0..BATCH_SIZE) + .map(|_| Some(pool[rng.random_range(0..pool.len())])) + .collect() +} + /// Creates an Int64Array where values are drawn from `0..n_distinct`. fn create_i64_array(n_distinct: usize) -> Int64Array { let mut rng = StdRng::seed_from_u64(42); @@ -224,6 +242,28 @@ fn approx_distinct_benchmark(c: &mut Criterion) { .unwrap() }) }); + + // Float32 + let values = Arc::new(create_f32_array(200)) as ArrayRef; + c.bench_function("approx_distinct f32", |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::Float32); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }); + + // Float64 + let values = Arc::new(create_f64_array(200)) as ArrayRef; + c.bench_function("approx_distinct f64", |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::Float64); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }); } /// Build a `GroupsAccumulator` the same way the aggregate operator does: use the @@ -287,6 +327,16 @@ fn build_grouped_batches(data_type: &DataType) -> Vec<(ArrayRef, Vec)> { .map(|_| Some(pool[rng.random_range(0..pool.len())].as_str())) .collect::(), ), + DataType::Float32 => Arc::new( + (0..BATCH_SIZE) + .map(|_| Some(rng.random::())) + .collect::(), + ), + DataType::Float64 => Arc::new( + (0..BATCH_SIZE) + .map(|_| Some(rng.random::())) + .collect::(), + ), other => panic!("unsupported grouped bench type: {other}"), }; (values, group_indices) @@ -300,7 +350,13 @@ fn approx_distinct_grouped_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("approx_distinct_grouped"); group.sample_size(10); - for data_type in [DataType::Int64, DataType::Utf8, DataType::Utf8View] { + for data_type in [ + DataType::Int64, + DataType::Utf8, + DataType::Utf8View, + DataType::Float32, + DataType::Float64, + ] { let batches = build_grouped_batches(&data_type); let label = format!("{data_type:?} {N_GROUPS} groups"); group.bench_function(&label, |b| { diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 90cc8d0630af7..72aa820347ce8 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -758,7 +758,9 @@ impl AggregateUDFImpl for ApproxDistinct { DataType::Timestamp(TimeUnit::Nanosecond, _) => { Box::new(NumericHLLAccumulator::::new()) } - DataType::Utf8 + DataType::Float32 + | DataType::Float64 + | DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View | DataType::Binary @@ -818,6 +820,8 @@ fn is_hll_groups_type(data_type: &DataType) -> bool { | DataType::Timestamp(TimeUnit::Millisecond, _) | DataType::Timestamp(TimeUnit::Microsecond, _) | DataType::Timestamp(TimeUnit::Nanosecond, _) + | DataType::Float32 + | DataType::Float64 | DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View @@ -834,7 +838,9 @@ mod tests { #[cfg(not(feature = "force_hash_collisions"))] mod real_hash_test { use super::*; - use arrow::array::{AsArray, Int64Array, StringViewArray}; + use arrow::array::{ + AsArray, Float32Array, Float64Array, Int64Array, StringViewArray, + }; use std::sync::Arc; // A string longer than the 12-byte inline limit const LONG: &str = "this string is definitely longer than twelve bytes"; @@ -846,6 +852,69 @@ mod tests { } } + fn groups_count(values: ArrayRef) -> u64 { + let group_indices = vec![0usize; values.len()]; + let mut acc = HllGroupsAccumulator::new(); + acc.update_batch(&[values], &group_indices, None, 1) + .unwrap(); + let result = acc.evaluate(EmitTo::All).unwrap(); + result + .as_any() + .downcast_ref::() + .unwrap() + .value(0) + } + + #[test] + fn float_support_for_hll_accumulator_and_group_accumulator() { + let floats_32: ArrayRef = Arc::new(Float32Array::from(vec![ + 1.0, + 2.0, + 2.0, + 3.5, + 3.5, + 3.5, + -0.0, + 0.0, + f32::NAN, + f32::NAN, + ])); + + let floats_64: ArrayRef = Arc::new(Float64Array::from(vec![ + 1.0, + 2.0, + 2.0, + 3.5, + 3.5, + 3.5, + -0.0, + 0.0, + f64::NAN, + f64::NAN, + ])); + + for array in [floats_32, floats_64] { + assert!( + is_hll_groups_type(array.data_type()), + "{} should be groups-capable", + array.data_type() + ); + + let mut acc = HLLAccumulator::new(); + acc.update_batch(&[Arc::clone(&array)]).unwrap(); + let per_group_count = distinct_count(&mut acc); + let groups_count = groups_count(Arc::clone(&array)); + + assert_eq!( + per_group_count, + groups_count, + "paths disagree for {}", + array.data_type() + ); + assert_eq!(per_group_count, 5, "wrong count for {}", array.data_type()); + } + } + /// `approx_distinct(v) FILTER (WHERE nullable_bool)` — a NULL filter row /// must not be counted (null filter is treated the same as false). #[test] diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 18c09acf08887..3bfb1c23ba181 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -1978,7 +1978,36 @@ true statement ok DROP TABLE approx_distinct_dense_test; -## This test executes the APPROX_PERCENTILE_CONT aggregation against the test +# This test runs approx_distinct over float32 and float64 for the scalar and the grouped path. +statement ok +CREATE TABLE approx_distinct_typed_test (g INT, f32 FLOAT, f64 DOUBLE) AS VALUES + (1, 1.0, 1.0), + (1, 2.0, 2.0), + (1, 2.0, 2.0), + (2, 3.5, 3.5), + (2, -0.0, -0.0), + (2, 0.0, 0.0); + +# Scalar path +query II +SELECT approx_distinct(f32), approx_distinct(f64) FROM approx_distinct_typed_test; +---- +4 4 + +# Grouped path +query III +SELECT g, approx_distinct(f32), approx_distinct(f64) +FROM approx_distinct_typed_test GROUP BY g ORDER BY g; +---- +1 2 2 +2 2 2 + +statement ok +DROP TABLE approx_distinct_typed_test; + + + +## This test exectes the APPROX_PERCENTILE_CONT aggregation against the test ## data, asserting the estimated quantiles are ±5% their actual values. ## ## Actual quantiles calculated with: From ec400a728417bc5ef6fc5e6bba1bf28757f6bb84 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Tue, 23 Jun 2026 13:48:03 +0200 Subject: [PATCH 2/2] fix typo --- datafusion/sqllogictest/test_files/aggregate.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 3bfb1c23ba181..514431b4c43e5 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -2007,7 +2007,7 @@ DROP TABLE approx_distinct_typed_test; -## This test exectes the APPROX_PERCENTILE_CONT aggregation against the test +## This test executes the APPROX_PERCENTILE_CONT aggregation against the test ## data, asserting the estimated quantiles are ±5% their actual values. ## ## Actual quantiles calculated with: