Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 76 additions & 25 deletions src/duckdb/extension/core_functions/scalar/date/time_bucket.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "duckdb/common/exception.hpp"
#include "duckdb/common/limits.hpp"
#include "duckdb/common/operator/cast_operators.hpp"
#include "duckdb/common/operator/subtract.hpp"
#include "duckdb/common/types/date.hpp"
Expand All @@ -8,13 +7,52 @@
#include "duckdb/common/types/value.hpp"
#include "duckdb/common/vector_operations/binary_executor.hpp"
#include "duckdb/common/vector_operations/ternary_executor.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "core_functions/scalar/date_functions.hpp"

namespace duckdb {

namespace {

template <typename T>
timestamp_t ToBucketTimestamp(const T &t) {
return Cast::template Operation<T, timestamp_t>(t);
}

template <>
timestamp_t ToBucketTimestamp(const dtime_t &t) {
return Timestamp::FromDatetime(date_t::epoch(), t);
}

template <typename T>
date_t ToBucketDate(const T &t) {
return Cast::template Operation<T, date_t>(t);
}

template <>
date_t ToBucketDate(const dtime_t &t) {
return date_t::epoch();
}

template <typename T>
T FromBucketTimestamp(const timestamp_t &t) {
return Cast::template Operation<timestamp_t, T>(t);
}

template <>
dtime_t FromBucketTimestamp(const timestamp_t &t) {
return Timestamp::GetTime(t);
}

template <typename T>
T FromBucketDate(const date_t &t) {
return Cast::template Operation<date_t, T>(t);
}

template <>
dtime_t FromBucketDate(const date_t &t) {
return dtime_t(0);
}

struct TimeBucket {
// Use 2000-01-03 00:00:00 (Monday) as origin when bucket_width is days, hours, ... for TimescaleDB compatibility
// There are 10959 days between 1970-01-01 and 2000-01-03
Expand Down Expand Up @@ -100,8 +138,8 @@ struct TimeBucket {
return Cast::template Operation<TB, TR>(ts);
}
int64_t bucket_width_micros = Interval::GetMicro(bucket_width);
int64_t ts_micros = Timestamp::GetEpochMicroSeconds(Cast::template Operation<TB, timestamp_t>(ts));
return Cast::template Operation<timestamp_t, TR>(
int64_t ts_micros = Timestamp::GetEpochMicroSeconds(ToBucketTimestamp<TB>(ts));
return FromBucketTimestamp<TR>(
WidthConvertibleToMicrosCommon(bucket_width_micros, ts_micros, DEFAULT_ORIGIN_MICROS));
}
};
Expand All @@ -112,8 +150,8 @@ struct TimeBucket {
if (!Value::IsFinite(ts)) {
return Cast::template Operation<TB, TR>(ts);
}
int32_t ts_months = EpochMonths(ts);
return Cast::template Operation<date_t, TR>(
int32_t ts_months = EpochMonths(ToBucketDate(ts));
return FromBucketDate<TR>(
WidthConvertibleToMonthsCommon(bucket_width.months, ts_months, DEFAULT_ORIGIN_MONTHS));
}
};
Expand All @@ -140,9 +178,9 @@ struct TimeBucket {
return Cast::template Operation<TB, TR>(ts);
}
int64_t bucket_width_micros = Interval::GetMicro(bucket_width);
int64_t ts_micros = Timestamp::GetEpochMicroSeconds(
Interval::Add(Cast::template Operation<TB, timestamp_t>(ts), Interval::Invert(offset)));
return Cast::template Operation<timestamp_t, TR>(Interval::Add(
int64_t ts_micros =
Timestamp::GetEpochMicroSeconds(Interval::Add(ToBucketTimestamp(ts), Interval::Invert(offset)));
return FromBucketTimestamp<TR>(Interval::Add(
WidthConvertibleToMicrosCommon(bucket_width_micros, ts_micros, DEFAULT_ORIGIN_MICROS), offset));
}
};
Expand All @@ -153,10 +191,10 @@ struct TimeBucket {
if (!Value::IsFinite(ts)) {
return Cast::template Operation<TB, TR>(ts);
}
int32_t ts_months = EpochMonths(Interval::Add(ts, Interval::Invert(offset)));
return Interval::Add(Cast::template Operation<date_t, TR>(WidthConvertibleToMonthsCommon(
bucket_width.months, ts_months, DEFAULT_ORIGIN_MONTHS)),
offset);
int32_t ts_months = EpochMonths(Interval::Add(ToBucketTimestamp(ts), Interval::Invert(offset)));
return FromBucketTimestamp<TR>(Interval::Add(ToBucketTimestamp(WidthConvertibleToMonthsCommon(
bucket_width.months, ts_months, DEFAULT_ORIGIN_MONTHS)),
offset));
}
};

Expand Down Expand Up @@ -184,9 +222,9 @@ struct TimeBucket {
return Cast::template Operation<TB, TR>(ts);
}
int64_t bucket_width_micros = Interval::GetMicro(bucket_width);
int64_t ts_micros = Timestamp::GetEpochMicroSeconds(Cast::template Operation<TB, timestamp_t>(ts));
int64_t origin_micros = Timestamp::GetEpochMicroSeconds(Cast::template Operation<TB, timestamp_t>(origin));
return Cast::template Operation<timestamp_t, TR>(
int64_t ts_micros = Timestamp::GetEpochMicroSeconds(ToBucketTimestamp(ts));
int64_t origin_micros = Timestamp::GetEpochMicroSeconds(ToBucketTimestamp(origin));
return FromBucketTimestamp<TR>(
WidthConvertibleToMicrosCommon(bucket_width_micros, ts_micros, origin_micros));
}
};
Expand All @@ -197,10 +235,9 @@ struct TimeBucket {
if (!Value::IsFinite(ts)) {
return Cast::template Operation<TB, TR>(ts);
}
int32_t ts_months = EpochMonths(ts);
int32_t origin_months = EpochMonths(origin);
return Cast::template Operation<date_t, TR>(
WidthConvertibleToMonthsCommon(bucket_width.months, ts_months, origin_months));
int32_t ts_months = EpochMonths(ToBucketDate(ts));
int32_t origin_months = EpochMonths(ToBucketDate(origin));
return FromBucketDate<TR>(WidthConvertibleToMonthsCommon(bucket_width.months, ts_months, origin_months));
}
};

Expand Down Expand Up @@ -351,20 +388,34 @@ ScalarFunctionSet TimeBucketFun::GetFunctions() {
ScalarFunctionSet time_bucket;
time_bucket.AddFunction(
ScalarFunction({LogicalType::INTERVAL, LogicalType::DATE}, LogicalType::DATE, TimeBucketFunction<date_t>));
time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIMESTAMP}, LogicalType::TIMESTAMP,
TimeBucketFunction<timestamp_t>));
time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::DATE, LogicalType::INTERVAL},
LogicalType::DATE, TimeBucketOffsetFunction<date_t>));
time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIMESTAMP, LogicalType::INTERVAL},
LogicalType::TIMESTAMP, TimeBucketOffsetFunction<timestamp_t>));
time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::DATE, LogicalType::DATE},
LogicalType::DATE, TimeBucketOriginFunction<date_t>));

time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIMESTAMP}, LogicalType::TIMESTAMP,
TimeBucketFunction<timestamp_t>));
time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIMESTAMP, LogicalType::INTERVAL},
LogicalType::TIMESTAMP, TimeBucketOffsetFunction<timestamp_t>));
time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIMESTAMP, LogicalType::TIMESTAMP},
LogicalType::TIMESTAMP, TimeBucketOriginFunction<timestamp_t>));

for (auto &func : time_bucket.functions) {
func.SetFallible();
func.SetArgProperties(1, ArgProperties().NonDecreasing());
}

// Not monotonic (wraps)
time_bucket.AddFunction(
ScalarFunction({LogicalType::INTERVAL, LogicalType::TIME}, LogicalType::TIME, TimeBucketFunction<dtime_t>));
time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIME, LogicalType::INTERVAL},
LogicalType::TIME, TimeBucketOffsetFunction<dtime_t>));
time_bucket.AddFunction(ScalarFunction({LogicalType::INTERVAL, LogicalType::TIME, LogicalType::TIME},
LogicalType::TIME, TimeBucketOriginFunction<dtime_t>));

for (auto &func : time_bucket.functions) {
func.SetFallible();
}

return time_bucket;
}

Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/extension/json/json_functions/read_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void JSONScan::AutoDetect(ClientContext &context, MultiFileBindData &bind_data,
bind_data.union_readers.resize(files.empty() ? 0 : files.size());

AutoDetectState auto_detect_state(context, bind_data, files, date_format_map);
const auto num_threads = NumericCast<idx_t>(TaskScheduler::GetScheduler(context).NumberOfThreads());
const auto num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads();
const auto files_per_task = (file_count + num_threads - 1) / num_threads;
const auto num_tasks = (file_count + files_per_task - 1) / files_per_task;
vector<JSONStructureNode> task_nodes(num_tasks);
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/common/serializer/async_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ void AsyncFileWriter::Truncate(idx_t size) {
handle->Truncate(NumericCast<int64_t>(size));
total_written = size;
write_queue->ResetNextOffset(total_written);
if (handle->CanSeek() && handle->SeekPosition() > size) {
if (handle->CanSeek() && handle->SeekPosition() != size) {
handle->Seek(size);
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/duckdb/src/common/serializer/async_write_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class AsyncWriteQueueTask : public BaseExecutorTask {
AsyncWriteQueue::AsyncWriteQueue(ClientContext &client_context_p, AsyncWriteTarget &target_p)
: client_context(client_context_p), target(target_p) {
auto &scheduler = TaskScheduler::GetScheduler(client_context);
auto async_threads = NumericCast<idx_t>(scheduler.NumberOfAsyncThreads());
auto async_threads = scheduler.NumberOfAsyncThreads();
max_active_tasks = MaxValue<idx_t>(async_threads, 1);
if (async_threads == 0) {
return;
Expand Down Expand Up @@ -497,7 +497,7 @@ idx_t ManagedAsyncWriteQueue::PendingWrite::Size() const {
ManagedAsyncWriteQueue::ManagedAsyncWriteQueue(ClientContext &client_context_p, AsyncWriteTarget &target_p)
: client_context(client_context_p), target(target_p), memory_governor(client_context_p) {
auto &scheduler = TaskScheduler::GetScheduler(client_context);
auto async_threads = NumericCast<idx_t>(scheduler.NumberOfAsyncThreads());
auto async_threads = scheduler.NumberOfAsyncThreads();
max_active_drain_tasks = MaxValue<idx_t>(async_threads, 1);

AsyncWriteTarget &async_target = *this;
Expand Down Expand Up @@ -925,7 +925,7 @@ ManagedAsyncWriteStreamQueue::ManagedAsyncWriteStreamQueue(ClientContext &client
limit_coalesced_write_size = local_file;

auto &scheduler = TaskScheduler::GetScheduler(client_context);
auto async_threads = NumericCast<idx_t>(scheduler.NumberOfAsyncThreads());
auto async_threads = scheduler.NumberOfAsyncThreads();

// Positional writes let multiple async requests drain one logical write queue concurrently.
// Otherwise the stream queue keeps one sequential request active so target ordering remains correct.
Expand Down Expand Up @@ -1155,6 +1155,7 @@ bool ManagedAsyncWriteStreamQueue::TakePendingWriteRequest(AsyncWriteRequest &re
return false;
}
if (drain_mode == DrainMode::SEQUENTIAL && submitted_requests > 0) {
// Non-positional targets write through the file handle's current position, so only one request may be active.
return false;
}
if (policy == SchedulePolicy::THRESHOLD) {
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/common/sort/sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class SortLocalSinkState : public LocalSinkState {
class SortGlobalSinkState : public GlobalSinkState {
public:
explicit SortGlobalSinkState(ClientContext &context)
: num_threads(NumericCast<idx_t>(TaskScheduler::GetScheduler(context).NumberOfThreads())),
: num_threads(TaskScheduler::GetScheduler(context).NumberOfThreads()),
temporary_memory_state(TemporaryMemoryManager::Get(context).Register(context)), sorted_tuples(0),
external(Settings::Get<DebugForceExternalSetting>(context)), any_combined(false), total_count(0),
partition_size(0) {
Expand Down
4 changes: 2 additions & 2 deletions src/duckdb/src/common/sort/sorted_run_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ class SortedRunMergerLocalState : public LocalSourceState {
class SortedRunMergerGlobalState : public GlobalSourceState {
public:
explicit SortedRunMergerGlobalState(ClientContext &context_p, const SortedRunMerger &merger_p)
: context(context_p), num_threads(NumericCast<idx_t>(TaskScheduler::GetScheduler(context).NumberOfThreads())),
merger(merger_p), num_runs(merger.sorted_runs.size()),
: context(context_p), num_threads(TaskScheduler::GetScheduler(context).NumberOfThreads()), merger(merger_p),
num_runs(merger.sorted_runs.size()),
num_partitions((merger.total_count + (merger.partition_size - 1)) / merger.partition_size),
iterator_state_type(GetBlockIteratorStateType(merger.external)),
sort_key_type(merger.sort.key_layout->GetSortKeyType()), next_partition_idx(0), total_scanned(0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ class HashAggregateDistinctFinalizeTask : public ExecutorTask {

void HashAggregateDistinctFinalizeEvent::Schedule() {
auto n_tasks = CreateGlobalSources();
n_tasks = MinValue<idx_t>(n_tasks, NumericCast<idx_t>(TaskScheduler::GetScheduler(context).NumberOfThreads()));
n_tasks = MinValue<idx_t>(n_tasks, TaskScheduler::GetScheduler(context).NumberOfThreads());
vector<shared_ptr<Task>> tasks;
for (idx_t i = 0; i < n_tasks; i++) {
tasks.push_back(make_uniq<HashAggregateDistinctFinalizeTask>(*pipeline, shared_from_this(), op, gstate));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ void UngroupedDistinctAggregateFinalizeEvent::Schedule() {
global_source_states.push_back(radix_table_p.GetGlobalSourceState(context));
}
n_tasks = MaxValue<idx_t>(n_tasks, 1);
n_tasks = MinValue<idx_t>(n_tasks, NumericCast<idx_t>(TaskScheduler::GetScheduler(context).NumberOfThreads()));
n_tasks = MinValue<idx_t>(n_tasks, TaskScheduler::GetScheduler(context).NumberOfThreads());

vector<shared_ptr<Task>> tasks;
for (idx_t i = 0; i < n_tasks; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ void WindowGlobalSourceState::CreateTaskList() {

// Schedule the largest group on as many threads as possible
auto &ts = TaskScheduler::GetScheduler(client);
const auto threads = NumericCast<idx_t>(ts.NumberOfThreads());
const auto threads = ts.NumberOfThreads();

const auto &max_block = partition_blocks.front();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ void AsOfGlobalSourceState::CreateTaskList(ClientContext &client) {

// Schedule the largest group on as many threads as possible
auto &ts = TaskScheduler::GetScheduler(client);
const auto threads = NumericCast<idx_t>(ts.NumberOfThreads());
const auto threads = ts.NumberOfThreads();

const auto per_thread = AsOfHashGroup::BinValue(max_block.first, threads);
if (!per_thread) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ struct LayoutGate {
class HashJoinGlobalSinkState : public GlobalSinkState {
public:
HashJoinGlobalSinkState(const PhysicalHashJoin &op_p, ClientContext &context_p)
: context(context_p), op(op_p),
num_threads(NumericCast<idx_t>(TaskScheduler::GetScheduler(context).NumberOfThreads())),
: context(context_p), op(op_p), num_threads(TaskScheduler::GetScheduler(context).NumberOfThreads()),
temporary_memory_state(TemporaryMemoryManager::Get(context).Register(context)),
initial_radix_bits(num_threads < 100 ? 4 : 5), finalized(false), active_local_states(0), total_size(0),
max_partition_size(0), max_partition_count(0), probe_side_requirement(0), scanned_data(false) {
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/execution/operator/join/physical_iejoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ IEJoinGlobalSourceState::IEJoinGlobalSourceState(const PhysicalIEJoin &op, Clien

// Schedule the largest group on as many threads as possible
auto &ts = TaskScheduler::GetScheduler(client);
const auto threads = NumericCast<idx_t>(ts.NumberOfThreads());
const auto threads = ts.NumberOfThreads();
per_thread = BinValue<idx_t>(l2_blocks, threads);

Initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class RangeJoinMaterializeEvent : public BasePipelineEvent {

// Schedule as many tasks as the sort will allow
auto &ts = TaskScheduler::GetScheduler(client);
auto num_threads = NumericCast<idx_t>(ts.NumberOfThreads());
auto num_threads = ts.NumberOfThreads();
vector<shared_ptr<Task>> tasks;

const auto tasks_scheduled = MinValue<idx_t>(num_threads, table.global_source->MaxThreads());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class ProcessRemainingBatchesEvent : public BasePipelineEvent {
public:
void Schedule() override {
vector<shared_ptr<Task>> tasks;
for (idx_t i = 0; i < idx_t(TaskScheduler::GetScheduler(context).NumberOfThreads()); i++) {
for (idx_t i = 0; i < TaskScheduler::GetScheduler(context).NumberOfThreads(); i++) {
auto process_task =
make_uniq<ProcessRemainingBatchesTask>(pipeline->executor, shared_from_this(), gstate, context, op);
tasks.push_back(std::move(process_task));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ class CopyFileLifecycleExecutor {
explicit CopyFileLifecycleExecutor(ClientContext &context_p)
: context(context_p), executor(context_p, TaskSchedulerType::ASYNC) {
auto &scheduler = TaskScheduler::GetScheduler(context);
async_threads = NumericCast<idx_t>(scheduler.NumberOfAsyncThreads());
auto regular_threads = NumericCast<idx_t>(scheduler.NumberOfThreads());
async_threads = scheduler.NumberOfAsyncThreads();
auto regular_threads = scheduler.NumberOfThreads();
max_pending_tasks = MaxValue<idx_t>(MIN_PENDING_TASKS, (async_threads + regular_threads) * 4);
}

Expand Down Expand Up @@ -2353,7 +2353,7 @@ void PartitionedCopyState::CreateTaskList() {
auto &ts = TaskScheduler::GetScheduler(partitioned_copy.context);
const auto &max_block = partition_blocks.front();

const auto threads = MinValue<idx_t>(locals, NumericCast<idx_t>(ts.NumberOfThreads()));
const auto threads = MinValue<idx_t>(locals, ts.NumberOfThreads());
const auto aligned_scale = MaxValue<idx_t>(ValidityMask::BITS_PER_VALUE / STANDARD_VECTOR_SIZE, 1);
const auto aligned_count = PartitionedCopyHashGroup::BinValue(max_block.first, aligned_scale);
const auto per_thread = aligned_scale * PartitionedCopyHashGroup::BinValue(aligned_count, threads);
Expand Down
Loading
Loading