Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 34 additions & 20 deletions src/duckdb/extension/core_functions/scalar/date/date_part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,24 @@ DatePartSpecifier GetDateTypePartSpecifier(const string &specifier, LogicalType
throw NotImplementedException("\"%s\" units \"%s\" not recognized", EnumUtil::ToString(type.id()), specifier);
}

template <int64_t MIN, int64_t MAX>
template <int64_t MIN, int64_t MAX, class T>
unique_ptr<BaseStatistics> PropagateSimpleDatePartStatistics(vector<BaseStatistics> &child_stats) {
// we can always propagate simple date part statistics
// since the min and max can never exceed these bounds
// we can only propagate simple date part statistics if the child has stats
auto &nstats = child_stats[0];
if (!NumericStats::HasMinMax(nstats)) {
return nullptr;
}
auto min = NumericStats::GetMin<T>(nstats);
auto max = NumericStats::GetMax<T>(nstats);
if (min > max) {
return nullptr;
}
// Infinities produce a NULL date part even though the input is not NULL,
// so we cannot propagate the validity (and thus the stats) in that case
if (!Value::IsFinite(min) || !Value::IsFinite(max)) {
return nullptr;
}
// the min and max can never exceed these bounds
auto result = NumericStats::CreateEmpty(LogicalType::BIGINT);
result.CopyValidity(child_stats[0]);
NumericStats::SetMin(result, Value::BIGINT(MIN));
Expand Down Expand Up @@ -182,7 +196,7 @@ struct DatePart {
template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
// min/max of month operator is [1, 12]
return PropagateSimpleDatePartStatistics<1, 12>(input.child_stats);
return PropagateSimpleDatePartStatistics<1, 12, T>(input.child_stats);
}
};

Expand All @@ -195,7 +209,7 @@ struct DatePart {
template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
// min/max of day operator is [1, 31]
return PropagateSimpleDatePartStatistics<1, 31>(input.child_stats);
return PropagateSimpleDatePartStatistics<1, 31, T>(input.child_stats);
}
};

Expand Down Expand Up @@ -281,7 +295,7 @@ struct DatePart {
template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
// min/max of quarter operator is [1, 4]
return PropagateSimpleDatePartStatistics<1, 4>(input.child_stats);
return PropagateSimpleDatePartStatistics<1, 4, T>(input.child_stats);
}
};

Expand All @@ -300,7 +314,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<0, 6>(input.child_stats);
return PropagateSimpleDatePartStatistics<0, 6, T>(input.child_stats);
}
};

Expand All @@ -313,7 +327,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<1, 7>(input.child_stats);
return PropagateSimpleDatePartStatistics<1, 7, T>(input.child_stats);
}
};

Expand All @@ -325,7 +339,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<1, 366>(input.child_stats);
return PropagateSimpleDatePartStatistics<1, 366, T>(input.child_stats);
}
};

Expand All @@ -337,7 +351,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<1, 54>(input.child_stats);
return PropagateSimpleDatePartStatistics<1, 54, T>(input.child_stats);
}
};

Expand Down Expand Up @@ -426,7 +440,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<0, 60000000000>(input.child_stats);
return PropagateSimpleDatePartStatistics<0, 60000000000, T>(input.child_stats);
}
};

Expand All @@ -438,7 +452,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<0, 60000000>(input.child_stats);
return PropagateSimpleDatePartStatistics<0, 60000000, T>(input.child_stats);
}
};

Expand All @@ -450,7 +464,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<0, 60000>(input.child_stats);
return PropagateSimpleDatePartStatistics<0, 60000, T>(input.child_stats);
}
};

Expand All @@ -462,7 +476,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<0, 60>(input.child_stats);
return PropagateSimpleDatePartStatistics<0, 60, T>(input.child_stats);
}
};

Expand All @@ -474,7 +488,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<0, 60>(input.child_stats);
return PropagateSimpleDatePartStatistics<0, 60, T>(input.child_stats);
}
};

Expand All @@ -486,7 +500,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<0, 24>(input.child_stats);
return PropagateSimpleDatePartStatistics<0, 24, T>(input.child_stats);
}
};

Expand Down Expand Up @@ -515,7 +529,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<0, 1>(input.child_stats);
return PropagateSimpleDatePartStatistics<0, 1, T>(input.child_stats);
}
};

Expand Down Expand Up @@ -547,7 +561,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<0, 0>(input.child_stats);
return PropagateSimpleDatePartStatistics<0, 0, T>(input.child_stats);
}
};

Expand All @@ -560,7 +574,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<0, 0>(input.child_stats);
return PropagateSimpleDatePartStatistics<0, 0, T>(input.child_stats);
}
};

Expand All @@ -573,7 +587,7 @@ struct DatePart {

template <class T>
static unique_ptr<BaseStatistics> PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) {
return PropagateSimpleDatePartStatistics<0, 0>(input.child_stats);
return PropagateSimpleDatePartStatistics<0, 0, T>(input.child_stats);
}
};

Expand Down
5 changes: 3 additions & 2 deletions src/duckdb/extension/core_functions/scalar/debug/sleep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ struct NullResultType {

static void SleepFunction(DataChunk &input, ExpressionState &state, Vector &result) {
input.Flatten();
auto &context = state.GetContext();
GenericExecutor::ExecuteUnary<PrimitiveType<int64_t>, NullResultType>(input.data[0], result, input.size(),
[](PrimitiveType<int64_t> input) {
[&context](PrimitiveType<int64_t> input) {
// Sleep for the specified number of
// milliseconds (clamp negative values to
// 0)
int64_t sleep_ms = input.val;
if (sleep_ms < 0) {
sleep_ms = 0;
}
ThreadUtil::SleepMs(sleep_ms);
ThreadUtil::SleepMs(sleep_ms, context);
return NullResultType();
});
}
Expand Down
3 changes: 3 additions & 0 deletions src/duckdb/extension/parquet/include/parquet_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ class ParquetWriteTransformData {

public:
ColumnDataCollection &ApplyTransform(ColumnDataCollection &input);
bool MatchesTypes(const vector<LogicalType> &other_types) const;

private:
//! The buffer to store the transformed chunks of a rowgroup
ColumnDataCollection buffer;
//! The types used to bind the expressions and initialize the buffer
vector<LogicalType> types;
//! The expression(s) to apply to the input chunk
vector<unique_ptr<Expression>> expressions;
//! The expression executor used to transform the input chunk
Expand Down
28 changes: 19 additions & 9 deletions src/duckdb/extension/parquet/parquet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,13 @@ class ParquetStatsAccumulator {

ParquetWriteTransformData::ParquetWriteTransformData(ClientContext &context, vector<LogicalType> types,
vector<unique_ptr<Expression>> expressions_p)
: buffer(context, types, ColumnDataAllocatorType::BUFFER_MANAGER_ALLOCATOR), expressions(std::move(expressions_p)),
executor(context, expressions) {
chunk.Initialize(buffer.GetAllocator(), types);
: buffer(context, types, ColumnDataAllocatorType::BUFFER_MANAGER_ALLOCATOR), types(std::move(types)),
expressions(std::move(expressions_p)), executor(context, expressions) {
chunk.Initialize(buffer.GetAllocator(), this->types);
}

bool ParquetWriteTransformData::MatchesTypes(const vector<LogicalType> &other_types) const {
return types == other_types;
}

//! TODO: this doesnt work.. the ParquetWriteTransformData is shared with all threads, the method is stateful, but has
Expand Down Expand Up @@ -488,22 +492,28 @@ void ParquetWriter::AnalyzeSchema(ColumnDataCollection &buffer, vector<unique_pt
}

void ParquetWriter::InitializePreprocessing(unique_ptr<ParquetWriteTransformData> &transform_data) {
if (transform_data) {
vector<LogicalType> transformed_types;
for (idx_t col_idx = 0; col_idx < column_writers.size(); col_idx++) {
auto &column_writer = *column_writers[col_idx];
auto &original_type = sql_types[col_idx];
if (!column_writer.HasTransform()) {
transformed_types.push_back(original_type);
continue;
}
transformed_types.push_back(column_writer.TransformedType());
}
if (transform_data && transform_data->MatchesTypes(transformed_types)) {
return;
}

vector<LogicalType> transformed_types;
vector<unique_ptr<Expression>> transform_expressions;
for (idx_t col_idx = 0; col_idx < column_writers.size(); col_idx++) {
auto &column_writer = *column_writers[col_idx];
auto &original_type = sql_types[col_idx];
auto expr = make_uniq<BoundReferenceExpression>(original_type, col_idx);
auto expr = make_uniq<BoundReferenceExpression>(sql_types[col_idx], col_idx);
if (!column_writer.HasTransform()) {
transformed_types.push_back(original_type);
transform_expressions.push_back(std::move(expr));
continue;
}
transformed_types.push_back(column_writer.TransformedType());
transform_expressions.push_back(column_writer.TransformExpression(std::move(expr)));
}
transform_data = make_uniq<ParquetWriteTransformData>(context, transformed_types, std::move(transform_expressions));
Expand Down
26 changes: 26 additions & 0 deletions src/duckdb/src/common/allocator/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

#include <cstdint>

#ifdef __GLIBC__
#include <malloc.h>
#endif

#ifdef DUCKDB_DEBUG_ALLOCATION
#include "duckdb/common/mutex.hpp"
#include "duckdb/common/pair.hpp"
Expand Down Expand Up @@ -175,6 +179,28 @@ Allocator &Allocator::DefaultAllocator() {
return *DefaultAllocatorReference();
}

void Allocator::MallocTrim(idx_t pad) {
#ifdef __GLIBC__
static constexpr int64_t TRIM_INTERVAL_MS = 100;
static atomic<int64_t> LAST_TRIM_TIMESTAMP_MS {0};

int64_t last_trim_timestamp_ms = LAST_TRIM_TIMESTAMP_MS.load();
auto current_ts = Timestamp::GetCurrentTimestamp();
auto current_timestamp_ms = Cast::Operation<timestamp_t, timestamp_ms_t>(current_ts).value;

if (current_timestamp_ms - last_trim_timestamp_ms < TRIM_INTERVAL_MS) {
return; // We trimmed less than TRIM_INTERVAL_MS ago
}
if (!LAST_TRIM_TIMESTAMP_MS.compare_exchange_strong(last_trim_timestamp_ms, current_timestamp_ms,
std::memory_order_acquire, std::memory_order_relaxed)) {
return; // Another thread has updated LAST_TRIM_TIMESTAMP_MS since we loaded it
}

// We successfully updated LAST_TRIM_TIMESTAMP_MS, we can trim
malloc_trim(pad);
#endif
}

//===--------------------------------------------------------------------===//
// Debug Info (extended)
//===--------------------------------------------------------------------===//
Expand Down
8 changes: 8 additions & 0 deletions src/duckdb/src/common/allocator/allocator_jemalloc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ bool Allocator::SupportsFlush() {
}

void Allocator::ThreadFlush(bool allocator_background_threads, idx_t threshold, idx_t thread_count) {
// jemalloc only manages allocation done through the Allocator interface.
// Any allocations done directly through "malloc" or "operator new" still
// go to the system allocator. So we also trim the system heap here.
MallocTrim(thread_count * threshold);

if (!allocator_background_threads) {
// We flush after exceeding the threshold
if (GetJemallocCTL<uint64_t>("thread.peak.read") <= threshold) {
Expand Down Expand Up @@ -116,6 +121,9 @@ void Allocator::FlushAll() {

// Reset the peak after resetting
SetJemallocCTL("thread.peak.reset");

// Also return the system heap (see ThreadFlush) to the OS
MallocTrim(0);
}

void Allocator::SetBackgroundThreads(bool enable) {
Expand Down
29 changes: 0 additions & 29 deletions src/duckdb/src/common/allocator/allocator_standard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,8 @@
#ifndef DUCKDB_ENABLE_JEMALLOC

#include "duckdb/common/assert.hpp"
#include "duckdb/common/atomic.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/helper.hpp"
#include "duckdb/common/operator/cast_operators.hpp"
#include "duckdb/common/types/timestamp.hpp"

#ifdef __GLIBC__
#include <malloc.h>
#endif

#ifdef DUCKDB_DEBUG_ALLOCATION
#include "duckdb/common/mutex.hpp"
Expand Down Expand Up @@ -52,28 +45,6 @@ bool Allocator::SupportsFlush() {
#endif
}

static void MallocTrim(idx_t pad) {
#ifdef __GLIBC__
static constexpr int64_t TRIM_INTERVAL_MS = 100;
static atomic<int64_t> LAST_TRIM_TIMESTAMP_MS {0};

int64_t last_trim_timestamp_ms = LAST_TRIM_TIMESTAMP_MS.load();
auto current_ts = Timestamp::GetCurrentTimestamp();
auto current_timestamp_ms = Cast::Operation<timestamp_t, timestamp_ms_t>(current_ts).value;

if (current_timestamp_ms - last_trim_timestamp_ms < TRIM_INTERVAL_MS) {
return; // We trimmed less than TRIM_INTERVAL_MS ago
}
if (!LAST_TRIM_TIMESTAMP_MS.compare_exchange_strong(last_trim_timestamp_ms, current_timestamp_ms,
std::memory_order_acquire, std::memory_order_relaxed)) {
return; // Another thread has updated LAST_TRIM_TIMESTAMP_MS since we loaded it
}

// We successfully updated LAST_TRIM_TIMESTAMP_MS, we can trim
malloc_trim(pad);
#endif
}

void Allocator::ThreadFlush(bool allocator_background_threads, idx_t threshold, idx_t thread_count) {
MallocTrim(thread_count * threshold);
}
Expand Down
Loading
Loading