Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,8 @@ AggregateFunctionSet BitAndFun::GetFunctions() {
for (auto &type : LogicalType::Integral()) {
bit_and.AddFunction(GetBitfieldUnaryAggregate<BitAndOperation>(type));
}
bit_and.AddFunction(
AggregateFunction::UnaryAggregateDestructor<BitStringState, string_t, string_t, BitStringAndOperation>(
LogicalType::BIT, LogicalType::BIT));
bit_and.AddFunction(AggregateFunction::UnaryAggregate<BitStringState, string_t, string_t, BitStringAndOperation>(
LogicalType::BIT, LogicalType::BIT));
return bit_and;
}

Expand All @@ -238,9 +237,8 @@ AggregateFunctionSet BitOrFun::GetFunctions() {
for (auto &type : LogicalType::Integral()) {
bit_or.AddFunction(GetBitfieldUnaryAggregate<BitOrOperation>(type));
}
bit_or.AddFunction(
AggregateFunction::UnaryAggregateDestructor<BitStringState, string_t, string_t, BitStringOrOperation>(
LogicalType::BIT, LogicalType::BIT));
bit_or.AddFunction(AggregateFunction::UnaryAggregate<BitStringState, string_t, string_t, BitStringOrOperation>(
LogicalType::BIT, LogicalType::BIT));
return bit_or;
}

Expand All @@ -249,9 +247,8 @@ AggregateFunctionSet BitXorFun::GetFunctions() {
for (auto &type : LogicalType::Integral()) {
bit_xor.AddFunction(GetBitfieldUnaryAggregate<BitXorOperation>(type));
}
bit_xor.AddFunction(
AggregateFunction::UnaryAggregateDestructor<BitStringState, string_t, string_t, BitStringXorOperation>(
LogicalType::BIT, LogicalType::BIT));
bit_xor.AddFunction(AggregateFunction::UnaryAggregate<BitStringState, string_t, string_t, BitStringXorOperation>(
LogicalType::BIT, LogicalType::BIT));
return bit_xor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,8 @@ unique_ptr<FunctionData> BindBitstringAgg(BindAggregateFunctionInput &input) {

template <class TYPE>
void BindBitString(AggregateFunctionSet &bitstring_agg, const LogicalTypeId &type) {
auto function =
AggregateFunction::UnaryAggregateDestructor<BitAggState<TYPE>, TYPE, string_t, BitStringAggOperation>(
type, LogicalType::BIT);
auto function = AggregateFunction::UnaryAggregate<BitAggState<TYPE>, TYPE, string_t, BitStringAggOperation>(
type, LogicalType::BIT);
function.SetBindCallback(BindBitstringAgg); // create new a 'BitstringAggBindData'
function.SetSerializeCallback(BitstringAggBindData::Serialize);
function.SetDeserializeCallback(BitstringAggBindData::Deserialize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace duckdb {
namespace {

struct StringAggState {
using STATE_TYPE = OptionalStateType<StateString<StateReturnType>>;

string_t value;
bool is_set;
uint32_t alloc_size;
Expand Down Expand Up @@ -158,6 +160,20 @@ unique_ptr<FunctionData> StringAggDeserialize(Deserializer &deserializer, BoundA
return make_uniq<StringAggBindData>(std::move(sep));
}

AggregateStateLayout StringAggStateType(AggregateLayoutInput &input) {
auto &function = input.function;
using ST = StringAggState::STATE_TYPE;
AggregateStateLayout layout;
layout.type = AggregateFunction::BuildStateLogical<ST, StringAggState>(function);
layout.total_state_size = AlignValue<idx_t>(sizeof(StringAggState));
layout.field = BuildStateField<ST>();
if (function.GetOriginalArguments().size() == 2) {
// record the value of the separator if explicitly provided
layout.constant_parameters.emplace(1, Value(input.bind_data->Cast<StringAggBindData>().sep));
}
return layout;
}

} // namespace

AggregateFunctionSet StringAggFun::GetFunctions() {
Expand All @@ -172,6 +188,7 @@ AggregateFunctionSet StringAggFun::GetFunctions() {
FunctionNullHandling::DEFAULT_NULL_HANDLING, AggregateFunction::NoClusterUpdate(), StringAggBind);
string_agg_param.SetSerializeCallback(StringAggSerialize);
string_agg_param.SetDeserializeCallback(StringAggDeserialize);
string_agg_param.SetStructStateExport(StringAggStateType);
string_agg.AddFunction(string_agg_param);
string_agg_param.GetSignature().AddParameter(LogicalType::VARCHAR);
string_agg.AddFunction(string_agg_param);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ void ApproxTopKUpdate(Vector inputs[], AggregateInputData &aggr_input, idx_t inp
}

template <class OP = HistogramGenericFunctor>
void ApproxTopKFinalize(Vector &state_vector, AggregateInputData &, Vector &result, idx_t count, idx_t offset) {
void ApproxTopKFinalize(Vector &state_vector, AggregateFinalizeInputData &, Vector &result, idx_t count, idx_t offset) {
auto states = state_vector.Values<ApproxTopKState *>();

auto old_len = ListVector::GetListSize(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,31 +169,31 @@ struct ApproxQuantileScalarOperation : public ApproxQuantileOperation {
AggregateFunction GetApproximateQuantileAggregateFunction(const LogicalType &type) {
// Not binary comparable
if (type == LogicalType::TIME_TZ) {
return AggregateFunction::UnaryAggregateDestructor<ApproxQuantileState, dtime_tz_t, dtime_tz_t,
ApproxQuantileScalarOperation>(type, type);
return AggregateFunction::UnaryAggregate<ApproxQuantileState, dtime_tz_t, dtime_tz_t,
ApproxQuantileScalarOperation>(type, type);
}
switch (type.InternalType()) {
case PhysicalType::INT8:
return AggregateFunction::UnaryAggregateDestructor<ApproxQuantileState, int8_t, int8_t,
ApproxQuantileScalarOperation>(type, type);
return AggregateFunction::UnaryAggregate<ApproxQuantileState, int8_t, int8_t, ApproxQuantileScalarOperation>(
type, type);
case PhysicalType::INT16:
return AggregateFunction::UnaryAggregateDestructor<ApproxQuantileState, int16_t, int16_t,
ApproxQuantileScalarOperation>(type, type);
return AggregateFunction::UnaryAggregate<ApproxQuantileState, int16_t, int16_t, ApproxQuantileScalarOperation>(
type, type);
case PhysicalType::INT32:
return AggregateFunction::UnaryAggregateDestructor<ApproxQuantileState, int32_t, int32_t,
ApproxQuantileScalarOperation>(type, type);
return AggregateFunction::UnaryAggregate<ApproxQuantileState, int32_t, int32_t, ApproxQuantileScalarOperation>(
type, type);
case PhysicalType::INT64:
return AggregateFunction::UnaryAggregateDestructor<ApproxQuantileState, int64_t, int64_t,
ApproxQuantileScalarOperation>(type, type);
return AggregateFunction::UnaryAggregate<ApproxQuantileState, int64_t, int64_t, ApproxQuantileScalarOperation>(
type, type);
case PhysicalType::INT128:
return AggregateFunction::UnaryAggregateDestructor<ApproxQuantileState, hugeint_t, hugeint_t,
ApproxQuantileScalarOperation>(type, type);
return AggregateFunction::UnaryAggregate<ApproxQuantileState, hugeint_t, hugeint_t,
ApproxQuantileScalarOperation>(type, type);
case PhysicalType::FLOAT:
return AggregateFunction::UnaryAggregateDestructor<ApproxQuantileState, float, float,
ApproxQuantileScalarOperation>(type, type);
return AggregateFunction::UnaryAggregate<ApproxQuantileState, float, float, ApproxQuantileScalarOperation>(
type, type);
case PhysicalType::DOUBLE:
return AggregateFunction::UnaryAggregateDestructor<ApproxQuantileState, double, double,
ApproxQuantileScalarOperation>(type, type);
return AggregateFunction::UnaryAggregate<ApproxQuantileState, double, double, ApproxQuantileScalarOperation>(
type, type);
default:
throw InternalException("Unimplemented quantile aggregate");
}
Expand Down
10 changes: 4 additions & 6 deletions src/duckdb/extension/core_functions/aggregate/holistic/mode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,8 @@ template <typename INPUT_TYPE, typename TYPE_OP = ModeStandard<INPUT_TYPE>>
AggregateFunction GetTypedModeFunction(const LogicalType &type) {
using STATE = ModeState<INPUT_TYPE, TYPE_OP>;
using OP = ModeFunction<TYPE_OP>;
auto func =
AggregateFunction::UnaryAggregateDestructor<STATE, INPUT_TYPE, INPUT_TYPE, OP, AggregateDestructorType::LEGACY>(
type, type);
auto func = AggregateFunction::UnaryAggregate<STATE, INPUT_TYPE, INPUT_TYPE, OP, AggregateDestructorType::LEGACY>(
type, type);
func.SetWindowBatchCallback(OP::template Window<STATE, INPUT_TYPE, INPUT_TYPE>);
return func;
}
Expand Down Expand Up @@ -563,9 +562,8 @@ template <typename INPUT_TYPE, typename TYPE_OP = ModeStandard<INPUT_TYPE>>
AggregateFunction GetTypedEntropyFunction(const LogicalType &type) {
using STATE = ModeState<INPUT_TYPE, TYPE_OP>;
using OP = EntropyFunction<TYPE_OP>;
auto func =
AggregateFunction::UnaryAggregateDestructor<STATE, INPUT_TYPE, double, OP, AggregateDestructorType::LEGACY>(
type, LogicalType::DOUBLE);
auto func = AggregateFunction::UnaryAggregate<STATE, INPUT_TYPE, double, OP, AggregateDestructorType::LEGACY>(
type, LogicalType::DOUBLE);
func.SetNullHandling(FunctionNullHandling::SPECIAL_HANDLING);
return func;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ struct ScalarDiscreteQuantile {
QuantileSortKeyUpdate, ListCombineFunction<OP>,
AggregateFunction::StateVoidFinalize<STATE, OP>, nullptr, nullptr,
AggregateFunction::StateDestroy<STATE, OP>);
fun.SetInitLocalStateFinalizeCallback(FlattenedQuantileValues<string_t>::Init);
return fun;
}
};
Expand Down Expand Up @@ -468,6 +469,7 @@ struct ListDiscreteQuantile {
QuantileSortKeyUpdate, ListCombineFunction<OP>,
AggregateFunction::StateFinalize<STATE, list_entry_t, OP>, nullptr, nullptr,
AggregateFunction::StateDestroy<STATE, OP>);
fun.SetInitLocalStateFinalizeCallback(FlattenedQuantileValues<string_t>::Init);
return fun;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,37 +167,37 @@ struct ReservoirQuantileScalarOperation : public ReservoirQuantileOperation {
AggregateFunction GetReservoirQuantileAggregateFunction(PhysicalType type) {
switch (type) {
case PhysicalType::INT8:
return AggregateFunction::UnaryAggregateDestructor<ReservoirQuantileState<int8_t>, int8_t, int8_t,
ReservoirQuantileScalarOperation>(LogicalType::TINYINT,
LogicalType::TINYINT);
return AggregateFunction::UnaryAggregate<ReservoirQuantileState<int8_t>, int8_t, int8_t,
ReservoirQuantileScalarOperation>(LogicalType::TINYINT,
LogicalType::TINYINT);

case PhysicalType::INT16:
return AggregateFunction::UnaryAggregateDestructor<ReservoirQuantileState<int16_t>, int16_t, int16_t,
ReservoirQuantileScalarOperation>(LogicalType::SMALLINT,
LogicalType::SMALLINT);
return AggregateFunction::UnaryAggregate<ReservoirQuantileState<int16_t>, int16_t, int16_t,
ReservoirQuantileScalarOperation>(LogicalType::SMALLINT,
LogicalType::SMALLINT);

case PhysicalType::INT32:
return AggregateFunction::UnaryAggregateDestructor<ReservoirQuantileState<int32_t>, int32_t, int32_t,
ReservoirQuantileScalarOperation>(LogicalType::INTEGER,
LogicalType::INTEGER);
return AggregateFunction::UnaryAggregate<ReservoirQuantileState<int32_t>, int32_t, int32_t,
ReservoirQuantileScalarOperation>(LogicalType::INTEGER,
LogicalType::INTEGER);

case PhysicalType::INT64:
return AggregateFunction::UnaryAggregateDestructor<ReservoirQuantileState<int64_t>, int64_t, int64_t,
ReservoirQuantileScalarOperation>(LogicalType::BIGINT,
LogicalType::BIGINT);
return AggregateFunction::UnaryAggregate<ReservoirQuantileState<int64_t>, int64_t, int64_t,
ReservoirQuantileScalarOperation>(LogicalType::BIGINT,
LogicalType::BIGINT);

case PhysicalType::INT128:
return AggregateFunction::UnaryAggregateDestructor<ReservoirQuantileState<hugeint_t>, hugeint_t, hugeint_t,
ReservoirQuantileScalarOperation>(LogicalType::HUGEINT,
LogicalType::HUGEINT);
return AggregateFunction::UnaryAggregate<ReservoirQuantileState<hugeint_t>, hugeint_t, hugeint_t,
ReservoirQuantileScalarOperation>(LogicalType::HUGEINT,
LogicalType::HUGEINT);
case PhysicalType::FLOAT:
return AggregateFunction::UnaryAggregateDestructor<ReservoirQuantileState<float>, float, float,
ReservoirQuantileScalarOperation>(LogicalType::FLOAT,
LogicalType::FLOAT);
return AggregateFunction::UnaryAggregate<ReservoirQuantileState<float>, float, float,
ReservoirQuantileScalarOperation>(LogicalType::FLOAT,
LogicalType::FLOAT);
case PhysicalType::DOUBLE:
return AggregateFunction::UnaryAggregateDestructor<ReservoirQuantileState<double>, double, double,
ReservoirQuantileScalarOperation>(LogicalType::DOUBLE,
LogicalType::DOUBLE);
return AggregateFunction::UnaryAggregate<ReservoirQuantileState<double>, double, double,
ReservoirQuantileScalarOperation>(LogicalType::DOUBLE,
LogicalType::DOUBLE);
default:
throw InternalException("Unimplemented reservoir quantile aggregate");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ void IsHistogramOtherBinFunction(DataChunk &args, ExpressionState &state, Vector
}

template <class OP, class T>
void HistogramBinFinalizeFunction(Vector &state_vector, AggregateInputData &, Vector &result, idx_t count,
void HistogramBinFinalizeFunction(Vector &state_vector, AggregateFinalizeInputData &, Vector &result, idx_t count,
idx_t offset) {
auto states = state_vector.Values<HistogramBinState<T> *>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ void HistogramUpdateFunction(Vector inputs[], AggregateInputData &aggr_input, id
}

template <class OP, class T, class MAP_TYPE>
void HistogramFinalizeFunction(Vector &state_vector, AggregateInputData &, Vector &result, idx_t count, idx_t offset) {
void HistogramFinalizeFunction(Vector &state_vector, AggregateFinalizeInputData &, Vector &result, idx_t count,
idx_t offset) {
using HIST_STATE = HistogramAggState<T, typename MAP_TYPE::MAP_TYPE>;

auto states = state_vector.Values<HIST_STATE *>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace duckdb {

namespace {

void ListFinalize(Vector &states_vector, AggregateInputData &aggr_input_data, Vector &result, idx_t count,
void ListFinalize(Vector &states_vector, AggregateFinalizeInputData &aggr_input_data, Vector &result, idx_t count,
idx_t offset) {
auto states = states_vector.Values<ListAggState *>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,26 @@ namespace duckdb {

//! Flattens the values of a linked list into a contiguous array for interpolation.
//! The flattened values are mutable - the interpolators partially sort them in place.
//! The flattened chunk is cached in the finalize data's local state, so that it is allocated (at most) once per
//! result chunk instead of once per finalized group.
//! The flattened chunk lives in the finalize's local state, so that it is allocated (at most) once per
//! finalize call instead of once per finalized group - callers that keep the local state alive re-use it
//! across finalize calls.
template <class INPUT_TYPE>
struct FlattenedQuantileValues : FunctionLocalState {
FlattenedQuantileValues(const LogicalType &type, idx_t capacity_p) : capacity(capacity_p) {
chunk.Initialize(Allocator::DefaultAllocator(), {type}, capacity_p);
FlattenedQuantileValues() : capacity(0) {
}

//! Flatten the values of the given linked list into the chunk cached in the finalize data
static unique_ptr<FunctionLocalState> Init(const BoundAggregateFunction &, optional_ptr<FunctionData>) {
return make_uniq<FlattenedQuantileValues>();
}

//! Flatten the values of the given linked list into the chunk cached in the finalize local state
static FlattenedQuantileValues &Flatten(AggregateFinalizeData &finalize_data, const LinkedList &linked_list) {
const auto type = PrimitiveToLogicalType<INPUT_TYPE>();
const auto required_capacity = MaxValue<idx_t>(linked_list.total_capacity, 1);
if (!finalize_data.local_state) {
finalize_data.local_state = make_uniq<FlattenedQuantileValues>(type, NextPowerOfTwo(required_capacity));
}
auto &values = finalize_data.local_state->Cast<FlattenedQuantileValues>();
D_ASSERT(finalize_data.input.local_state);
auto &values = finalize_data.input.local_state->Cast<FlattenedQuantileValues>();
if (values.capacity < required_capacity) {
// grow the cached chunk
// (re-)allocate the cached chunk
values.capacity = NextPowerOfTwo(required_capacity);
values.chunk.Destroy();
values.chunk.Initialize(Allocator::DefaultAllocator(), {type}, values.capacity);
Expand Down Expand Up @@ -125,12 +127,14 @@ struct QuantileOperation {
//! Quantiles ignore NULL values, so they are filtered out while appending.
template <class STATE, class RESULT_TYPE, class OP>
AggregateFunction QuantileBufferingAggregate(const LogicalType &input_type, const LogicalType &result_type) {
return AggregateFunction({input_type}, result_type, AggregateFunction::StateSize<STATE>,
AggregateFunction::StateInitialize<STATE, OP, AggregateDestructorType::LEGACY>,
ListUpdateFunction<true>, ListCombineFunction<OP>,
AggregateFunction::StateFinalize<STATE, RESULT_TYPE, OP>,
FunctionNullHandling::DEFAULT_NULL_HANDLING, AggregateFunction::NoClusterUpdate(),
AggregateFunction::NoBind(), AggregateFunction::StateDestroy<STATE, OP>);
AggregateFunction fun({input_type}, result_type, AggregateFunction::StateSize<STATE>,
AggregateFunction::StateInitialize<STATE, OP, AggregateDestructorType::LEGACY>,
ListUpdateFunction<true>, ListCombineFunction<OP>,
AggregateFunction::StateFinalize<STATE, RESULT_TYPE, OP>,
FunctionNullHandling::DEFAULT_NULL_HANDLING, AggregateFunction::NoClusterUpdate(),
AggregateFunction::NoBind(), AggregateFunction::StateDestroy<STATE, OP>);
fun.SetInitLocalStateFinalizeCallback(FlattenedQuantileValues<typename STATE::InputType>::Init);
return fun;
}

template <class T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ void ListAggregatesFunction(DataChunk &args, ExpressionState &state, Vector &res
auto &aggr = info.aggr_expr->Cast<BoundAggregateExpression>();
auto &allocator = ExecuteFunctionState::GetFunctionState(state)->Cast<ListAggregatesLocalState>().arena_allocator;
allocator.Reset();
AggregateInputData aggr_input_data(aggr, allocator);
AggregateFinalizeInputData aggr_input_data(aggr, allocator);

D_ASSERT(aggr.Function().HasStateUpdateCallback());

Expand Down
Loading
Loading