diff --git a/src/duckdb/extension/core_functions/scalar/date/date_part.cpp b/src/duckdb/extension/core_functions/scalar/date/date_part.cpp index 3633f1f53..478e59738 100644 --- a/src/duckdb/extension/core_functions/scalar/date/date_part.cpp +++ b/src/duckdb/extension/core_functions/scalar/date/date_part.cpp @@ -88,10 +88,24 @@ DatePartSpecifier GetDateTypePartSpecifier(const string &specifier, LogicalType throw NotImplementedException("\"%s\" units \"%s\" not recognized", EnumUtil::ToString(type.id()), specifier); } -template +template unique_ptr PropagateSimpleDatePartStatistics(vector &child_stats) { - // we can always propagate simple date part statistics - // since the min and max can never exceed these bounds + // we can only propagate simple date part statistics if the child has stats + auto &nstats = child_stats[0]; + if (!NumericStats::HasMinMax(nstats)) { + return nullptr; + } + auto min = NumericStats::GetMin(nstats); + auto max = NumericStats::GetMax(nstats); + if (min > max) { + return nullptr; + } + // Infinities produce a NULL date part even though the input is not NULL, + // so we cannot propagate the validity (and thus the stats) in that case + if (!Value::IsFinite(min) || !Value::IsFinite(max)) { + return nullptr; + } + // the min and max can never exceed these bounds auto result = NumericStats::CreateEmpty(LogicalType::BIGINT); result.CopyValidity(child_stats[0]); NumericStats::SetMin(result, Value::BIGINT(MIN)); @@ -182,7 +196,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { // min/max of month operator is [1, 12] - return PropagateSimpleDatePartStatistics<1, 12>(input.child_stats); + return PropagateSimpleDatePartStatistics<1, 12, T>(input.child_stats); } }; @@ -195,7 +209,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { // min/max of day operator is [1, 31] - return PropagateSimpleDatePartStatistics<1, 31>(input.child_stats); + return PropagateSimpleDatePartStatistics<1, 31, T>(input.child_stats); } }; @@ -281,7 +295,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { // min/max of quarter operator is [1, 4] - return PropagateSimpleDatePartStatistics<1, 4>(input.child_stats); + return PropagateSimpleDatePartStatistics<1, 4, T>(input.child_stats); } }; @@ -300,7 +314,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<0, 6>(input.child_stats); + return PropagateSimpleDatePartStatistics<0, 6, T>(input.child_stats); } }; @@ -313,7 +327,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<1, 7>(input.child_stats); + return PropagateSimpleDatePartStatistics<1, 7, T>(input.child_stats); } }; @@ -325,7 +339,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<1, 366>(input.child_stats); + return PropagateSimpleDatePartStatistics<1, 366, T>(input.child_stats); } }; @@ -337,7 +351,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<1, 54>(input.child_stats); + return PropagateSimpleDatePartStatistics<1, 54, T>(input.child_stats); } }; @@ -426,7 +440,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<0, 60000000000>(input.child_stats); + return PropagateSimpleDatePartStatistics<0, 60000000000, T>(input.child_stats); } }; @@ -438,7 +452,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<0, 60000000>(input.child_stats); + return PropagateSimpleDatePartStatistics<0, 60000000, T>(input.child_stats); } }; @@ -450,7 +464,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<0, 60000>(input.child_stats); + return PropagateSimpleDatePartStatistics<0, 60000, T>(input.child_stats); } }; @@ -462,7 +476,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<0, 60>(input.child_stats); + return PropagateSimpleDatePartStatistics<0, 60, T>(input.child_stats); } }; @@ -474,7 +488,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<0, 60>(input.child_stats); + return PropagateSimpleDatePartStatistics<0, 60, T>(input.child_stats); } }; @@ -486,7 +500,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<0, 24>(input.child_stats); + return PropagateSimpleDatePartStatistics<0, 24, T>(input.child_stats); } }; @@ -515,7 +529,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<0, 1>(input.child_stats); + return PropagateSimpleDatePartStatistics<0, 1, T>(input.child_stats); } }; @@ -547,7 +561,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<0, 0>(input.child_stats); + return PropagateSimpleDatePartStatistics<0, 0, T>(input.child_stats); } }; @@ -560,7 +574,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<0, 0>(input.child_stats); + return PropagateSimpleDatePartStatistics<0, 0, T>(input.child_stats); } }; @@ -573,7 +587,7 @@ struct DatePart { template static unique_ptr PropagateStatistics(ClientContext &context, FunctionStatisticsInput &input) { - return PropagateSimpleDatePartStatistics<0, 0>(input.child_stats); + return PropagateSimpleDatePartStatistics<0, 0, T>(input.child_stats); } }; diff --git a/src/duckdb/extension/core_functions/scalar/debug/sleep.cpp b/src/duckdb/extension/core_functions/scalar/debug/sleep.cpp index b0cdc4646..1f6d35302 100644 --- a/src/duckdb/extension/core_functions/scalar/debug/sleep.cpp +++ b/src/duckdb/extension/core_functions/scalar/debug/sleep.cpp @@ -17,8 +17,9 @@ struct NullResultType { static void SleepFunction(DataChunk &input, ExpressionState &state, Vector &result) { input.Flatten(); + auto &context = state.GetContext(); GenericExecutor::ExecuteUnary, NullResultType>(input.data[0], result, input.size(), - [](PrimitiveType input) { + [&context](PrimitiveType input) { // Sleep for the specified number of // milliseconds (clamp negative values to // 0) @@ -26,7 +27,7 @@ static void SleepFunction(DataChunk &input, ExpressionState &state, Vector &resu if (sleep_ms < 0) { sleep_ms = 0; } - ThreadUtil::SleepMs(sleep_ms); + ThreadUtil::SleepMs(sleep_ms, context); return NullResultType(); }); } diff --git a/src/duckdb/extension/parquet/include/parquet_writer.hpp b/src/duckdb/extension/parquet/include/parquet_writer.hpp index f07d9264a..c8ea43f57 100644 --- a/src/duckdb/extension/parquet/include/parquet_writer.hpp +++ b/src/duckdb/extension/parquet/include/parquet_writer.hpp @@ -63,10 +63,13 @@ class ParquetWriteTransformData { public: ColumnDataCollection &ApplyTransform(ColumnDataCollection &input); + bool MatchesTypes(const vector &other_types) const; private: //! The buffer to store the transformed chunks of a rowgroup ColumnDataCollection buffer; + //! The types used to bind the expressions and initialize the buffer + vector types; //! The expression(s) to apply to the input chunk vector> expressions; //! The expression executor used to transform the input chunk diff --git a/src/duckdb/extension/parquet/parquet_writer.cpp b/src/duckdb/extension/parquet/parquet_writer.cpp index f7928647a..82538ae09 100644 --- a/src/duckdb/extension/parquet/parquet_writer.cpp +++ b/src/duckdb/extension/parquet/parquet_writer.cpp @@ -355,9 +355,13 @@ class ParquetStatsAccumulator { ParquetWriteTransformData::ParquetWriteTransformData(ClientContext &context, vector types, vector> expressions_p) - : buffer(context, types, ColumnDataAllocatorType::BUFFER_MANAGER_ALLOCATOR), expressions(std::move(expressions_p)), - executor(context, expressions) { - chunk.Initialize(buffer.GetAllocator(), types); + : buffer(context, types, ColumnDataAllocatorType::BUFFER_MANAGER_ALLOCATOR), types(std::move(types)), + expressions(std::move(expressions_p)), executor(context, expressions) { + chunk.Initialize(buffer.GetAllocator(), this->types); +} + +bool ParquetWriteTransformData::MatchesTypes(const vector &other_types) const { + return types == other_types; } //! TODO: this doesnt work.. the ParquetWriteTransformData is shared with all threads, the method is stateful, but has @@ -488,22 +492,28 @@ void ParquetWriter::AnalyzeSchema(ColumnDataCollection &buffer, vector &transform_data) { - if (transform_data) { + vector transformed_types; + for (idx_t col_idx = 0; col_idx < column_writers.size(); col_idx++) { + auto &column_writer = *column_writers[col_idx]; + auto &original_type = sql_types[col_idx]; + if (!column_writer.HasTransform()) { + transformed_types.push_back(original_type); + continue; + } + transformed_types.push_back(column_writer.TransformedType()); + } + if (transform_data && transform_data->MatchesTypes(transformed_types)) { return; } - vector transformed_types; vector> transform_expressions; for (idx_t col_idx = 0; col_idx < column_writers.size(); col_idx++) { auto &column_writer = *column_writers[col_idx]; - auto &original_type = sql_types[col_idx]; - auto expr = make_uniq(original_type, col_idx); + auto expr = make_uniq(sql_types[col_idx], col_idx); if (!column_writer.HasTransform()) { - transformed_types.push_back(original_type); transform_expressions.push_back(std::move(expr)); continue; } - transformed_types.push_back(column_writer.TransformedType()); transform_expressions.push_back(column_writer.TransformExpression(std::move(expr))); } transform_data = make_uniq(context, transformed_types, std::move(transform_expressions)); diff --git a/src/duckdb/src/common/allocator/allocator.cpp b/src/duckdb/src/common/allocator/allocator.cpp index 25eae3c17..4044cdb82 100644 --- a/src/duckdb/src/common/allocator/allocator.cpp +++ b/src/duckdb/src/common/allocator/allocator.cpp @@ -10,6 +10,10 @@ #include +#ifdef __GLIBC__ +#include +#endif + #ifdef DUCKDB_DEBUG_ALLOCATION #include "duckdb/common/mutex.hpp" #include "duckdb/common/pair.hpp" @@ -175,6 +179,28 @@ Allocator &Allocator::DefaultAllocator() { return *DefaultAllocatorReference(); } +void Allocator::MallocTrim(idx_t pad) { +#ifdef __GLIBC__ + static constexpr int64_t TRIM_INTERVAL_MS = 100; + static atomic LAST_TRIM_TIMESTAMP_MS {0}; + + int64_t last_trim_timestamp_ms = LAST_TRIM_TIMESTAMP_MS.load(); + auto current_ts = Timestamp::GetCurrentTimestamp(); + auto current_timestamp_ms = Cast::Operation(current_ts).value; + + if (current_timestamp_ms - last_trim_timestamp_ms < TRIM_INTERVAL_MS) { + return; // We trimmed less than TRIM_INTERVAL_MS ago + } + if (!LAST_TRIM_TIMESTAMP_MS.compare_exchange_strong(last_trim_timestamp_ms, current_timestamp_ms, + std::memory_order_acquire, std::memory_order_relaxed)) { + return; // Another thread has updated LAST_TRIM_TIMESTAMP_MS since we loaded it + } + + // We successfully updated LAST_TRIM_TIMESTAMP_MS, we can trim + malloc_trim(pad); +#endif +} + //===--------------------------------------------------------------------===// // Debug Info (extended) //===--------------------------------------------------------------------===// diff --git a/src/duckdb/src/common/allocator/allocator_jemalloc.cpp b/src/duckdb/src/common/allocator/allocator_jemalloc.cpp index 5a5a5b50f..ee3b091aa 100644 --- a/src/duckdb/src/common/allocator/allocator_jemalloc.cpp +++ b/src/duckdb/src/common/allocator/allocator_jemalloc.cpp @@ -80,6 +80,11 @@ bool Allocator::SupportsFlush() { } void Allocator::ThreadFlush(bool allocator_background_threads, idx_t threshold, idx_t thread_count) { + // jemalloc only manages allocation done through the Allocator interface. + // Any allocations done directly through "malloc" or "operator new" still + // go to the system allocator. So we also trim the system heap here. + MallocTrim(thread_count * threshold); + if (!allocator_background_threads) { // We flush after exceeding the threshold if (GetJemallocCTL("thread.peak.read") <= threshold) { @@ -116,6 +121,9 @@ void Allocator::FlushAll() { // Reset the peak after resetting SetJemallocCTL("thread.peak.reset"); + + // Also return the system heap (see ThreadFlush) to the OS + MallocTrim(0); } void Allocator::SetBackgroundThreads(bool enable) { diff --git a/src/duckdb/src/common/allocator/allocator_standard.cpp b/src/duckdb/src/common/allocator/allocator_standard.cpp index 45e03d46b..da6a959c8 100644 --- a/src/duckdb/src/common/allocator/allocator_standard.cpp +++ b/src/duckdb/src/common/allocator/allocator_standard.cpp @@ -3,15 +3,8 @@ #ifndef DUCKDB_ENABLE_JEMALLOC #include "duckdb/common/assert.hpp" -#include "duckdb/common/atomic.hpp" #include "duckdb/common/exception.hpp" #include "duckdb/common/helper.hpp" -#include "duckdb/common/operator/cast_operators.hpp" -#include "duckdb/common/types/timestamp.hpp" - -#ifdef __GLIBC__ -#include -#endif #ifdef DUCKDB_DEBUG_ALLOCATION #include "duckdb/common/mutex.hpp" @@ -52,28 +45,6 @@ bool Allocator::SupportsFlush() { #endif } -static void MallocTrim(idx_t pad) { -#ifdef __GLIBC__ - static constexpr int64_t TRIM_INTERVAL_MS = 100; - static atomic LAST_TRIM_TIMESTAMP_MS {0}; - - int64_t last_trim_timestamp_ms = LAST_TRIM_TIMESTAMP_MS.load(); - auto current_ts = Timestamp::GetCurrentTimestamp(); - auto current_timestamp_ms = Cast::Operation(current_ts).value; - - if (current_timestamp_ms - last_trim_timestamp_ms < TRIM_INTERVAL_MS) { - return; // We trimmed less than TRIM_INTERVAL_MS ago - } - if (!LAST_TRIM_TIMESTAMP_MS.compare_exchange_strong(last_trim_timestamp_ms, current_timestamp_ms, - std::memory_order_acquire, std::memory_order_relaxed)) { - return; // Another thread has updated LAST_TRIM_TIMESTAMP_MS since we loaded it - } - - // We successfully updated LAST_TRIM_TIMESTAMP_MS, we can trim - malloc_trim(pad); -#endif -} - void Allocator::ThreadFlush(bool allocator_background_threads, idx_t threshold, idx_t thread_count) { MallocTrim(thread_count * threshold); } diff --git a/src/duckdb/src/common/thread_util.cpp b/src/duckdb/src/common/thread_util.cpp index 96a576e21..bc96a2022 100644 --- a/src/duckdb/src/common/thread_util.cpp +++ b/src/duckdb/src/common/thread_util.cpp @@ -1,16 +1,34 @@ #include "duckdb/common/thread.hpp" #include "duckdb/common/chrono.hpp" #include "duckdb/original/std/sstream.hpp" +#include "duckdb/common/helper.hpp" +#include "duckdb/common/types/timestamp.hpp" +#include "duckdb/common/types/interval.hpp" +#include "duckdb/main/client_context.hpp" namespace duckdb { #ifndef DUCKDB_NO_THREADS -void ThreadUtil::SleepMs(idx_t sleep_ms) { - std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); +void ThreadUtil::SleepMs(idx_t sleep_ms, optional_ptr context) { + auto target_time = Timestamp::GetCurrentTimestamp(); + target_time.value += static_cast(sleep_ms) * Interval::MICROS_PER_MSEC; + static constexpr idx_t DEFAULT_SLEEP_INTERVAL_MS = 100; + + while (true) { + auto current_time = Timestamp::GetCurrentTimestamp(); + if (context && context->IsInterrupted()) { + throw InterruptException(); + } + if (current_time >= target_time) { + break; + } + auto remaining_ms = static_cast(target_time.value - current_time.value) / Interval::MICROS_PER_MSEC; + std::this_thread::sleep_for(milliseconds(MinValue(remaining_ms, DEFAULT_SLEEP_INTERVAL_MS))); + } } void ThreadUtil::SleepMicroSeconds(idx_t micros) { - std::this_thread::sleep_for(std::chrono::microseconds(micros)); + std::this_thread::sleep_for(microseconds(micros)); } thread_id ThreadUtil::GetThreadId() { @@ -25,7 +43,7 @@ string ThreadUtil::GetThreadIdString() { #else -void ThreadUtil::SleepMs(idx_t sleep_ms) { +void ThreadUtil::SleepMs(idx_t sleep_ms, optional_ptr) { throw InvalidInputException("ThreadUtil::SleepMs requires DuckDB to be compiled with thread support"); } diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index da8449c69..d808b8e63 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "4-dev320" +#define DUCKDB_PATCH_VERSION "4" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 5 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.5.4-dev320" +#define DUCKDB_VERSION "v1.5.4" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "f086d79427" +#define DUCKDB_SOURCE_ID "08e34c447b" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/include/duckdb/common/allocator.hpp b/src/duckdb/src/include/duckdb/common/allocator.hpp index a9560de9f..c6f8f8741 100644 --- a/src/duckdb/src/include/duckdb/common/allocator.hpp +++ b/src/duckdb/src/include/duckdb/common/allocator.hpp @@ -133,6 +133,10 @@ class Allocator { static void SetBackgroundThreads(bool enable); private: + //! Returns free memory in the system heap (glibc) to the OS via malloc_trim, rate-limited to once + //! per 100ms. No-op on non-glibc platforms. Shared by the flush paths of both allocator backends. + static void MallocTrim(idx_t pad); + allocate_function_ptr_t allocate_function; free_function_ptr_t free_function; reallocate_function_ptr_t reallocate_function; diff --git a/src/duckdb/src/include/duckdb/common/thread.hpp b/src/duckdb/src/include/duckdb/common/thread.hpp index 277e34b39..ccf773c5c 100644 --- a/src/duckdb/src/include/duckdb/common/thread.hpp +++ b/src/duckdb/src/include/duckdb/common/thread.hpp @@ -23,10 +23,14 @@ using thread_id = std::thread::id; using thread_id = uint64_t; #endif +#include "duckdb/common/optional_ptr.hpp" + namespace duckdb { +class ClientContext; + struct ThreadUtil { - static void SleepMs(idx_t ms); + static void SleepMs(idx_t ms, optional_ptr context = nullptr); static void SleepMicroSeconds(idx_t micros); static thread_id GetThreadId(); static string GetThreadIdString(); diff --git a/src/duckdb/src/storage/table/geo_column_data.cpp b/src/duckdb/src/storage/table/geo_column_data.cpp index f13211a47..716300af5 100644 --- a/src/duckdb/src/storage/table/geo_column_data.cpp +++ b/src/duckdb/src/storage/table/geo_column_data.cpp @@ -353,12 +353,21 @@ unique_ptr GeoColumnData::Checkpoint(const RowGroup &row_ checkpoint_state->inner_column_state = checkpoint_state->inner_column->Checkpoint(row_group, info, old_column_stats); - if (base_column->GetType().id() == LogicalTypeId::GEOMETRY) { - // Get the stats from the base column. + // Only the specialized (shredded) layouts need to be reinterpreted via GetSpecializedType. + // Both WKB and the legacy SPATIAL format store the full, unshredded geometry, so their stats come + // directly from the column rather than from a specialized layout. + + const auto storage_type = checkpoint_state->storage_type; + if (storage_type == GeometryStorageType::WKB) { + // WKB: the base column carries the geometry stats directly. checkpoint_state->global_stats = checkpoint_state->inner_column_state->GetStatistics(); + } else if (storage_type == GeometryStorageType::SPATIAL) { + // Legacy SPATIAL: the base column is stored as a BLOB and has no geometry stats of its own. + // The column is unchanged here, so the incoming geometry stats remain valid. + checkpoint_state->global_stats = old_stats.Copy().ToUnique(); } else { - // Otherwise interpret stats from shredded column - const auto types = Geometry::GetSpecializedType(checkpoint_state->storage_type); + // Shredded storage, interpret stats from shredded column + const auto types = Geometry::GetSpecializedType(storage_type); const auto gtype = types.first; const auto vtype = types.second;