diff --git a/src/duckdb/extension/core_functions/scalar/date/time_bucket.cpp b/src/duckdb/extension/core_functions/scalar/date/time_bucket.cpp index d8f96706e..4c8c5b55b 100644 --- a/src/duckdb/extension/core_functions/scalar/date/time_bucket.cpp +++ b/src/duckdb/extension/core_functions/scalar/date/time_bucket.cpp @@ -1,5 +1,4 @@ #include "duckdb/common/exception.hpp" -#include "duckdb/common/limits.hpp" #include "duckdb/common/operator/cast_operators.hpp" #include "duckdb/common/operator/subtract.hpp" #include "duckdb/common/types/date.hpp" @@ -8,13 +7,52 @@ #include "duckdb/common/types/value.hpp" #include "duckdb/common/vector_operations/binary_executor.hpp" #include "duckdb/common/vector_operations/ternary_executor.hpp" -#include "duckdb/common/vector_operations/vector_operations.hpp" #include "core_functions/scalar/date_functions.hpp" namespace duckdb { namespace { +template +timestamp_t ToBucketTimestamp(const T &t) { + return Cast::template Operation(t); +} + +template <> +timestamp_t ToBucketTimestamp(const dtime_t &t) { + return Timestamp::FromDatetime(date_t::epoch(), t); +} + +template +date_t ToBucketDate(const T &t) { + return Cast::template Operation(t); +} + +template <> +date_t ToBucketDate(const dtime_t &t) { + return date_t::epoch(); +} + +template +T FromBucketTimestamp(const timestamp_t &t) { + return Cast::template Operation(t); +} + +template <> +dtime_t FromBucketTimestamp(const timestamp_t &t) { + return Timestamp::GetTime(t); +} + +template +T FromBucketDate(const date_t &t) { + return Cast::template Operation(t); +} + +template <> +dtime_t FromBucketDate(const date_t &t) { + return dtime_t(0); +} + struct TimeBucket { // Use 2000-01-03 00:00:00 (Monday) as origin when bucket_width is days, hours, ... for TimescaleDB compatibility // There are 10959 days between 1970-01-01 and 2000-01-03 @@ -100,8 +138,8 @@ struct TimeBucket { return Cast::template Operation(ts); } int64_t bucket_width_micros = Interval::GetMicro(bucket_width); - int64_t ts_micros = Timestamp::GetEpochMicroSeconds(Cast::template Operation(ts)); - return Cast::template Operation( + int64_t ts_micros = Timestamp::GetEpochMicroSeconds(ToBucketTimestamp(ts)); + return FromBucketTimestamp( WidthConvertibleToMicrosCommon(bucket_width_micros, ts_micros, DEFAULT_ORIGIN_MICROS)); } }; @@ -112,8 +150,8 @@ struct TimeBucket { if (!Value::IsFinite(ts)) { return Cast::template Operation(ts); } - int32_t ts_months = EpochMonths(ts); - return Cast::template Operation( + int32_t ts_months = EpochMonths(ToBucketDate(ts)); + return FromBucketDate( WidthConvertibleToMonthsCommon(bucket_width.months, ts_months, DEFAULT_ORIGIN_MONTHS)); } }; @@ -140,9 +178,9 @@ struct TimeBucket { return Cast::template Operation(ts); } int64_t bucket_width_micros = Interval::GetMicro(bucket_width); - int64_t ts_micros = Timestamp::GetEpochMicroSeconds( - Interval::Add(Cast::template Operation(ts), Interval::Invert(offset))); - return Cast::template Operation(Interval::Add( + int64_t ts_micros = + Timestamp::GetEpochMicroSeconds(Interval::Add(ToBucketTimestamp(ts), Interval::Invert(offset))); + return FromBucketTimestamp(Interval::Add( WidthConvertibleToMicrosCommon(bucket_width_micros, ts_micros, DEFAULT_ORIGIN_MICROS), offset)); } }; @@ -153,10 +191,10 @@ struct TimeBucket { if (!Value::IsFinite(ts)) { return Cast::template Operation(ts); } - int32_t ts_months = EpochMonths(Interval::Add(ts, Interval::Invert(offset))); - return Interval::Add(Cast::template Operation(WidthConvertibleToMonthsCommon( - bucket_width.months, ts_months, DEFAULT_ORIGIN_MONTHS)), - offset); + int32_t ts_months = EpochMonths(Interval::Add(ToBucketTimestamp(ts), Interval::Invert(offset))); + return FromBucketTimestamp(Interval::Add(ToBucketTimestamp(WidthConvertibleToMonthsCommon( + bucket_width.months, ts_months, DEFAULT_ORIGIN_MONTHS)), + offset)); } }; @@ -184,9 +222,9 @@ struct TimeBucket { return Cast::template Operation(ts); } int64_t bucket_width_micros = Interval::GetMicro(bucket_width); - int64_t ts_micros = Timestamp::GetEpochMicroSeconds(Cast::template Operation(ts)); - int64_t origin_micros = Timestamp::GetEpochMicroSeconds(Cast::template Operation(origin)); - return Cast::template Operation( + int64_t ts_micros = Timestamp::GetEpochMicroSeconds(ToBucketTimestamp(ts)); + int64_t origin_micros = Timestamp::GetEpochMicroSeconds(ToBucketTimestamp(origin)); + return FromBucketTimestamp( WidthConvertibleToMicrosCommon(bucket_width_micros, ts_micros, origin_micros)); } }; @@ -197,10 +235,9 @@ struct TimeBucket { if (!Value::IsFinite(ts)) { return Cast::template Operation(ts); } - int32_t ts_months = EpochMonths(ts); - int32_t origin_months = EpochMonths(origin); - return Cast::template Operation( - WidthConvertibleToMonthsCommon(bucket_width.months, ts_months, origin_months)); + int32_t ts_months = EpochMonths(ToBucketDate(ts)); + int32_t origin_months = EpochMonths(ToBucketDate(origin)); + return FromBucketDate(WidthConvertibleToMonthsCommon(bucket_width.months, ts_months, origin_months)); } }; @@ -351,20 +388,34 @@ ScalarFunctionSet TimeBucketFun::GetFunctions() { ScalarFunctionSet time_bucket; time_bucket.AddFunction( ScalarFunction({LogicalType::INTERVAL, LogicalType::DATE}, LogicalType::DATE, TimeBucketFunction)); - time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIMESTAMP}, LogicalType::TIMESTAMP, - TimeBucketFunction)); time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::DATE, LogicalType::INTERVAL}, LogicalType::DATE, TimeBucketOffsetFunction)); - time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIMESTAMP, LogicalType::INTERVAL}, - LogicalType::TIMESTAMP, TimeBucketOffsetFunction)); time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::DATE, LogicalType::DATE}, LogicalType::DATE, TimeBucketOriginFunction)); + + time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIMESTAMP}, LogicalType::TIMESTAMP, + TimeBucketFunction)); + time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIMESTAMP, LogicalType::INTERVAL}, + LogicalType::TIMESTAMP, TimeBucketOffsetFunction)); time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIMESTAMP, LogicalType::TIMESTAMP}, LogicalType::TIMESTAMP, TimeBucketOriginFunction)); + for (auto &func : time_bucket.functions) { - func.SetFallible(); func.SetArgProperties(1, ArgProperties().NonDecreasing()); } + + // Not monotonic (wraps) + time_bucket.AddFunction( + ScalarFunction({LogicalType::INTERVAL, LogicalType::TIME}, LogicalType::TIME, TimeBucketFunction)); + time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIME, LogicalType::INTERVAL}, + LogicalType::TIME, TimeBucketOffsetFunction)); + time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIME, LogicalType::TIME}, + LogicalType::TIME, TimeBucketOriginFunction)); + + for (auto &func : time_bucket.functions) { + func.SetFallible(); + } + return time_bucket; } diff --git a/src/duckdb/extension/json/json_functions/read_json.cpp b/src/duckdb/extension/json/json_functions/read_json.cpp index 911143ba6..1c40df066 100644 --- a/src/duckdb/extension/json/json_functions/read_json.cpp +++ b/src/duckdb/extension/json/json_functions/read_json.cpp @@ -160,7 +160,7 @@ void JSONScan::AutoDetect(ClientContext &context, MultiFileBindData &bind_data, bind_data.union_readers.resize(files.empty() ? 0 : files.size()); AutoDetectState auto_detect_state(context, bind_data, files, date_format_map); - const auto num_threads = NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads()); + const auto num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads(); const auto files_per_task = (file_count + num_threads - 1) / num_threads; const auto num_tasks = (file_count + files_per_task - 1) / files_per_task; vector task_nodes(num_tasks); diff --git a/src/duckdb/src/common/serializer/async_file_writer.cpp b/src/duckdb/src/common/serializer/async_file_writer.cpp index 33ceddd2c..b985ec648 100644 --- a/src/duckdb/src/common/serializer/async_file_writer.cpp +++ b/src/duckdb/src/common/serializer/async_file_writer.cpp @@ -362,7 +362,7 @@ void AsyncFileWriter::Truncate(idx_t size) { handle->Truncate(NumericCast(size)); total_written = size; write_queue->ResetNextOffset(total_written); - if (handle->CanSeek() && handle->SeekPosition() > size) { + if (handle->CanSeek() && handle->SeekPosition() != size) { handle->Seek(size); } } diff --git a/src/duckdb/src/common/serializer/async_write_queue.cpp b/src/duckdb/src/common/serializer/async_write_queue.cpp index 5f5fa20f6..33bde0413 100644 --- a/src/duckdb/src/common/serializer/async_write_queue.cpp +++ b/src/duckdb/src/common/serializer/async_write_queue.cpp @@ -92,7 +92,7 @@ class AsyncWriteQueueTask : public BaseExecutorTask { AsyncWriteQueue::AsyncWriteQueue(ClientContext &client_context_p, AsyncWriteTarget &target_p) : client_context(client_context_p), target(target_p) { auto &scheduler = TaskScheduler::GetScheduler(client_context); - auto async_threads = NumericCast(scheduler.NumberOfAsyncThreads()); + auto async_threads = scheduler.NumberOfAsyncThreads(); max_active_tasks = MaxValue(async_threads, 1); if (async_threads == 0) { return; @@ -497,7 +497,7 @@ idx_t ManagedAsyncWriteQueue::PendingWrite::Size() const { ManagedAsyncWriteQueue::ManagedAsyncWriteQueue(ClientContext &client_context_p, AsyncWriteTarget &target_p) : client_context(client_context_p), target(target_p), memory_governor(client_context_p) { auto &scheduler = TaskScheduler::GetScheduler(client_context); - auto async_threads = NumericCast(scheduler.NumberOfAsyncThreads()); + auto async_threads = scheduler.NumberOfAsyncThreads(); max_active_drain_tasks = MaxValue(async_threads, 1); AsyncWriteTarget &async_target = *this; @@ -925,7 +925,7 @@ ManagedAsyncWriteStreamQueue::ManagedAsyncWriteStreamQueue(ClientContext &client limit_coalesced_write_size = local_file; auto &scheduler = TaskScheduler::GetScheduler(client_context); - auto async_threads = NumericCast(scheduler.NumberOfAsyncThreads()); + auto async_threads = scheduler.NumberOfAsyncThreads(); // Positional writes let multiple async requests drain one logical write queue concurrently. // Otherwise the stream queue keeps one sequential request active so target ordering remains correct. @@ -1155,6 +1155,7 @@ bool ManagedAsyncWriteStreamQueue::TakePendingWriteRequest(AsyncWriteRequest &re return false; } if (drain_mode == DrainMode::SEQUENTIAL && submitted_requests > 0) { + // Non-positional targets write through the file handle's current position, so only one request may be active. return false; } if (policy == SchedulePolicy::THRESHOLD) { diff --git a/src/duckdb/src/common/sort/sort.cpp b/src/duckdb/src/common/sort/sort.cpp index 9ca8fa0fc..58e49cbe0 100644 --- a/src/duckdb/src/common/sort/sort.cpp +++ b/src/duckdb/src/common/sort/sort.cpp @@ -160,7 +160,7 @@ class SortLocalSinkState : public LocalSinkState { class SortGlobalSinkState : public GlobalSinkState { public: explicit SortGlobalSinkState(ClientContext &context) - : num_threads(NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads())), + : num_threads(TaskScheduler::GetScheduler(context).NumberOfThreads()), temporary_memory_state(TemporaryMemoryManager::Get(context).Register(context)), sorted_tuples(0), external(Settings::Get(context)), any_combined(false), total_count(0), partition_size(0) { diff --git a/src/duckdb/src/common/sort/sorted_run_merger.cpp b/src/duckdb/src/common/sort/sorted_run_merger.cpp index e66e4b576..75363050f 100644 --- a/src/duckdb/src/common/sort/sorted_run_merger.cpp +++ b/src/duckdb/src/common/sort/sorted_run_merger.cpp @@ -162,8 +162,8 @@ class SortedRunMergerLocalState : public LocalSourceState { class SortedRunMergerGlobalState : public GlobalSourceState { public: explicit SortedRunMergerGlobalState(ClientContext &context_p, const SortedRunMerger &merger_p) - : context(context_p), num_threads(NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads())), - merger(merger_p), num_runs(merger.sorted_runs.size()), + : context(context_p), num_threads(TaskScheduler::GetScheduler(context).NumberOfThreads()), merger(merger_p), + num_runs(merger.sorted_runs.size()), num_partitions((merger.total_count + (merger.partition_size - 1)) / merger.partition_size), iterator_state_type(GetBlockIteratorStateType(merger.external)), sort_key_type(merger.sort.key_layout->GetSortKeyType()), next_partition_idx(0), total_scanned(0), diff --git a/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp b/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp index 7afb9a1c6..bf8965b71 100644 --- a/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp +++ b/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp @@ -641,7 +641,7 @@ class HashAggregateDistinctFinalizeTask : public ExecutorTask { void HashAggregateDistinctFinalizeEvent::Schedule() { auto n_tasks = CreateGlobalSources(); - n_tasks = MinValue(n_tasks, NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads())); + n_tasks = MinValue(n_tasks, TaskScheduler::GetScheduler(context).NumberOfThreads()); vector> tasks; for (idx_t i = 0; i < n_tasks; i++) { tasks.push_back(make_uniq(*pipeline, shared_from_this(), op, gstate)); diff --git a/src/duckdb/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp b/src/duckdb/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp index a59d46bd0..7f42fb79b 100644 --- a/src/duckdb/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp +++ b/src/duckdb/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp @@ -527,7 +527,7 @@ void UngroupedDistinctAggregateFinalizeEvent::Schedule() { global_source_states.push_back(radix_table_p.GetGlobalSourceState(context)); } n_tasks = MaxValue(n_tasks, 1); - n_tasks = MinValue(n_tasks, NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads())); + n_tasks = MinValue(n_tasks, TaskScheduler::GetScheduler(context).NumberOfThreads()); vector> tasks; for (idx_t i = 0; i < n_tasks; i++) { diff --git a/src/duckdb/src/execution/operator/aggregate/physical_window.cpp b/src/duckdb/src/execution/operator/aggregate/physical_window.cpp index 956a50d43..31d35e267 100644 --- a/src/duckdb/src/execution/operator/aggregate/physical_window.cpp +++ b/src/duckdb/src/execution/operator/aggregate/physical_window.cpp @@ -595,7 +595,7 @@ void WindowGlobalSourceState::CreateTaskList() { // Schedule the largest group on as many threads as possible auto &ts = TaskScheduler::GetScheduler(client); - const auto threads = NumericCast(ts.NumberOfThreads()); + const auto threads = ts.NumberOfThreads(); const auto &max_block = partition_blocks.front(); diff --git a/src/duckdb/src/execution/operator/join/physical_asof_join.cpp b/src/duckdb/src/execution/operator/join/physical_asof_join.cpp index 1ae267e1d..ac69c62e3 100644 --- a/src/duckdb/src/execution/operator/join/physical_asof_join.cpp +++ b/src/duckdb/src/execution/operator/join/physical_asof_join.cpp @@ -693,7 +693,7 @@ void AsOfGlobalSourceState::CreateTaskList(ClientContext &client) { // Schedule the largest group on as many threads as possible auto &ts = TaskScheduler::GetScheduler(client); - const auto threads = NumericCast(ts.NumberOfThreads()); + const auto threads = ts.NumberOfThreads(); const auto per_thread = AsOfHashGroup::BinValue(max_block.first, threads); if (!per_thread) { diff --git a/src/duckdb/src/execution/operator/join/physical_hash_join.cpp b/src/duckdb/src/execution/operator/join/physical_hash_join.cpp index 9c3a019bb..bce94d781 100644 --- a/src/duckdb/src/execution/operator/join/physical_hash_join.cpp +++ b/src/duckdb/src/execution/operator/join/physical_hash_join.cpp @@ -327,8 +327,7 @@ struct LayoutGate { class HashJoinGlobalSinkState : public GlobalSinkState { public: HashJoinGlobalSinkState(const PhysicalHashJoin &op_p, ClientContext &context_p) - : context(context_p), op(op_p), - num_threads(NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads())), + : context(context_p), op(op_p), num_threads(TaskScheduler::GetScheduler(context).NumberOfThreads()), temporary_memory_state(TemporaryMemoryManager::Get(context).Register(context)), initial_radix_bits(num_threads < 100 ? 4 : 5), finalized(false), active_local_states(0), total_size(0), max_partition_size(0), max_partition_count(0), probe_side_requirement(0), scanned_data(false) { diff --git a/src/duckdb/src/execution/operator/join/physical_iejoin.cpp b/src/duckdb/src/execution/operator/join/physical_iejoin.cpp index 347138baf..a1dbc3080 100644 --- a/src/duckdb/src/execution/operator/join/physical_iejoin.cpp +++ b/src/duckdb/src/execution/operator/join/physical_iejoin.cpp @@ -934,7 +934,7 @@ IEJoinGlobalSourceState::IEJoinGlobalSourceState(const PhysicalIEJoin &op, Clien // Schedule the largest group on as many threads as possible auto &ts = TaskScheduler::GetScheduler(client); - const auto threads = NumericCast(ts.NumberOfThreads()); + const auto threads = ts.NumberOfThreads(); per_thread = BinValue(l2_blocks, threads); Initialize(); diff --git a/src/duckdb/src/execution/operator/join/physical_range_join.cpp b/src/duckdb/src/execution/operator/join/physical_range_join.cpp index 63073c204..74460dfca 100644 --- a/src/duckdb/src/execution/operator/join/physical_range_join.cpp +++ b/src/duckdb/src/execution/operator/join/physical_range_join.cpp @@ -216,7 +216,7 @@ class RangeJoinMaterializeEvent : public BasePipelineEvent { // Schedule as many tasks as the sort will allow auto &ts = TaskScheduler::GetScheduler(client); - auto num_threads = NumericCast(ts.NumberOfThreads()); + auto num_threads = ts.NumberOfThreads(); vector> tasks; const auto tasks_scheduled = MinValue(num_threads, table.global_source->MaxThreads()); diff --git a/src/duckdb/src/execution/operator/persistent/physical_batch_copy_to_file.cpp b/src/duckdb/src/execution/operator/persistent/physical_batch_copy_to_file.cpp index 542471a4b..947d62690 100644 --- a/src/duckdb/src/execution/operator/persistent/physical_batch_copy_to_file.cpp +++ b/src/duckdb/src/execution/operator/persistent/physical_batch_copy_to_file.cpp @@ -302,7 +302,7 @@ class ProcessRemainingBatchesEvent : public BasePipelineEvent { public: void Schedule() override { vector> tasks; - for (idx_t i = 0; i < idx_t(TaskScheduler::GetScheduler(context).NumberOfThreads()); i++) { + for (idx_t i = 0; i < TaskScheduler::GetScheduler(context).NumberOfThreads(); i++) { auto process_task = make_uniq(pipeline->executor, shared_from_this(), gstate, context, op); tasks.push_back(std::move(process_task)); diff --git a/src/duckdb/src/execution/operator/persistent/physical_copy_to_file.cpp b/src/duckdb/src/execution/operator/persistent/physical_copy_to_file.cpp index e2bfdca1f..a9c278467 100644 --- a/src/duckdb/src/execution/operator/persistent/physical_copy_to_file.cpp +++ b/src/duckdb/src/execution/operator/persistent/physical_copy_to_file.cpp @@ -311,8 +311,8 @@ class CopyFileLifecycleExecutor { explicit CopyFileLifecycleExecutor(ClientContext &context_p) : context(context_p), executor(context_p, TaskSchedulerType::ASYNC) { auto &scheduler = TaskScheduler::GetScheduler(context); - async_threads = NumericCast(scheduler.NumberOfAsyncThreads()); - auto regular_threads = NumericCast(scheduler.NumberOfThreads()); + async_threads = scheduler.NumberOfAsyncThreads(); + auto regular_threads = scheduler.NumberOfThreads(); max_pending_tasks = MaxValue(MIN_PENDING_TASKS, (async_threads + regular_threads) * 4); } @@ -2353,7 +2353,7 @@ void PartitionedCopyState::CreateTaskList() { auto &ts = TaskScheduler::GetScheduler(partitioned_copy.context); const auto &max_block = partition_blocks.front(); - const auto threads = MinValue(locals, NumericCast(ts.NumberOfThreads())); + const auto threads = MinValue(locals, ts.NumberOfThreads()); const auto aligned_scale = MaxValue(ValidityMask::BITS_PER_VALUE / STANDARD_VECTOR_SIZE, 1); const auto aligned_count = PartitionedCopyHashGroup::BinValue(max_block.first, aligned_scale); const auto per_thread = aligned_scale * PartitionedCopyHashGroup::BinValue(aligned_count, threads); diff --git a/src/duckdb/src/execution/physical_operator.cpp b/src/duckdb/src/execution/physical_operator.cpp index f725ae0cd..6fa6c6395 100644 --- a/src/duckdb/src/execution/physical_operator.cpp +++ b/src/duckdb/src/execution/physical_operator.cpp @@ -84,7 +84,7 @@ bool PhysicalOperator::CanSaturateThreads(ClientContext &context) const { // In debug mode we always return true here so that the code that depends on it is well-tested return true; #else - const auto num_threads = NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads()); + const auto num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads(); return EstimatedThreadCount() >= num_threads; #endif } @@ -199,7 +199,7 @@ idx_t PhysicalOperator::GetMaxThreadMemory(ClientContext &context) { // Memory usage per thread should scale with max mem / num threads // We take 1/4th of this, to be conservative auto max_memory = BufferManager::GetBufferManager(context).GetOperatorMemoryLimit(); - auto num_threads = NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads()); + auto num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads(); return (max_memory / num_threads) / 4; } @@ -416,23 +416,28 @@ SelectExecutionMode(const DataChunk &chunk, const OperatorResultType child_resul return CachingPhysicalOperatorExecuteMode::RETURN_CHUNK; } -static bool ChunkHasGlobalDictionary(const DataChunk &chunk) { +//! Empty-id dicts (e.g. slice-of-flat) mint a fresh entry per slice, so their sels must not be accumulated. +static inline bool IsAccumulableDictionary(const Vector &vector) { + return vector.GetVectorType() == VectorType::DICTIONARY_VECTOR && !DictionaryVector::DictionaryId(vector).empty(); +} + +static bool ChunkHasAccumulableDictionary(const DataChunk &chunk) { for (idx_t col_idx = 0; col_idx < chunk.ColumnCount(); col_idx++) { - if (DictionaryVector::IsGlobalDictionary(chunk.data[col_idx])) { + if (IsAccumulableDictionary(chunk.data[col_idx])) { return true; } } return false; } -//! Switch the empty cache to dictionary mode: pin each global dictionary column's upstream entry and allocate its -//! sel accumulator. Columns keep a real (resettable) cache so a flushed dict column flattens on the next Reset. +//! Switch the empty cache to dictionary mode: pin each accumulable dictionary column's upstream entry and allocate +//! its sel accumulator. Columns keep a real (resettable) cache so a flushed dict column flattens on the next Reset. static void SeedDictCache(CachingOperatorState &state, DataChunk &source) { const idx_t col_count = source.ColumnCount(); state.dict_columns.clear(); state.dict_columns.resize(col_count); for (idx_t col_idx = 0; col_idx < col_count; col_idx++) { - if (!DictionaryVector::IsGlobalDictionary(source.data[col_idx])) { + if (!IsAccumulableDictionary(source.data[col_idx])) { continue; } auto &slot = state.dict_columns[col_idx]; @@ -442,15 +447,28 @@ static void SeedDictCache(CachingOperatorState &state, DataChunk &source) { state.dict_cache_active = true; } +//! On identity change, demote just this column to flat (materializing the rows so far) instead of flushing the +//! whole cache, so sibling columns keep accumulating and the chunk still coalesces. +static void FlattenDictColumn(CachingOperatorState &state, DataChunk &cache, idx_t col_idx, idx_t base) { + auto &slot = state.dict_columns[col_idx]; + D_ASSERT(slot.entry); + FlatVector::SetSize(cache.data[col_idx], 0); + if (base > 0) { + // materialize into the cache's own slot so the possibly-shared upstream child is never flattened in place + cache.data[col_idx].Append(slot.entry->data, slot.accumulated_sel, base, VectorAppendMode::ERROR_ON_NO_SPACE); + } + slot.entry = nullptr; +} + //! Append source into the cache (created lazily). On the first append into an empty cache, detect -//! global dictionary columns; those concatenate their selection indices instead of flattening. +//! accumulable dictionary columns; those concatenate their selection indices instead of flattening. static void AppendToCache(CachingOperatorState &state, DataChunk &source, ClientContext &client_context) { if (!state.cached_chunk) { state.cached_chunk = make_uniq(); state.cached_chunk->Initialize(Allocator::Get(client_context), source.GetTypes()); } auto &cache = *state.cached_chunk; - if (cache.size() == 0 && !state.dict_cache_active && ChunkHasGlobalDictionary(source)) { + if (cache.size() == 0 && !state.dict_cache_active && ChunkHasAccumulableDictionary(source)) { SeedDictCache(state, source); } if (!state.dict_cache_active) { @@ -466,28 +484,31 @@ static void AppendToCache(CachingOperatorState &state, DataChunk &source, Client for (idx_t col_idx = 0; col_idx < cache.ColumnCount(); col_idx++) { auto &slot = state.dict_columns[col_idx]; if (slot.entry) { - // dict column: every later chunk must be the same global dictionary. Throw (not D_ASSERT) - // because the Cast below is UB on a non-dict vector in release, accumulating foreign bytes as indices. auto &source_col = source.data[col_idx]; - if (source_col.GetVectorType() != VectorType::DICTIONARY_VECTOR || - DictionaryVector::DictionaryId(source_col).empty() || - !DictionaryVector::IsGlobalDictionary(source_col)) { - throw InternalException("dict-surviving cache: column %llu received a non-global-dictionary " - "chunk after being seeded for dictionary caching", - static_cast(col_idx)); + // Compare the pinned entry by pointer: cheaper than the string id and exact, since pinning keeps the + // address stable (no ABA). The type check must come first to guard the Cast against a flat vector. + const bool same_dict = source_col.GetVectorType() == VectorType::DICTIONARY_VECTOR && + &source_col.Buffer().Cast().GetEntry() == slot.entry.get(); + if (same_dict) { + const auto &source_sel = DictionaryVector::SelVector(source_col); + for (idx_t row = 0; row < added; row++) { + slot.accumulated_sel.set_index(base + row, source_sel.get_index(row)); + } + continue; } - // An id mismatch past the encoding check is a producer bug (never user-triggerable), so assert. - D_ASSERT(source_col.Buffer().Cast().GetEntry().id == slot.entry->id); - const auto &source_sel = DictionaryVector::SelVector(source_col); - for (idx_t row = 0; row < added; row++) { - slot.accumulated_sel.set_index(base + row, source_sel.get_index(row)); + // A global dictionary wraps the same entry for the producer's lifetime, so a change is a producer bug + // that would desync the downstream sink layout -> fail loudly. A non-global dict rotating is normal. + if (slot.entry->global_dictionary) { + throw InternalException("dict-surviving cache: global dictionary column %llu changed identity " + "after being seeded for dictionary caching", + static_cast(col_idx)); } - } else { - // flat column: append per column. The D_ASSERT catches a refactor that routes a dict placeholder here. - D_ASSERT(!slot.entry); - FlatVector::SetSize(cache.data[col_idx], base); - cache.data[col_idx].Append(source.data[col_idx], added, VectorAppendMode::ERROR_ON_NO_SPACE); + FlattenDictColumn(state, cache, col_idx, base); } + // reached by originally-flat and just-demoted columns + D_ASSERT(!slot.entry); + FlatVector::SetSize(cache.data[col_idx], base); + cache.data[col_idx].Append(source.data[col_idx], added, VectorAppendMode::ERROR_ON_NO_SPACE); } // dict columns are rewrapped on flush, flat columns already sized; this only sets the cardinality cache.SetChildCardinality(base + added); diff --git a/src/duckdb/src/execution/radix_partitioned_hashtable.cpp b/src/duckdb/src/execution/radix_partitioned_hashtable.cpp index c9d7ea8e5..41874996c 100644 --- a/src/duckdb/src/execution/radix_partitioned_hashtable.cpp +++ b/src/duckdb/src/execution/radix_partitioned_hashtable.cpp @@ -215,7 +215,7 @@ class RadixHTGlobalSinkState : public GlobalSinkState { RadixHTGlobalSinkState::RadixHTGlobalSinkState(ClientContext &context_p, const RadixPartitionedHashTable &radix_ht_p) : context(context_p), temporary_memory_state(TemporaryMemoryManager::Get(context).Register(context)), finalized(false), external(false), active_threads(0), - number_of_threads(NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads())), + number_of_threads(TaskScheduler::GetScheduler(context).NumberOfThreads()), memory_limit(BufferManager::GetBufferManager(context).GetOperatorMemoryLimit()), block_alloc_size(BufferManager::GetBufferManager(context).GetBlockAllocSize()), any_combined(false), any_abandoned(false), radix_ht(radix_ht_p), config(*this), stored_allocators_size(0), finalize_done(0), @@ -234,7 +234,7 @@ RadixHTGlobalSinkState::RadixHTGlobalSinkState(ClientContext &context_p, const R auto ht_size = num_partitions * blocks_per_partition * block_alloc_size + config.sink_capacity * sizeof(ht_entry_t); // This really is the minimum reservation that we can do - auto num_threads = NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads()); + auto num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads(); minimum_reservation = num_threads * ht_size; temporary_memory_state->SetMinimumReservation(minimum_reservation); @@ -742,8 +742,8 @@ idx_t RadixPartitionedHashTable::MaxThreads(GlobalSinkState &sink_p) const { return 0; } - const auto max_threads = MinValue( - NumericCast(TaskScheduler::GetScheduler(sink.context).NumberOfThreads()), sink.partitions.size()); + const auto max_threads = + MinValue(TaskScheduler::GetScheduler(sink.context).NumberOfThreads(), sink.partitions.size()); sink.temporary_memory_state->SetRemainingSizeAndUpdateReservation( sink.context, sink.stored_allocators_size + max_threads * sink.max_partition_size); @@ -933,7 +933,7 @@ void RadixHTLocalSourceState::Finalize(RadixHTGlobalSinkState &sink, RadixHTGlob const auto capacity = GroupedAggregateHashTable::GetCapacityForCount(partition.data->Count()); // However, we will limit the initial capacity so we don't do a huge over-allocation - const auto n_threads = NumericCast(TaskScheduler::GetScheduler(gstate.context).NumberOfThreads()); + const auto n_threads = TaskScheduler::GetScheduler(gstate.context).NumberOfThreads(); const auto memory_limit = BufferManager::GetBufferManager(gstate.context).GetMaxMemory(); const idx_t thread_limit = LossyNumericCast(0.6 * double(memory_limit) / double(n_threads)); diff --git a/src/duckdb/src/function/scalar/string/substring.cpp b/src/duckdb/src/function/scalar/string/substring.cpp index 110cd3fb9..2a271476b 100644 --- a/src/duckdb/src/function/scalar/string/substring.cpp +++ b/src/duckdb/src/function/scalar/string/substring.cpp @@ -139,7 +139,7 @@ string_t SubstringUnicode(Vector &result, string_t input, int64_t offset, int64_ } } } - while (!IsCharacter(input_data[start_pos])) { + while (start_pos < input_size && !IsCharacter(input_data[start_pos])) { start_pos++; } while (end_pos < input_size && !IsCharacter(input_data[end_pos])) { diff --git a/src/duckdb/src/function/table/system/pragma_storage_info.cpp b/src/duckdb/src/function/table/system/pragma_storage_info.cpp index eb54a712a..d6cba1649 100644 --- a/src/duckdb/src/function/table/system/pragma_storage_info.cpp +++ b/src/duckdb/src/function/table/system/pragma_storage_info.cpp @@ -119,7 +119,7 @@ static unique_ptr PragmaStorageInfoBind(ClientContext &context, Ta unique_ptr PragmaStorageInfoInitGlobal(ClientContext &context, TableFunctionInitInput &input) { auto &bind_data = input.bind_data->Cast(); - auto max_threads = NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads()); + auto max_threads = TaskScheduler::GetScheduler(context).NumberOfThreads(); auto gstate = make_uniq(max_threads); gstate->scan_state.options = bind_data.options; bind_data.table_entry.InitializeColumnSegmentInfoScan(gstate->scan_state); diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index 6ebad1c71..32afadfca 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "0-dev9556" +#define DUCKDB_PATCH_VERSION "0-dev9575" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 6 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.6.0-dev9556" +#define DUCKDB_VERSION "v1.6.0-dev9575" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "3518c60315" +#define DUCKDB_SOURCE_ID "8abfd3a6ea" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/include/duckdb/common/multi_file/multi_file_function.hpp b/src/duckdb/src/include/duckdb/common/multi_file/multi_file_function.hpp index 0665fca15..37687d26c 100644 --- a/src/duckdb/src/include/duckdb/common/multi_file/multi_file_function.hpp +++ b/src/duckdb/src/include/duckdb/common/multi_file/multi_file_function.hpp @@ -295,7 +295,7 @@ class MultiFileFunction : public TableFunction { throw InternalException("parallel_lock is not held in TryOpenNextFile, this should not happen"); } - const auto file_lookahead_limit = NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads()); + const auto file_lookahead_limit = TaskScheduler::GetScheduler(context).NumberOfThreads(); idx_t file_index = global_state.file_index; idx_t i = 0; @@ -583,7 +583,7 @@ class MultiFileFunction : public TableFunction { result->filters = input.filters.get(); result->op = input.op; result->global_state = bind_data.interface->InitializeGlobalState(context, bind_data, *result); - result->max_threads = NumericCast(TaskScheduler::GetScheduler(context).NumberOfThreads()); + result->max_threads = TaskScheduler::GetScheduler(context).NumberOfThreads(); // Ensure all readers are initialized and FileListScan is sync with readers list for (auto &reader_data : result->readers) { diff --git a/src/duckdb/src/include/duckdb/common/serializer/async_write_queue.hpp b/src/duckdb/src/include/duckdb/common/serializer/async_write_queue.hpp index 0350cbdec..6bc426030 100644 --- a/src/duckdb/src/include/duckdb/common/serializer/async_write_queue.hpp +++ b/src/duckdb/src/include/duckdb/common/serializer/async_write_queue.hpp @@ -327,7 +327,7 @@ class ManagedAsyncWriteStreamQueue : private AsyncWriteTarget { enum class ScheduleMode : uint8_t { ALLOW, DEFER }; //! Whether to schedule only enough request capacity for normal overlap, or force all pending bytes to drain. enum class SchedulePolicy : uint8_t { THRESHOLD, FORCE }; - //! Whether async requests can write independent target ranges concurrently. + //! Whether async requests can write independent target ranges or must use the target's current stream position. enum class DrainMode : uint8_t { SEQUENTIAL, POSITIONAL }; //! Whether waiting for scheduled writes should preserve an open registration batch. enum class BatchDrainMode : uint8_t { PRESERVE_BATCH, FORCE_CLOSE_BATCH }; @@ -415,14 +415,14 @@ class ManagedAsyncWriteStreamQueue : private AsyncWriteTarget { //! Discard queued writes after an async write failure once all submitted writes have stopped. void CancelPendingWritesAfterFailure() noexcept; - //! Write bytes to the managed target at the assigned logical offset. + //! Write bytes to the managed target. SEQUENTIAL mode preserves queue order and ignores the logical offset here. void Write(data_ptr_t buffer, idx_t size, idx_t offset) override; private: ClientContext &client_context; ManagedAsyncWriteStreamTarget ⌖ - //! Positional managed queue that owns TMM reservation, backpressure, and task scheduling. + //! Managed queue that owns TMM reservation, backpressure, and task scheduling for physical write requests. unique_ptr write_queue; //! Whether async requests may drain independent ranges concurrently using positional writes. DrainMode drain_mode = DrainMode::SEQUENTIAL; diff --git a/src/duckdb/src/include/duckdb/execution/physical_operator.hpp b/src/duckdb/src/include/duckdb/execution/physical_operator.hpp index c2f42825c..76752c47f 100644 --- a/src/duckdb/src/include/duckdb/execution/physical_operator.hpp +++ b/src/duckdb/src/include/duckdb/execution/physical_operator.hpp @@ -247,8 +247,7 @@ class PhysicalOperator { } }; -//! A cached column that arrived as a global dictionary: the pinned upstream entry is kept and -//! per-chunk selection indices concatenated, so the dictionary survives the cache instead of flattening +//! Accumulator that lets a dictionary column survive the cache: pinned entry + concatenated per-chunk sels struct CachedDictColumn { buffer_ptr entry; SelectionVector accumulated_sel; @@ -285,8 +284,8 @@ class CachingOperatorState : public OperatorState { bool must_return_continuation_chunk = false; OperatorResultType cached_result; - //! One slot per cached column. Invariant: entry != null iff the column is accumulating a global - //! dictionary; entry == null iff plain flat caching (the common case) + //! One slot per cached column. Invariant: entry != null iff the column is accumulating a dictionary + //! (pinned by entry pointer identity); entry == null iff plain flat caching (the common case) vector dict_columns; bool dict_cache_active = false; }; diff --git a/src/duckdb/src/include/duckdb/main/extension_entries.hpp b/src/duckdb/src/include/duckdb/main/extension_entries.hpp index fce288f20..4e22f3f8a 100644 --- a/src/duckdb/src/include/duckdb/main/extension_entries.hpp +++ b/src/duckdb/src/include/duckdb/main/extension_entries.hpp @@ -1027,6 +1027,9 @@ static constexpr ExtensionFunctionOverloadEntry EXTENSION_FUNCTION_OVERLOADS[] = {"time_bucket", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY, "[INTERVAL,DATE]>DATE"}, {"time_bucket", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY, "[INTERVAL,DATE,DATE]>DATE"}, {"time_bucket", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY, "[INTERVAL,DATE,INTERVAL]>DATE"}, + {"time_bucket", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY, "[INTERVAL,TIME]>TIME"}, + {"time_bucket", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY, "[INTERVAL,TIME,INTERVAL]>TIME"}, + {"time_bucket", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY, "[INTERVAL,TIME,TIME]>TIME"}, {"time_bucket", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY, "[INTERVAL,TIMESTAMP]>TIMESTAMP"}, {"time_bucket", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY, "[INTERVAL,TIMESTAMP,INTERVAL]>TIMESTAMP"}, {"time_bucket", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY, "[INTERVAL,TIMESTAMP,TIMESTAMP]>TIMESTAMP"}, diff --git a/src/duckdb/src/include/duckdb/parallel/task_scheduler.hpp b/src/duckdb/src/include/duckdb/parallel/task_scheduler.hpp index 2de1f7cbb..573aa00ff 100644 --- a/src/duckdb/src/include/duckdb/parallel/task_scheduler.hpp +++ b/src/duckdb/src/include/duckdb/parallel/task_scheduler.hpp @@ -56,9 +56,9 @@ class TaskScheduler { unique_ptr CreateProducer(); //! Returns the number of threads - DUCKDB_API int32_t NumberOfThreads(); + DUCKDB_API idx_t NumberOfThreads(); //! Returns the number of async threads - DUCKDB_API int32_t NumberOfAsyncThreads(); + DUCKDB_API idx_t NumberOfAsyncThreads(); idx_t GetNumberOfTasks() const; idx_t GetProducerCount() const; diff --git a/src/duckdb/src/include/duckdb/parallel/task_scheduler_pool.hpp b/src/duckdb/src/include/duckdb/parallel/task_scheduler_pool.hpp index d3d679761..ffa37a3ee 100644 --- a/src/duckdb/src/include/duckdb/parallel/task_scheduler_pool.hpp +++ b/src/duckdb/src/include/duckdb/parallel/task_scheduler_pool.hpp @@ -28,7 +28,7 @@ class TaskSchedulerPool { public: void SetThreads(idx_t n); - int32_t NumberOfThreads(); + idx_t NumberOfThreads(); void RelaunchThreads(TaskScheduler &scheduler, bool destroy); void Signal(idx_t n); #ifndef DUCKDB_NO_THREADS @@ -45,9 +45,9 @@ class TaskSchedulerPool { //! Markers used by the various threads, if the markers are set to "false" the thread execution is stopped vector>> markers; //! Requested thread count (set by the 'threads' setting) - atomic requested_thread_count; + atomic requested_thread_count; //! The amount of threads currently running - atomic current_thread_count; + atomic current_thread_count; #ifndef DUCKDB_NO_THREADS //! Semaphore to signal threads in this pool to wake up and execute a task unique_ptr semaphore; diff --git a/src/duckdb/src/main/database.cpp b/src/duckdb/src/main/database.cpp index 89b59f868..537fd522d 100644 --- a/src/duckdb/src/main/database.cpp +++ b/src/duckdb/src/main/database.cpp @@ -555,7 +555,7 @@ const DBConfig &DBConfig::GetConfig(const ClientContext &context) { } idx_t DatabaseInstance::NumberOfThreads() { - return NumericCast(scheduler->NumberOfThreads()); + return scheduler->NumberOfThreads(); } idx_t DuckDB::NumberOfThreads() { diff --git a/src/duckdb/src/optimizer/partitioned_execution.cpp b/src/duckdb/src/optimizer/partitioned_execution.cpp index d05f91078..4440a7ab8 100644 --- a/src/duckdb/src/optimizer/partitioned_execution.cpp +++ b/src/duckdb/src/optimizer/partitioned_execution.cpp @@ -15,7 +15,7 @@ namespace duckdb { PartitionedExecution::PartitionedExecution(Optimizer &optimizer_p, unique_ptr &root_p) : optimizer(optimizer_p), root(root_p), - num_threads(NumericCast(TaskScheduler::GetScheduler(optimizer.context).NumberOfThreads())) { + num_threads(TaskScheduler::GetScheduler(optimizer.context).NumberOfThreads()) { } struct PartitionedExecutionConfig { diff --git a/src/duckdb/src/parallel/meta_pipeline.cpp b/src/duckdb/src/parallel/meta_pipeline.cpp index c61976721..b21b84bc6 100644 --- a/src/duckdb/src/parallel/meta_pipeline.cpp +++ b/src/duckdb/src/parallel/meta_pipeline.cpp @@ -176,7 +176,7 @@ void MetaPipeline::AddRecursiveDependencies(const vector> & // by only adding the dependencies if the source operator can likely keep all threads busy. // when 'force' is true (e.g. for DML CTEs), we always add the dependencies regardless, // because the ordering is required for correctness, not just performance. - const auto thread_count = NumericCast(TaskScheduler::GetScheduler(executor.context).NumberOfThreads()); + const auto thread_count = TaskScheduler::GetScheduler(executor.context).NumberOfThreads(); for (; it != child_meta_pipelines.end(); it++) { for (auto &pipeline : it->get()->pipelines) { if (!force && !PipelineExceedsThreadCount(*pipeline, thread_count)) { diff --git a/src/duckdb/src/parallel/pipeline.cpp b/src/duckdb/src/parallel/pipeline.cpp index ed251c0be..338476fbd 100644 --- a/src/duckdb/src/parallel/pipeline.cpp +++ b/src/duckdb/src/parallel/pipeline.cpp @@ -119,7 +119,7 @@ bool Pipeline::TryGetMaxThreads(idx_t &max_threads) { } auto &scheduler = TaskScheduler::GetScheduler(executor.context); - auto active_threads = NumericCast(scheduler.NumberOfThreads()); + auto active_threads = scheduler.NumberOfThreads(); if (max_threads > active_threads) { max_threads = active_threads; } diff --git a/src/duckdb/src/parallel/task_scheduler.cpp b/src/duckdb/src/parallel/task_scheduler.cpp index 29537e8e2..cd4256ef2 100644 --- a/src/duckdb/src/parallel/task_scheduler.cpp +++ b/src/duckdb/src/parallel/task_scheduler.cpp @@ -170,7 +170,7 @@ void TaskScheduler::ExecuteForever(atomic *marker, const TaskSchedulerType block_allocator.ThreadFlush( Settings::Get(db), StringUtil::ParseFormattedBytes(Settings::Get(db)), - NumericCast(GetPool(TaskSchedulerType::REGULAR).NumberOfThreads())); + GetPool(TaskSchedulerType::REGULAR).NumberOfThreads()); auto decay_delay = Allocator::DecayDelay(); if (!decay_delay.IsValid()) { // no decay delay specified - just wait @@ -199,7 +199,7 @@ void TaskScheduler::ExecuteForever(atomic *marker, const TaskSchedulerType // this thread will exit, flush all of its outstanding allocations if (block_allocator.SupportsFlush()) { block_allocator.ThreadFlush(Settings::Get(db), 0, - NumericCast(GetPool(TaskSchedulerType::REGULAR).NumberOfThreads())); + GetPool(TaskSchedulerType::REGULAR).NumberOfThreads()); Allocator::ThreadIdle(); } #else @@ -269,11 +269,11 @@ void TaskScheduler::ExecuteTasks(idx_t max_tasks) { #endif } -int32_t TaskScheduler::NumberOfThreads() { +idx_t TaskScheduler::NumberOfThreads() { return GetPool(TaskSchedulerType::REGULAR).NumberOfThreads(); } -int32_t TaskScheduler::NumberOfAsyncThreads() { +idx_t TaskScheduler::NumberOfAsyncThreads() { return GetPool(TaskSchedulerType::ASYNC).NumberOfThreads(); } diff --git a/src/duckdb/src/parallel/task_scheduler_pool.cpp b/src/duckdb/src/parallel/task_scheduler_pool.cpp index 20e2c0af1..9775b5f6f 100644 --- a/src/duckdb/src/parallel/task_scheduler_pool.cpp +++ b/src/duckdb/src/parallel/task_scheduler_pool.cpp @@ -54,10 +54,10 @@ TaskSchedulerPool::~TaskSchedulerPool() { } void TaskSchedulerPool::SetThreads(idx_t n) { - requested_thread_count = NumericCast(n); + requested_thread_count = n; } -int32_t TaskSchedulerPool::NumberOfThreads() { +idx_t TaskSchedulerPool::NumberOfThreads() { return current_thread_count.load(); } @@ -126,7 +126,7 @@ static void ThreadExecuteTasks(TaskScheduler *scheduler, atomic *marker, c void TaskSchedulerPool::RelaunchThreads(TaskScheduler &scheduler, bool destroy) { #ifndef DUCKDB_NO_THREADS auto &config = DBConfig::GetConfig(db); - auto new_thread_count = NumericCast(destroy ? 0 : requested_thread_count.load()); + auto new_thread_count = destroy ? 0 : requested_thread_count.load(); idx_t external_threads = 0; ThreadPinMode pin_thread_mode = ThreadPinMode::AUTO; @@ -137,8 +137,7 @@ void TaskSchedulerPool::RelaunchThreads(TaskScheduler &scheduler, bool destroy) } if (threads.size() == new_thread_count) { - current_thread_count = - NumericCast(threads.size() + (pool_type == TaskSchedulerType::REGULAR ? external_threads : 0)); + current_thread_count = threads.size() + (pool_type == TaskSchedulerType::REGULAR ? external_threads : 0); return; } if (threads.size() != new_thread_count) { @@ -190,8 +189,7 @@ void TaskSchedulerPool::RelaunchThreads(TaskScheduler &scheduler, bool destroy) markers.push_back(std::move(marker)); } } - current_thread_count = - NumericCast(threads.size() + (pool_type == TaskSchedulerType::REGULAR ? external_threads : 0)); + current_thread_count = threads.size() + (pool_type == TaskSchedulerType::REGULAR ? external_threads : 0); BlockAllocator::Get(db).FlushAll(); #endif } diff --git a/src/duckdb/src/storage/temporary_memory_manager.cpp b/src/duckdb/src/storage/temporary_memory_manager.cpp index a0221c453..701fc828f 100644 --- a/src/duckdb/src/storage/temporary_memory_manager.cpp +++ b/src/duckdb/src/storage/temporary_memory_manager.cpp @@ -101,7 +101,7 @@ void TemporaryMemoryManager::UpdateConfiguration(ClientContext &context) { memory_limit = LossyNumericCast(MAXIMUM_MEMORY_LIMIT_RATIO * static_cast(buffer_manager.GetMaxMemory())); has_temporary_directory = buffer_manager.HasTemporaryDirectory(); - num_threads = NumericCast(task_scheduler.NumberOfThreads()); + num_threads = task_scheduler.NumberOfThreads(); num_connections = ConnectionManager::Get(context).GetConnectionCount(); query_max_memory = buffer_manager.GetOperatorMemoryLimit(); }