Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ AggregateFunction GetTypedMedianAbsoluteDeviationAggregateFunction(const Logical
using OP = MedianAbsoluteDeviationOperation<MEDIAN_TYPE>;
auto fun = QuantileBufferingAggregate<STATE, TARGET_TYPE, OP>(input_type, target_type);
fun.SetBindCallback(BindMAD);
fun.SetStructStateExport(QuantileStateLayout<STATE>);
fun.SetOrderDependent(AggregateOrderDependent::NOT_ORDER_DEPENDENT);
#ifndef DUCKDB_SMALLER_BINARY
fun.SetWindowBatchCallback(OP::template Window<STATE, INPUT_TYPE, TARGET_TYPE>);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ struct ScalarDiscreteQuantile {
using STATE = QuantileState<INPUT_TYPE>;
using OP = QuantileScalarOperation<true>;
auto fun = QuantileBufferingAggregate<STATE, INPUT_TYPE, OP>(type, type);
fun.SetStructStateExport(QuantileStateLayout<STATE>);
#ifndef DUCKDB_SMALLER_BINARY
fun.SetWindowBatchCallback(OP::Window<STATE, INPUT_TYPE, INPUT_TYPE>);
fun.SetWindowInitCallback(OP::WindowInit<STATE, INPUT_TYPE>);
Expand All @@ -442,6 +443,7 @@ struct ScalarDiscreteQuantile {
AggregateFunction::StateVoidFinalize<STATE, OP>, nullptr, nullptr,
AggregateFunction::StateDestroy<STATE, OP>);
fun.SetInitLocalStateFinalizeCallback(FlattenedQuantileValues<string_t>::Init);
fun.SetStructStateExport(QuantileStateLayout<STATE, StateListType<StateSortKey<StateInputType<0>>>>);
return fun;
}
};
Expand All @@ -452,6 +454,7 @@ struct ListDiscreteQuantile {
using STATE = QuantileState<INPUT_TYPE>;
using OP = QuantileListOperation<INPUT_TYPE, true>;
auto fun = QuantileBufferingAggregate<STATE, list_entry_t, OP>(type, LogicalType::LIST(type));
fun.SetStructStateExport(QuantileStateLayout<STATE>);
fun.SetOrderDependent(AggregateOrderDependent::NOT_ORDER_DEPENDENT);
#ifndef DUCKDB_SMALLER_BINARY
fun.SetWindowBatchCallback(OP::template Window<STATE, INPUT_TYPE, list_entry_t>);
Expand All @@ -470,6 +473,7 @@ struct ListDiscreteQuantile {
AggregateFunction::StateFinalize<STATE, list_entry_t, OP>, nullptr, nullptr,
AggregateFunction::StateDestroy<STATE, OP>);
fun.SetInitLocalStateFinalizeCallback(FlattenedQuantileValues<string_t>::Init);
fun.SetStructStateExport(QuantileStateLayout<STATE, StateListType<StateSortKey<StateInputType<0>>>>);
return fun;
}
};
Expand Down Expand Up @@ -544,6 +548,7 @@ struct ScalarContinuousQuantile {
using STATE = QuantileState<INPUT_TYPE>;
using OP = QuantileScalarOperation<false>;
auto fun = QuantileBufferingAggregate<STATE, TARGET_TYPE, OP>(input_type, target_type);
fun.SetStructStateExport(QuantileStateLayout<STATE>);
fun.SetOrderDependent(AggregateOrderDependent::NOT_ORDER_DEPENDENT);
#ifndef DUCKDB_SMALLER_BINARY
fun.SetWindowBatchCallback(OP::template Window<STATE, INPUT_TYPE, TARGET_TYPE>);
Expand All @@ -559,6 +564,7 @@ struct ListContinuousQuantile {
using STATE = QuantileState<INPUT_TYPE>;
using OP = QuantileListOperation<TARGET_TYPE, false>;
auto fun = QuantileBufferingAggregate<STATE, list_entry_t, OP>(input_type, LogicalType::LIST(target_type));
fun.SetStructStateExport(QuantileStateLayout<STATE>);
fun.SetOrderDependent(AggregateOrderDependent::NOT_ORDER_DEPENDENT);
#ifndef DUCKDB_SMALLER_BINARY
fun.SetWindowBatchCallback(OP::template Window<STATE, INPUT_TYPE, list_entry_t>);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
#pragma once

#include "core_functions/aggregate/quantile_sort_tree.hpp"
#include "duckdb/common/operator/negate.hpp"
#include "duckdb/common/types/data_chunk.hpp"
#include "duckdb/common/types/list_segment.hpp"
#include "duckdb/function/aggregate_function.hpp"
#include "duckdb/function/aggregate/list_aggregate.hpp"
#include "SkipList.h"

Expand Down Expand Up @@ -326,4 +328,82 @@ struct QuantileState : ListAggState {
}
};

//===--------------------------------------------------------------------===//
// Quantile State Export
//===--------------------------------------------------------------------===//
template <typename T>
inline T QuantileNeg(const T &t) {
return NegateOperator::Operation<T, T>(t);
}

//! Restores the sign of a normalized quantile parameter (see QuantileAbs)
template <>
inline Value QuantileNeg(const Value &v) {
const auto &type = v.type();
switch (type.id()) {
case LogicalTypeId::DECIMAL: {
const auto integral = IntegralValue::Get(v);
const auto width = DecimalType::GetWidth(type);
const auto scale = DecimalType::GetScale(type);
switch (type.InternalType()) {
case PhysicalType::INT16:
return Value::DECIMAL(QuantileNeg<int16_t>(Cast::Operation<hugeint_t, int16_t>(integral)), width, scale);
case PhysicalType::INT32:
return Value::DECIMAL(QuantileNeg<int32_t>(Cast::Operation<hugeint_t, int32_t>(integral)), width, scale);
case PhysicalType::INT64:
return Value::DECIMAL(QuantileNeg<int64_t>(Cast::Operation<hugeint_t, int64_t>(integral)), width, scale);
case PhysicalType::INT128:
return Value::DECIMAL(QuantileNeg<hugeint_t>(integral), width, scale);
default:
throw InternalException("Unknown DECIMAL type");
}
}
default:
return Value::DOUBLE(QuantileNeg<double>(v.GetValue<double>()));
}
}

//! Reconstructs the quantile parameter (e.g. 0.5 or [0.25, 0.75]) from the bind data, so that it can be recorded
//! in the AGGREGATE_STATE type - param_type is the declared type of the (erased) parameter argument
inline Value QuantileParameterValue(const QuantileBindData &bind_data, const LogicalType &param_type) {
vector<Value> quantiles;
for (auto &q : bind_data.quantiles) {
// the bind data holds the normalized (absolute) quantiles - restore the sign of descending quantiles
quantiles.push_back(bind_data.desc ? QuantileNeg(q.val) : q.val);
}
if (param_type.id() != LogicalTypeId::LIST && param_type.id() != LogicalTypeId::ARRAY) {
D_ASSERT(quantiles.size() == 1);
return quantiles[0];
}
if (quantiles.empty()) {
return Value::LIST(LogicalType::DOUBLE, std::move(quantiles));
}
auto child_type = quantiles[0].type();
return Value::LIST(child_type, std::move(quantiles));
}

template <class STATE, class STATE_FIELD = StateListType<StateInputType<0>>>
AggregateStateLayout QuantileStateLayout(AggregateLayoutInput &input) {
auto &function = input.function;
AggregateStateLayout layout;
if (function.GetReturnType().IsAggregateState()) {
// the function has been modified for state export (see ExportAggregateFunction::SetStateExport) -
// its return type IS the state type already
layout.type = function.GetReturnType();
} else {
layout.type = LogicalType::LIST(function.GetArguments()[0]);
}
layout.total_state_size = AlignValue<idx_t>(sizeof(STATE));
layout.field = BuildStateField<STATE_FIELD>();
AggregateStateField::PopulateListFunctions(layout.type, layout.field);
if (function.GetOriginalArguments().size() == 2) {
// the quantile parameter must be a constant at bind time (its argument is erased by BindQuantile) -
// record its value so that re-binding the exported state can supply it
// median and mad have no parameter argument (their binds create the quantile themselves) and skip this
auto &bind_data = input.bind_data->Cast<QuantileBindData>();
layout.constant_parameters.emplace(1, QuantileParameterValue(bind_data, function.GetOriginalArguments()[1]));
}
return layout;
}

} // namespace duckdb
13 changes: 11 additions & 2 deletions src/duckdb/extension/core_functions/lambda_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
#include "duckdb/planner/expression/bound_function_expression.hpp"
#include "duckdb/planner/expression/bound_cast_expression.hpp"
#include "duckdb/planner/expression/bound_lambda_expression.hpp"

#include "duckdb/common/enums/dialect_compatibility_mode.hpp"
#include "duckdb/main/settings.hpp"
namespace duckdb {

//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -38,6 +39,11 @@ struct LambdaExecuteInfo {
// initialize the data chunks
input_chunk.InitializeEmpty(input_types);
lambda_chunk.Initialize(Allocator::DefaultAllocator(), result_types);
// Spark Compatibility Mode: zero-based index for lambdas
if (Settings::Get<DialectCompatibilityModeSetting>(context) == DialectCompatibilityMode::SPARK) {
// Spark's lambda index parameter is 0-based; default SQL is 1-based
index_offset = 0;
}
};

//! The expression executor that executes the lambda expression
Expand All @@ -48,6 +54,8 @@ struct LambdaExecuteInfo {
DataChunk lambda_chunk;
//! True, if this lambda expression expects an index vector in the input chunk
bool has_index;
//! Added to child_idx to form the value the lambda sees in its index parameter (1 by default).
idx_t index_offset = 1;
};

//! A helper struct with information that is specific to the list_filter function
Expand Down Expand Up @@ -323,7 +331,8 @@ static void ExecuteLambda(DataChunk &args, ExpressionState &state, Vector &resul

// set the index vector
if (info.has_index) {
index_vector.SetValue(elem_cnt, Value::BIGINT(NumericCast<int64_t>(child_idx + 1)));
index_vector.SetValue(elem_cnt,
Value::BIGINT(NumericCast<int64_t>(child_idx + execute_info.index_offset)));
}

elem_cnt++;
Expand Down
1 change: 1 addition & 0 deletions src/duckdb/extension/parquet/include/column_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ class ColumnWriter {

virtual void BeginWrite(ColumnWriterState &state) = 0;
virtual void Write(ColumnWriterState &state, Vector &vector, idx_t count) = 0;
virtual void PrepareWrite(ColumnWriterState &state) = 0;
virtual void FinalizeWrite(ColumnWriterState &state) = 0;

public:
Expand Down
9 changes: 6 additions & 3 deletions src/duckdb/extension/parquet/include/parquet_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "duckdb/common/exception.hpp"
#include "duckdb/common/mutex.hpp"
#include "duckdb/common/atomic.hpp"
#include "duckdb/common/serializer/buffered_file_writer.hpp"
#include "duckdb/common/serializer/async_file_writer.hpp"
#include "duckdb/common/types/column/column_data_collection.hpp"
#include "duckdb/function/copy_function.hpp"
#include "parquet_statistics.hpp"
Expand Down Expand Up @@ -216,7 +216,7 @@ class ParquetWriter {
LogicalType GetSQLType(idx_t schema_idx) const {
return options.sql_types[schema_idx];
}
BufferedFileWriter &GetWriter() {
AsyncFileWriter &GetWriter() {
return *writer;
}
idx_t FileSize() const {
Expand Down Expand Up @@ -259,6 +259,9 @@ class ParquetWriter {

uint32_t Write(const duckdb_apache::thrift::TBase &object);
uint32_t WriteData(const const_data_ptr_t buffer, const uint32_t buffer_size);
unique_ptr<AsyncWriteBuffer> PrepareWrite(const duckdb_apache::thrift::TBase &object);
unique_ptr<AsyncWriteBuffer> PrepareWriteData(unique_ptr<AsyncWriteBuffer> buffer);
uint32_t WriteData(unique_ptr<AsyncWriteBuffer> buffer);

GeoParquetFileMetadata &GetGeoParquetData();

Expand Down Expand Up @@ -292,7 +295,7 @@ class ParquetWriter {
ParquetWriterOptions options;
shared_ptr<EncryptionUtil> encryption_util;

unique_ptr<BufferedFileWriter> writer;
unique_ptr<AsyncFileWriter> writer;
std::shared_ptr<duckdb_apache::thrift::protocol::TProtocol> protocol;
duckdb_parquet::FileMetaData file_meta_data;
std::mutex lock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ListColumnWriter : public ColumnWriter {

void BeginWrite(ColumnWriterState &state) override;
void Write(ColumnWriterState &state, Vector &vector, idx_t count) override;
void PrepareWrite(ColumnWriterState &state) override;
void FinalizeWrite(ColumnWriterState &state) override;
idx_t FinalizeSchema(vector<duckdb_parquet::SchemaElement> &schemas) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <string>

#include "column_writer.hpp"
#include "duckdb/common/serializer/async_file_writer.hpp"
#include "writer/parquet_write_stats.hpp"
#include "duckdb/common/serializer/memory_stream.hpp"
#include "parquet_statistics.hpp"
Expand All @@ -29,6 +30,7 @@ class ParquetWriter;
class Vector;
class WriteStream;
struct ParquetColumnSchema;
struct PrimitiveDictionaryTargetData;

struct PageInformation {
idx_t offset = 0;
Expand All @@ -48,6 +50,8 @@ struct PageWriteInformation {
size_t compressed_size;
data_ptr_t compressed_data;
AllocatedData compressed_buf;
unique_ptr<AsyncWriteBuffer> prepared_header;
unique_ptr<AsyncWriteBuffer> prepared_payload;
};

class PrimitiveColumnWriterState : public ColumnWriterState {
Expand Down Expand Up @@ -88,6 +92,7 @@ class PrimitiveColumnWriter : public ColumnWriter {
bool vector_can_span_multiple_pages) override;
void BeginWrite(ColumnWriterState &state) override;
void Write(ColumnWriterState &state, Vector &vector, idx_t count) override;
void PrepareWrite(ColumnWriterState &state) override;
void FinalizeWrite(ColumnWriterState &state) override;
idx_t FinalizeSchema(vector<duckdb_parquet::SchemaElement> &schemas) override;

Expand Down Expand Up @@ -121,6 +126,7 @@ class PrimitiveColumnWriter : public ColumnWriter {
//! The number of elements in the dictionary
virtual idx_t DictionarySize(PrimitiveColumnWriterState &state_p);
void WriteDictionary(PrimitiveColumnWriterState &state, unique_ptr<MemoryStream> temp_writer, idx_t row_count);
void WriteDictionary(PrimitiveColumnWriterState &state, PrimitiveDictionaryTargetData target_data, idx_t row_count);
virtual void FlushDictionary(PrimitiveColumnWriterState &state, ColumnWriterStatistics *stats);

void SetParquetStatistics(PrimitiveColumnWriterState &state, duckdb_parquet::ColumnChunk &column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class StructColumnWriter : public ColumnWriter {

void BeginWrite(ColumnWriterState &state) override;
void Write(ColumnWriterState &state, Vector &vector, idx_t count) override;
void PrepareWrite(ColumnWriterState &state) override;
void FinalizeWrite(ColumnWriterState &state) override;
idx_t FinalizeSchema(vector<duckdb_parquet::SchemaElement> &schemas) override;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
});

// flush the dictionary page and add it to the to-be-written pages
WriteDictionary(state, state.dictionary.GetTargetMemoryStream(), state.dictionary.GetSize());
auto dictionary_size = state.dictionary.GetSize();
WriteDictionary(state, state.dictionary.TakeTargetData(), dictionary_size);
// bloom filter will be queued for writing in ParquetWriter::BufferBloomFilter one level up
}

Expand Down
Loading
Loading