Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 59 additions & 3 deletions datafusion/functions-aggregate/benches/approx_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use std::hint::black_box;
use std::sync::Arc;

use arrow::array::{
ArrayRef, Int8Array, Int16Array, Int64Array, StringArray, StringViewArray,
UInt8Array, UInt16Array,
ArrayRef, Float32Array, Float64Array, Int8Array, Int16Array, Int64Array, StringArray,
StringViewArray, UInt8Array, UInt16Array,
};
use arrow::datatypes::{DataType, Field, Schema};
use criterion::{Criterion, criterion_group, criterion_main};
Expand Down Expand Up @@ -61,6 +61,24 @@ fn prepare_accumulator(data_type: DataType) -> Box<dyn Accumulator> {
ApproxDistinct::new().accumulator(accumulator_args).unwrap()
}

/// Builds a non-null `Float32Array` with one entry per index in
/// `0..BATCH_SIZE`.
///
/// Every entry is picked at random from a pool of `n_distinct` candidates
/// (`0.0, 0.5, 1.0, ...`). The generator is seeded with a fixed value, so
/// repeated calls produce the same array.
fn create_f32_array(n_distinct: usize) -> Float32Array {
    // Candidate values: the i-th candidate is `i * 0.5`.
    let candidates: Vec<f32> = (0..n_distinct).map(|idx| idx as f32 * 0.5).collect();

    // Fixed seed keeps the benchmark input identical across runs.
    let mut rng = StdRng::seed_from_u64(42);
    let mut draw = || candidates[rng.random_range(0..candidates.len())];

    (0..BATCH_SIZE)
        .map(|_| Some(draw()))
        .collect::<Float32Array>()
}

/// Builds a non-null `Float64Array` with one entry per index in
/// `0..BATCH_SIZE`.
///
/// Every entry is picked at random from a pool of `n_distinct` candidates
/// (`0.0, 0.5, 1.0, ...`). The generator is seeded with a fixed value, so
/// repeated calls produce the same array.
fn create_f64_array(n_distinct: usize) -> Float64Array {
    // Candidate values: the i-th candidate is `i * 0.5`.
    let candidates: Vec<f64> = (0..n_distinct).map(|idx| idx as f64 * 0.5).collect();

    // Fixed seed keeps the benchmark input identical across runs.
    let mut rng = StdRng::seed_from_u64(42);
    let mut draw = || candidates[rng.random_range(0..candidates.len())];

    (0..BATCH_SIZE)
        .map(|_| Some(draw()))
        .collect::<Float64Array>()
}

/// Creates an Int64Array where values are drawn from `0..n_distinct`.
fn create_i64_array(n_distinct: usize) -> Int64Array {
let mut rng = StdRng::seed_from_u64(42);
Expand Down Expand Up @@ -224,6 +242,28 @@ fn approx_distinct_benchmark(c: &mut Criterion) {
.unwrap()
})
});

// Float32
let values = Arc::new(create_f32_array(200)) as ArrayRef;
c.bench_function("approx_distinct f32", |b| {
b.iter(|| {
let mut accumulator = prepare_accumulator(DataType::Float32);
accumulator
.update_batch(std::slice::from_ref(&values))
.unwrap()
})
});

// Float64
let values = Arc::new(create_f64_array(200)) as ArrayRef;
c.bench_function("approx_distinct f64", |b| {
b.iter(|| {
let mut accumulator = prepare_accumulator(DataType::Float64);
accumulator
.update_batch(std::slice::from_ref(&values))
.unwrap()
})
});
}

/// Build a `GroupsAccumulator` the same way the aggregate operator does: use the
Expand Down Expand Up @@ -287,6 +327,16 @@ fn build_grouped_batches(data_type: &DataType) -> Vec<(ArrayRef, Vec<usize>)> {
.map(|_| Some(pool[rng.random_range(0..pool.len())].as_str()))
.collect::<StringViewArray>(),
),
DataType::Float32 => Arc::new(
(0..BATCH_SIZE)
.map(|_| Some(rng.random::<f32>()))
.collect::<Float32Array>(),
),
DataType::Float64 => Arc::new(
(0..BATCH_SIZE)
.map(|_| Some(rng.random::<f64>()))
.collect::<Float64Array>(),
),
other => panic!("unsupported grouped bench type: {other}"),
};
(values, group_indices)
Expand All @@ -300,7 +350,13 @@ fn approx_distinct_grouped_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("approx_distinct_grouped");
group.sample_size(10);

for data_type in [DataType::Int64, DataType::Utf8, DataType::Utf8View] {
for data_type in [
DataType::Int64,
DataType::Utf8,
DataType::Utf8View,
DataType::Float32,
DataType::Float64,
] {
let batches = build_grouped_batches(&data_type);
let label = format!("{data_type:?} {N_GROUPS} groups");
group.bench_function(&label, |b| {
Expand Down
73 changes: 71 additions & 2 deletions datafusion/functions-aggregate/src/approx_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,9 @@ impl AggregateUDFImpl for ApproxDistinct {
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
Box::new(NumericHLLAccumulator::<TimestampNanosecondType>::new())
}
DataType::Utf8
DataType::Float32
| DataType::Float64
| DataType::Utf8
| DataType::LargeUtf8
| DataType::Utf8View
| DataType::Binary
Expand Down Expand Up @@ -818,6 +820,8 @@ fn is_hll_groups_type(data_type: &DataType) -> bool {
| DataType::Timestamp(TimeUnit::Millisecond, _)
| DataType::Timestamp(TimeUnit::Microsecond, _)
| DataType::Timestamp(TimeUnit::Nanosecond, _)
| DataType::Float32
| DataType::Float64
| DataType::Utf8
| DataType::LargeUtf8
| DataType::Utf8View
Expand All @@ -834,7 +838,9 @@ mod tests {
#[cfg(not(feature = "force_hash_collisions"))]
mod real_hash_test {
use super::*;
use arrow::array::{AsArray, Int64Array, StringViewArray};
use arrow::array::{
AsArray, Float32Array, Float64Array, Int64Array, StringViewArray,
};
use std::sync::Arc;
// A string longer than the 12-byte inline limit
const LONG: &str = "this string is definitely longer than twelve bytes";
Expand All @@ -846,6 +852,69 @@ mod tests {
}
}

/// Runs `values` through a fresh `HllGroupsAccumulator` with every row
/// assigned to group 0, then returns the value evaluated for that group.
///
/// Panics if updating or evaluating the accumulator fails, or if the
/// evaluated array is not a `UInt64Array`.
fn groups_count(values: ArrayRef) -> u64 {
    let mut accumulator = HllGroupsAccumulator::new();

    // All rows land in group 0, hence a total group count of 1 below.
    let all_group_zero = vec![0usize; values.len()];
    accumulator
        .update_batch(&[values], &all_group_zero, None, 1)
        .unwrap();

    let evaluated = accumulator.evaluate(EmitTo::All).unwrap();
    let counts = evaluated.as_any().downcast_ref::<UInt64Array>().unwrap();
    counts.value(0)
}

/// Float32 and Float64 inputs must be accepted by `is_hll_groups_type`, and
/// the `HLLAccumulator` path and the `groups_count` (groups accumulator)
/// path must report the same distinct count.
///
/// The input holds 1.0, 2.0, 3.5, -0.0, 0.0 and NaN (some repeated). The
/// expected count of 5 therefore implies that -0.0 and 0.0 are counted as
/// one value and that the repeated NaN is counted once.
#[test]
fn float_support_for_hll_accumulator_and_group_accumulator() {
    let f32_input: ArrayRef = Arc::new(Float32Array::from(vec![
        1.0,
        2.0,
        2.0,
        3.5,
        3.5,
        3.5,
        -0.0,
        0.0,
        f32::NAN,
        f32::NAN,
    ]));
    let f64_input: ArrayRef = Arc::new(Float64Array::from(vec![
        1.0,
        2.0,
        2.0,
        3.5,
        3.5,
        3.5,
        -0.0,
        0.0,
        f64::NAN,
        f64::NAN,
    ]));

    for input in [f32_input, f64_input] {
        let data_type = input.data_type();
        assert!(
            is_hll_groups_type(data_type),
            "{} should be groups-capable",
            data_type
        );

        // Single-accumulator path.
        let mut scalar_acc = HLLAccumulator::new();
        scalar_acc.update_batch(&[Arc::clone(&input)]).unwrap();
        let scalar_result = distinct_count(&mut scalar_acc);

        // Groups-accumulator path, with all rows in one group.
        let grouped_result = groups_count(Arc::clone(&input));

        assert_eq!(
            scalar_result,
            grouped_result,
            "paths disagree for {}",
            data_type
        );
        assert_eq!(scalar_result, 5, "wrong count for {}", data_type);
    }
}

/// `approx_distinct(v) FILTER (WHERE nullable_bool)` — a NULL filter row
/// must not be counted (null filter is treated the same as false).
#[test]
Expand Down
29 changes: 29 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1978,6 +1978,35 @@ true
statement ok
DROP TABLE approx_distinct_dense_test;

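# Runs approx_distinct over FLOAT (Float32) and DOUBLE (Float64) columns through
# both the scalar path and the grouped path. The -0.0 and 0.0 rows are expected
# to count as a single distinct value.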
statement ok
CREATE TABLE approx_distinct_typed_test (g INT, f32 FLOAT, f64 DOUBLE) AS VALUES
(1, 1.0, 1.0),
(1, 2.0, 2.0),
(1, 2.0, 2.0),
(2, 3.5, 3.5),
(2, -0.0, -0.0),
(2, 0.0, 0.0);

# Scalar path
query II
SELECT approx_distinct(f32), approx_distinct(f64) FROM approx_distinct_typed_test;
----
4 4

# Grouped path
query III
SELECT g, approx_distinct(f32), approx_distinct(f64)
FROM approx_distinct_typed_test GROUP BY g ORDER BY g;
----
1 2 2
2 2 2

statement ok
DROP TABLE approx_distinct_typed_test;



## This test executes the APPROX_PERCENTILE_CONT aggregation against the test
## data, asserting the estimated quantiles are ±5% their actual values.
##
Expand Down
Loading