diff --git a/elvis.config b/elvis.config index 5221c39..8c66880 100644 --- a/elvis.config +++ b/elvis.config @@ -13,7 +13,7 @@ {elvis_style, macro_module_names}, {elvis_style, operator_spaces, #{rules => [{right, ","}, {right, "++"}, {left, "++"}]}}, {elvis_style, nesting_level, #{level => 4}}, - {elvis_style, no_if_expression}, + {elvis_style, no_if_expression, #{ignore => [prg_pg_backend, prg_utils]}}, %% FIXME Implement appropriate behaviours {elvis_style, invalid_dynamic_call, #{ ignore => [prg_storage, prg_worker_sidecar] diff --git a/include/progressor.hrl b/include/progressor.hrl index 5edb6b9..4a1dbd8 100644 --- a/include/progressor.hrl +++ b/include/progressor.hrl @@ -25,7 +25,7 @@ last_event_id => event_id(), initialization => task_id(), previous_status => process_status(), - status_changed_at => timestamp_sec() + status_changed_at => timestamp_us() }. -type task() :: #{ @@ -33,9 +33,9 @@ process_id := id(), task_type := task_type(), status := task_status(), - scheduled_time := timestamp_sec(), - running_time => timestamp_sec(), - finished_time => timestamp_sec(), + scheduled_time := timestamp_us(), + running_time => timestamp_us(), + finished_time => timestamp_us(), args => binary(), metadata => map(), idempotency_key => binary(), @@ -59,11 +59,12 @@ task_id := task_id(), task_type := task_type(), task_status := task_status(), - scheduled := timestamp_sec(), - running => timestamp_sec(), - finished => timestamp_sec(), + scheduled := timestamp_us(), + running => timestamp_us(), + finished => timestamp_us(), args => binary(), metadata => map(), + context => binary(), idempotency_key => binary(), response => term(), retry_interval => non_neg_integer(), @@ -190,10 +191,13 @@ -type task_result() :: #{ task_id := task_id(), status := task_status(), + running_time => timestamp_sec(), finished_time => timestamp_sec(), response => binary() }. +%% microsecond +-type timestamp_us() :: non_neg_integer(). -type timestamp_ms() :: non_neg_integer(). -type timestamp_sec() :: non_neg_integer(). -type timeout_sec() :: non_neg_integer(). @@ -221,6 +225,6 @@ process_id => ProcessId, status => <<"init">>, previous_status => <<"init">>, - created_at => erlang:system_time(second), - status_changed_at => erlang:system_time(second) + created_at => erlang:system_time(microsecond), + status_changed_at => erlang:system_time(microsecond) }). diff --git a/src/prg_utils.erl b/src/prg_utils.erl index 1d0867f..87eff52 100644 --- a/src/prg_utils.erl +++ b/src/prg_utils.erl @@ -9,6 +9,8 @@ -export([make_ns_opts/2]). -export([unixtime_to_datetime/1]). -export([with_observe/4]). +-export([to_microseconds/1]). +-export([to_seconds/1]). -export([with_observe/5]). -export([with_span/2]). @@ -91,3 +93,35 @@ collect(histogram, MetricKey, Labels, Value) -> %%collect(_, _MetricKey, _Labels, _Value) -> %% %% TODO implement it %% ok. + +-spec to_microseconds(non_neg_integer()) -> non_neg_integer() | no_return(). +to_microseconds(Timestamp) -> + if + Timestamp < 100000000000 -> + %% seconds + Timestamp * 1000000; + Timestamp < 100000000000000 -> + %% milliseconds + Timestamp * 1000; + Timestamp < 100000000000000000 -> + %% microseconds + Timestamp; + true -> + error({unsupported_time_unit, Timestamp}) + end. + +-spec to_seconds(non_neg_integer()) -> non_neg_integer() | no_return(). +to_seconds(Timestamp) -> + if + Timestamp < 100000000000 -> + %% seconds + Timestamp; + Timestamp < 100000000000000 -> + %% milliseconds + Timestamp div 1000; + Timestamp < 100000000000000000 -> + %% microseconds + Timestamp div 1000000; + true -> + error({unsupported_time_unit, Timestamp}) + end. diff --git a/src/prg_worker.erl b/src/prg_worker.erl index 08d568b..80abd79 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -23,7 +23,13 @@ -record(prg_worker_state, {ns_id, ns_opts, process, sidecar_pid}). -define(DEFAULT_RANGE, #{direction => forward}). --define(CAPTURE_DEFENSE_INTERVAL_MS, 100). +%% 1 second +-define(MIN_SCHEDULE_STEP_US, 1000000). +%% 10 millisecond +-define(SCHEDULE_DEFENSE_INTERVAL_US, 10000). +-define(SCHEDULE_DEFENSE_INTERVAL_MS, ?SCHEDULE_DEFENSE_INTERVAL_US div 1000). +%% Used to prevent timing errors caused by scheduler overhead +-define(EFFECTIVE_SCHEDULE_STEP_US, ?MIN_SCHEDULE_STEP_US - ?SCHEDULE_DEFENSE_INTERVAL_US). %%% %%% API @@ -227,23 +233,19 @@ handle_result_error(Result, {TaskType, _} = TaskHeader, Task, Deadline, State) w error_and_stop(Result, TaskHeader, Task, Deadline, State). success_and_continue(Intent, TaskHeader, Task, Deadline, State) -> - #{action := #{set_timer := Timestamp} = Action, events := Events} = Intent, - #{task_id := TaskId, context := Context} = Task, + #{action := #{set_timer := Timestamp0} = Action, events := Events} = Intent, + #{context := Context} = Task, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts} = NsOpts, process = #{process_id := ProcessId, status := OldStatus} = Process, sidecar_pid = Pid } = State, - Now = erlang:system_time(second), + Timestamp = prg_utils:to_microseconds(Timestamp0), + Now = erlang:system_time(microsecond), {#{status := NewStatus} = ProcessUpdated, Updates} = update_process(Process, Intent), - Response = response(maps:get(response, Intent, undefined)), - TaskResult = #{ - task_id => TaskId, - response => term_to_binary(Response), - finished_time => Now, - status => <<"finished">> - }, + Response = response(Intent), + TaskResult = task_result(Task, <<"finished">>, Response), NewTask = #{ process_id => ProcessId, task_type => action_to_task_type(Action), @@ -270,7 +272,9 @@ success_and_continue(Intent, TaskHeader, Task, Deadline, State) -> _ = maybe_reply(TaskHeader, Response), case SaveResult of {ok, [#{status := <<"waiting">>, task_id := NextTaskId, scheduled_time := Ts} | _]} -> - RunAfterMs = (Ts - Now) * 1000 - ?CAPTURE_DEFENSE_INTERVAL_MS, + %% if status=waiting then expression (Ts - Now) div 1000 + %% is guaranteed to return >= 1000 because see create_status/1 + RunAfterMs = (Ts - Now) div 1000 - ?SCHEDULE_DEFENSE_INTERVAL_MS, ok = prg_scheduler:schedule_task(NsId, ProcessId, NextTaskId, RunAfterMs), ok = next_task(self()), State#prg_worker_state{process = undefined}; @@ -289,7 +293,7 @@ success_and_remove(Intent, TaskHeader, _Task, Deadline, State) -> process = #{process_id := ProcessId} = _Process, sidecar_pid = Pid } = State, - Response = response(maps:get(response, Intent, undefined)), + Response = response(Intent), ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, remove, ProcessId), ok = prg_worker_sidecar:remove_process(Pid, Deadline, StorageOpts, NsId, ProcessId), _ = maybe_reply(TaskHeader, Response), @@ -298,7 +302,6 @@ success_and_remove(Intent, TaskHeader, _Task, Deadline, State) -> success_and_suspend(Intent, TaskHeader, Task, Deadline, State) -> #{events := Events, action := unset_timer} = Intent, - #{task_id := TaskId} = Task, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts} = NsOpts, @@ -310,13 +313,8 @@ success_and_suspend(Intent, TaskHeader, Task, Deadline, State) -> Pid, Deadline, NsOpts, lifecycle_event(TaskHeader, OldStatus, NewStatus), ProcessId ), ok = prg_worker_sidecar:event_sink(Pid, Deadline, NsOpts, ProcessId, Events), - Response = response(maps:get(response, Intent, undefined)), - TaskResult = #{ - task_id => TaskId, - response => term_to_binary(Response), - finished_time => erlang:system_time(second), - status => <<"finished">> - }, + Response = response(Intent), + TaskResult = task_result(Task, <<"finished">>, Response), SaveResult = prg_worker_sidecar:complete_and_suspend( Pid, Deadline, @@ -348,26 +346,20 @@ success_and_unlock( ) -> %% machinegun legacy behaviour #{events := Events} = Intent, - #{task_id := TaskId} = Task, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts} = NsOpts, process = #{process_id := ProcessId} = Process, sidecar_pid = Pid } = State, - Now = erlang:system_time(second), + Now = erlang:system_time(microsecond), ok = prg_worker_sidecar:lifecycle_sink( Pid, Deadline, NsOpts, repair, ProcessId ), ok = prg_worker_sidecar:event_sink(Pid, Deadline, NsOpts, ProcessId, Events), {ProcessUpdated, Updates} = update_process(Process, Intent), - Response = response(maps:get(response, Intent, undefined)), - TaskResult = #{ - task_id => TaskId, - response => term_to_binary(Response), - finished_time => erlang:system_time(second), - status => <<"finished">> - }, + Response = response(Intent), + TaskResult = task_result(Task, <<"finished">>, Response), {ok, ErrorTask} = prg_worker_sidecar:get_task(Pid, Deadline, StorageOpts, NsId, ErrorTaskId), case ErrorTask of #{task_type := Type} when Type =:= <<"timeout">>; Type =:= <<"remove">> -> @@ -413,26 +405,20 @@ success_and_unlock( end; success_and_unlock(Intent, TaskHeader, Task, Deadline, State) -> #{events := Events} = Intent, - #{task_id := TaskId} = Task, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts} = NsOpts, process = #{process_id := ProcessId, status := OldStatus} = Process, sidecar_pid = Pid } = State, - Now = erlang:system_time(second), + Now = erlang:system_time(microsecond), {#{status := NewStatus} = ProcessUpdated, Updates} = update_process(Process, Intent), ok = prg_worker_sidecar:lifecycle_sink( Pid, Deadline, NsOpts, lifecycle_event(TaskHeader, OldStatus, NewStatus), ProcessId ), ok = prg_worker_sidecar:event_sink(Pid, Deadline, NsOpts, ProcessId, Events), - Response = response(maps:get(response, Intent, undefined)), - TaskResult = #{ - task_id => TaskId, - response => term_to_binary(Response), - finished_time => erlang:system_time(second), - status => <<"finished">> - }, + Response = response(Intent), + TaskResult = task_result(Task, <<"finished">>, Response), SaveResult = prg_worker_sidecar:complete_and_unlock( Pid, Deadline, @@ -448,10 +434,16 @@ success_and_unlock(Intent, TaskHeader, Task, Deadline, State) -> ok = next_task(self()), State#prg_worker_state{process = undefined}; {ok, [#{status := <<"waiting">>, task_id := NextTaskId, scheduled_time := Ts} | _]} -> - RunAfterMs = (Ts - Now) * 1000 - ?CAPTURE_DEFENSE_INTERVAL_MS, - ok = prg_scheduler:schedule_task(NsId, ProcessId, NextTaskId, RunAfterMs), - ok = next_task(self()), - State#prg_worker_state{process = undefined}; + case (Ts - Now) div 1000 of + Timeout when Timeout =< ?SCHEDULE_DEFENSE_INTERVAL_MS -> + process_scheduled_task(self(), ProcessId, NextTaskId), + State#prg_worker_state{process = undefined}; + Timeout when Timeout > ?SCHEDULE_DEFENSE_INTERVAL_MS -> + RunAfterMs = Timeout - ?SCHEDULE_DEFENSE_INTERVAL_MS, + ok = prg_scheduler:schedule_task(NsId, ProcessId, NextTaskId, RunAfterMs), + ok = next_task(self()), + State#prg_worker_state{process = undefined} + end; {ok, [#{status := <<"running">>} = ContinuationTask | _]} -> NewHistory = maps:get(history, Process) ++ Events, ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask), @@ -462,7 +454,6 @@ success_and_unlock(Intent, TaskHeader, Task, Deadline, State) -> error_and_stop({error, Reason} = Response, TaskHeader, Task, Deadline, State) -> {TaskType, _} = TaskHeader, - #{task_id := TaskId} = Task, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts} = NsOpts, @@ -480,12 +471,7 @@ error_and_stop({error, Reason} = Response, TaskHeader, Task, Deadline, State) -> ), update_process(Process, {error, {Detail, undefined}}) end, - TaskResult = #{ - task_id => TaskId, - response => term_to_binary(Response), - finished_time => erlang:system_time(second), - status => <<"error">> - }, + TaskResult = task_result(Task, <<"error">>, Response), ok = prg_worker_sidecar:complete_and_error( Pid, Deadline, StorageOpts, NsId, TaskResult, Updates ), @@ -504,7 +490,7 @@ error_and_retry({error, Reason} = Response, TaskHeader, Task, Deadline, State) - TaskResult = #{ task_id => TaskId, response => term_to_binary(Response), - finished_time => erlang:system_time(second), + finished_time => erlang:system_time(microsecond), status => <<"error">> }, _ = @@ -538,8 +524,11 @@ error_and_retry({error, Reason} = Response, TaskHeader, Task, Deadline, State) - [], NewTask ), - Now = erlang:system_time(second), - RunAfterMs = (Ts - Now) * 1000 - ?CAPTURE_DEFENSE_INTERVAL_MS, + Now = erlang:system_time(microsecond), + %% The retry policy only supports a second time scale + %% this ensures that the result of the expression (Ts - Now) div 1000 + %% will be greater or approximately equal 1000 + RunAfterMs = (Ts - Now) div 1000 - ?SCHEDULE_DEFENSE_INTERVAL_MS, ok = prg_scheduler:schedule_task(NsId, ProcessId, NextTaskId, RunAfterMs) end, ok = next_task(self()), @@ -553,7 +542,7 @@ update_process(#{status := Status, process_id := ProcessId} = Process, {error, { Status =:= <<"init">> -> %% process broken (transition from running/init to error) - StatusChangedAt = erlang:system_time(second), + StatusChangedAt = erlang:system_time(microsecond), ProcessUpdates = #{ process_id => ProcessId, status => <<"error">>, @@ -570,7 +559,7 @@ update_process(#{status := Status, process_id := ProcessId} = Process, {error, { end; update_process(#{status := <<"error">>, process_id := ProcessId} = Process, Intent) -> %% process repaired (transition from error to running) - StatusChangedAt = erlang:system_time(second), + StatusChangedAt = erlang:system_time(microsecond), NewProcess = maps:without( [detail, corrupted_by], Process#{status => <<"running">>, previous_status := <<"error">>, status_changed_at => StatusChangedAt} @@ -586,7 +575,7 @@ update_process(#{status := <<"error">>, process_id := ProcessId} = Process, Inte update_process_from_intent(NewProcess, ProcessUpdates, Intent); update_process(#{status := <<"init">>, process_id := ProcessId} = Process, Intent) -> %% transition from init to running - StatusChangedAt = erlang:system_time(second), + StatusChangedAt = erlang:system_time(microsecond), ProcessUpdates = #{ process_id => ProcessId, status => <<"running">>, @@ -626,26 +615,35 @@ update_process_from_intent(Process, ProcessUpdates, Intent) -> Intent ). +task_result(#{task_id := TaskId, running_time := RunningTime}, Status, Response) -> + #{ + task_id => TaskId, + response => term_to_binary(Response), + running_time => RunningTime, + finished_time => erlang:system_time(microsecond), + status => Status + }. + -spec maybe_reply(task_header(), term()) -> term(). maybe_reply({_, undefined}, _) -> undefined; maybe_reply({_, {Receiver, Ref}}, Response) -> progressor:reply(Receiver, {Ref, Response}). -response({error, _} = Error) -> +response(#{response := {error, _} = Error}) -> Error; -response(undefined) -> - {ok, ok}; -response(Data) -> - {ok, Data}. +response(#{response := Data}) -> + {ok, Data}; +response(Intent) when not is_map_key(response, Intent) -> + {ok, ok}. extract_task_type({TaskType, _}) -> TaskType. check_retryable(TaskHeader, #{last_retry_interval := LastInterval} = Task, RetryPolicy, Error) -> - Now = erlang:system_time(second), + Now = erlang:system_time(microsecond), ProcessId = maps:get(process_id, Task), - Timeout = + TimeoutSec = case LastInterval =:= 0 of true -> maps:get(initial_timeout, RetryPolicy); false -> trunc(LastInterval * maps:get(backoff_coefficient, RetryPolicy)) @@ -654,7 +652,7 @@ check_retryable(TaskHeader, #{last_retry_interval := LastInterval} = Task, Retry logger:info("check retryable ~p for error: ~p, last retry interval: ~p sec, attempt: ~p", [ ProcessId, Error, LastInterval, Attempts ]), - case is_retryable(Error, TaskHeader, RetryPolicy, Timeout, Attempts) of + case is_retryable(Error, TaskHeader, RetryPolicy, TimeoutSec, Attempts) of true -> maps:with( [ @@ -670,8 +668,8 @@ check_retryable(TaskHeader, #{last_retry_interval := LastInterval} = Task, Retry ], Task#{ status => <<"waiting">>, - scheduled_time => Now + Timeout, - last_retry_interval => Timeout, + scheduled_time => Now + (TimeoutSec * 1000000), + last_retry_interval => TimeoutSec, attempts_count => Attempts } ); @@ -700,10 +698,21 @@ is_retryable(Error, {timeout, undefined}, RetryPolicy, Timeout, Attempts) -> is_retryable(_Error, _TaskHeader, _RetryPolicy, _Timeout, _Attempts) -> false. +%% Due to the difference in the time scales used for storage (microseconds) +%% and the schedule time (seconds), the following logic is required: +%% - If the difference between the schedule and the current time is less than a ~1 second +%% the task is assigned the status "running" and is processed immediately +%% - If the difference between the schedule and the current time exceeds ~1 second +%% the task is assigned the status "waiting" and is saved to the schedule create_status(Timestamp, Now) when Timestamp =< Now -> <<"running">>; -create_status(_Timestamp, _Now) -> - <<"waiting">>. +create_status(Timestamp, Now) -> + case (Timestamp - Now) >= ?EFFECTIVE_SCHEDULE_STEP_US of + true -> + <<"waiting">>; + false -> + <<"running">> + end. create_header(#{task_type := <<"timeout">>}) -> {timeout, undefined}; diff --git a/src/progressor.erl b/src/progressor.erl index 91c8187..3a4d9ad 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -329,15 +329,69 @@ do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := Hi do_get(Req) -> do_get(Req#{range => #{}}). +-define(EVENTS_KEYS, [event_id, event_timestamp, event_metadata, event_payload]). + do_trace(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId}) -> - prg_storage:process_trace(StorageOpts, NsId, Id). -do_put(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, args := #{process := Process} = Args} = Opts) -> + case prg_storage:process_trace(StorageOpts, NsId, Id) of + {ok, RawTrace} -> + TraceMap = lists:foldl( + fun update_trace/2, + #{}, + RawTrace + ), + Trace = lists:map(fun({_Pos, Unit}) -> Unit end, lists:sort(maps:values(TraceMap))), + {ok, Trace}; + Error -> + Error + end. + +update_trace(#{task_id := TaskId, event_id := _EventId} = RawTraceUnit, Acc) -> + maps:update_with( + TaskId, + fun(V) -> update_trace_unit(RawTraceUnit, V) end, + { + maps:size(Acc) + 1, + (maps:without(?EVENTS_KEYS, RawTraceUnit))#{ + events => [maps:with(?EVENTS_KEYS, RawTraceUnit)] + } + }, + Acc + ); +update_trace(#{task_id := TaskId} = _RawTraceUnit, Acc) when is_map_key(TaskId, Acc) -> + Acc; +update_trace(#{task_id := TaskId} = RawTraceUnit, Acc) -> + Acc#{ + TaskId => {maps:size(Acc) + 1, (maps:without(?EVENTS_KEYS, RawTraceUnit))#{events => []}} + }. + +update_trace_unit(RawTraceUnit, {Pos, TraceUnit}) -> + TraceUnitUpdated = maps:update_with( + events, + fun(ListEvents) -> + [ + maps:with(?EVENTS_KEYS, RawTraceUnit) + | ListEvents + ] + end, + [maps:with(?EVENTS_KEYS, RawTraceUnit)], + TraceUnit + ), + {Pos, TraceUnitUpdated}. + +do_put( + #{ + ns_opts := #{storage := StorageOpts}, + id := Id, + ns := NsId, + args := #{process := Process} = Args + } = Opts +) -> #{ process_id := ProcessId } = Process, Action = maps:get(action, Args, undefined), Context = maps:get(context, Opts, <<>>), - Now = erlang:system_time(second), + Now = erlang:system_time(microsecond), InitTask = #{ process_id => ProcessId, task_type => <<"init">>, @@ -427,7 +481,7 @@ make_task(#{task_type := TaskType} = TaskData) when TaskType =:= <<"call">>; TaskType =:= <<"repair">> -> - Now = erlang:system_time(second), + Now = erlang:system_time(microsecond), Defaults = #{ status => <<"running">>, scheduled_time => Now, @@ -437,7 +491,7 @@ make_task(#{task_type := TaskType} = TaskData) when }, maps:merge(Defaults, TaskData); make_task(#{task_type := <<"timeout">>} = TaskData) -> - Now = erlang:system_time(second), + Now = erlang:system_time(microsecond), Defaults = #{ %% TODO metadata => #{<<"kind">> => <<"simple_repair">>}, @@ -448,7 +502,7 @@ make_task(#{task_type := <<"timeout">>} = TaskData) -> }, maps:merge(Defaults, TaskData); make_task(#{task_type := <<"notify">>} = TaskData) -> - Now = erlang:system_time(second), + Now = erlang:system_time(microsecond), Defaults = #{ status => <<"running">>, scheduled_time => Now, @@ -491,7 +545,7 @@ action_to_task(#{set_timer := Timestamp} = Action, ProcessId, Context) -> status => <<"waiting">>, args => <<>>, context => Context, - scheduled_time => Timestamp, + scheduled_time => prg_utils:to_microseconds(Timestamp), last_retry_interval => 0, attempts_count => 0 }. diff --git a/src/storage/postgres/prg_pg_backend.erl b/src/storage/postgres/prg_pg_backend.erl index a70f216..4a68571 100644 --- a/src/storage/postgres/prg_pg_backend.erl +++ b/src/storage/postgres/prg_pg_backend.erl @@ -213,7 +213,7 @@ process_trace(PgOpts, NsId, ProcessId) -> "LEFT JOIN " ++ EventsTable ++ " ne " "ON nt.task_id = ne.task_id AND nt.process_id = ne.process_id " - "WHERE nt.process_id = $1 ORDER BY nt.task_id, ne.event_id", + "WHERE nt.process_id = $1 ORDER BY nt.running_time, nt.finished_time, nt.task_id", [ProcessId] ), case Result of @@ -261,9 +261,9 @@ collect_zombies(PgOpts, NsId, Timeout) -> schedule := ScheduleTable, running := RunningTable } = prg_pg_utils:tables(NsId), - NowSec = erlang:system_time(second), - Now = unixtime_to_datetime(NowSec), - TsBackward = unixtime_to_datetime(NowSec - (Timeout + ?PROTECT_TIMEOUT)), + NowMicroSec = erlang:system_time(microsecond), + Now = unixtime_to_datetime(NowMicroSec), + TsBackward = unixtime_to_datetime(NowMicroSec - ((Timeout + ?PROTECT_TIMEOUT) * 1000000)), {ok, _, _} = epg_pool:transaction( Pool, fun(Connection) -> @@ -300,9 +300,9 @@ search_timers(PgOpts, NsId, _Timeout, Limit) -> schedule := ScheduleTable, running := RunningTable } = prg_pg_utils:tables(NsId), - NowSec = erlang:system_time(second), - Now = unixtime_to_datetime(NowSec), - NowText = unixtime_to_text(NowSec), + NowMicroSec = erlang:system_time(microsecond), + Now = unixtime_to_datetime(NowMicroSec), + NowText = unixtime_to_text(NowMicroSec), {ok, _, Columns, Rows} = _Res = epg_pool:transaction( Pool, @@ -320,7 +320,7 @@ search_timers(PgOpts, NsId, _Timeout, Limit) -> " ORDER BY scheduled_time ASC LIMIT $3)" " RETURNING" " task_id, process_id, task_type, 'running'::task_status as status, scheduled_time, " - " TO_TIMESTAMP($2, 'YYYY-MM-DD HH24:MI:SS') as running_time, args, metadata, " + " TO_TIMESTAMP($2, 'YYYY-MM-DD HH24:MI:SS.US') as running_time, args, metadata, " " last_retry_interval, attempts_count, context" " ) " "INSERT INTO " ++ RunningTable ++ @@ -340,8 +340,8 @@ capture_task(PgOpts, NsId, TaskId) -> schedule := ScheduleTable, running := RunningTable } = prg_pg_utils:tables(NsId), - NowSec = erlang:system_time(second), - NowText = unixtime_to_text(NowSec), + NowMicroSec = erlang:system_time(microsecond), + NowText = unixtime_to_text(NowMicroSec), {ok, Columns, Rows} = _Res = epg_pool:transaction( Pool, @@ -355,7 +355,7 @@ capture_task(PgOpts, NsId, TaskId) -> " WHERE task_id = $2 AND status = 'waiting' " " RETURNING" " task_id, process_id, task_type, 'running'::task_status as status, scheduled_time, " - " TO_TIMESTAMP($1, 'YYYY-MM-DD HH24:MI:SS') as running_time, args, metadata, " + " TO_TIMESTAMP($1, 'YYYY-MM-DD HH24:MI:SS.US') as running_time, args, metadata, " " last_retry_interval, attempts_count, context" "), " "inserted_to_running AS (" @@ -378,8 +378,8 @@ search_calls(PgOpts, NsId, Limit) -> schedule := ScheduleTable, running := RunningTable } = prg_pg_utils:tables(NsId), - NowSec = erlang:system_time(second), - Now = unixtime_to_text(NowSec), + NowMicroSec = erlang:system_time(microsecond), + Now = unixtime_to_text(NowMicroSec), {ok, _, Columns, Rows} = epg_pool:transaction( Pool, fun(Connection) -> @@ -395,7 +395,7 @@ search_calls(PgOpts, NsId, Limit) -> " GROUP BY process_id ORDER BY min ASC LIMIT $2" " ) " " RETURNING task_id, process_id, task_type, 'running'::task_status as status, scheduled_time, " - " TO_TIMESTAMP($1, 'YYYY-MM-DD HH24:MI:SS') as running_time, args, metadata, " + " TO_TIMESTAMP($1, 'YYYY-MM-DD HH24:MI:SS.US') as running_time, args, metadata, " " last_retry_interval, attempts_count, context" " ) " "INSERT INTO " ++ RunningTable ++ @@ -556,7 +556,7 @@ complete_and_continue(PgOpts, NsId, TaskResult, ProcessUpdates, Events, NextTask do_save_running( Connection, RunningTable, - NextTask#{task_id => NextTaskId, running_time => erlang:system_time(second)}, + NextTask#{task_id => NextTaskId, running_time => erlang:system_time(microsecond)}, " * " ); #{status := <<"waiting">>} -> @@ -765,7 +765,7 @@ do_save_process(Connection, Table, Process) -> Detail = maps:get(detail, Process, null), AuxState = maps:get(aux_state, Process, null), Meta = maps:get(metadata, Process, null), - CreatedAtTs = maps:get(created_at, Process, erlang:system_time(second)), + CreatedAtTs = maps:get(created_at, Process, erlang:system_time(microsecond)), CreatedAt = unixtime_to_datetime(CreatedAtTs), PreviousStatus = maps:get(previous_status, Process, Status), StatusChangedAt = unixtime_to_datetime(maps:get(status_changed_at, Process, CreatedAtTs)), @@ -846,7 +846,7 @@ do_save_running(Connection, Table, Task, Returning) -> } = Task, Args = maps:get(args, Task, null), MetaData = maps:get(metadata, Task, null), - RunningTs = erlang:system_time(second), + RunningTs = erlang:system_time(microsecond), Context = maps:get(context, Task, <<>>), epg_pool:query( Connection, @@ -1001,15 +1001,18 @@ do_complete_task(Connection, TaskTable, ScheduleTable, RunningTable, TaskResult) status := Status } = TaskResult, Response = maps:get(response, TaskResult, null), - FinishedTime = maps:get(finished_time, TaskResult, erlang:system_time(second)), + RunningTime1 = maps:get(running_time, TaskResult, null), + FinishedTime = maps:get(finished_time, TaskResult, erlang:system_time(microsecond)), {ok, _} = epg_pool:query( Connection, "WITH deleted AS(" " DELETE FROM " ++ RunningTable ++ - " WHERE process_id = $4" + " WHERE process_id = $1" " )" - "UPDATE " ++ TaskTable ++ " SET status = $1, response = $2, finished_time = $3 WHERE task_id = $5", - [Status, Response, unixtime_to_datetime(FinishedTime), ProcessId, TaskId] + "UPDATE " ++ TaskTable ++ + " SET status = $2, response = $3, " + " running_time = $4, finished_time = $5 WHERE task_id = $6", + [ProcessId, Status, Response, unixtime_to_datetime(RunningTime1), unixtime_to_datetime(FinishedTime), TaskId] ), case Status of <<"error">> -> @@ -1017,7 +1020,7 @@ do_complete_task(Connection, TaskTable, ScheduleTable, RunningTable, TaskResult) {ok, 0, [], []}; _ -> %% search waiting call - RunningTime = unixtime_to_text(erlang:system_time(second)), + RunningTime = unixtime_to_text(erlang:system_time(microsecond)), epg_pool:query( Connection, "WITH postponed_tasks AS (" @@ -1026,7 +1029,7 @@ do_complete_task(Connection, TaskTable, ScheduleTable, RunningTable, TaskResult) " (SELECT min(task_id) FROM " ++ ScheduleTable ++ " WHERE process_id = $1 AND status = 'waiting' AND task_type IN ('call', 'repair')) " " RETURNING task_id, process_id, task_type, 'running'::task_status as status, scheduled_time, " - " TO_TIMESTAMP($2, 'YYYY-MM-DD HH24:MI:SS') as running_time, args, metadata, " + " TO_TIMESTAMP($2, 'YYYY-MM-DD HH24:MI:SS.US') as running_time, args, metadata, " " last_retry_interval, attempts_count, context" " ) " "INSERT INTO " ++ RunningTable ++ @@ -1051,9 +1054,9 @@ do_block_timer(Connection, ScheduleTable, ProcessId) -> end. do_unlock_timer(Connection, ScheduleTable, RunningTable, ProcessId) -> - NowSec = erlang:system_time(second), - Now = unixtime_to_datetime(NowSec), - NowText = unixtime_to_text(NowSec), + NowMicroSec = erlang:system_time(microsecond), + Now = unixtime_to_datetime(NowMicroSec), + NowText = unixtime_to_text(NowMicroSec), epg_pool:query( Connection, "WITH unblocked_task AS (UPDATE" ++ ScheduleTable ++ @@ -1065,7 +1068,7 @@ do_unlock_timer(Connection, ScheduleTable, RunningTable, ProcessId) -> " WHERE process_id = $1 AND status = 'blocked' AND scheduled_time <= $2" " RETURNING" " task_id, process_id, task_type, 'running'::task_status as status, scheduled_time, " - " TO_TIMESTAMP($3, 'YYYY-MM-DD HH24:MI:SS') as running_time, args, metadata, " + " TO_TIMESTAMP($3, 'YYYY-MM-DD HH24:MI:SS.US') as running_time, args, metadata, " " last_retry_interval, attempts_count, context" "), " "running_task AS (" @@ -1141,20 +1144,20 @@ convert(json, Value) -> convert(_Type, Value) -> Value. -daytime_to_unixtime({Date, {Hour, Minute, Second}}) when is_float(Second) -> - daytime_to_unixtime({Date, {Hour, Minute, trunc(Second)}}); -daytime_to_unixtime(Daytime) -> - to_unixtime(calendar:datetime_to_gregorian_seconds(Daytime)). - -to_unixtime(Time) when is_integer(Time) -> - Time - ?EPOCH_DIFF. +daytime_to_unixtime({Date, {Hour, Minute, SecondWithMicro}}) -> + MicroPart = trunc(SecondWithMicro * 1000000) rem 1000000, + GregorianSeconds = calendar:datetime_to_gregorian_seconds({Date, {Hour, Minute, trunc(SecondWithMicro)}}), + (GregorianSeconds - ?EPOCH_DIFF) * 1000000 + MicroPart. unixtime_to_datetime(null) -> null; -unixtime_to_datetime(TimestampSec) -> - calendar:gregorian_seconds_to_datetime(TimestampSec + ?EPOCH_DIFF). +unixtime_to_datetime(Timestamp) -> + {TimestampSec, Fractional} = parse_timestamp(Timestamp), + {Date, {Hour, Minute, Second}} = calendar:gregorian_seconds_to_datetime(TimestampSec + ?EPOCH_DIFF), + {Date, {Hour, Minute, Second + Fractional}}. -unixtime_to_text(TimestampSec) -> +unixtime_to_text(Timestamp) -> + {TimestampSec, MicroSec} = parse_timestamp(Timestamp, integer), { {Year, Month, Day}, {Hour, Minute, Seconds} @@ -1167,10 +1170,12 @@ unixtime_to_text(TimestampSec) -> (maybe_add_zero(Day))/binary, " ", (maybe_add_zero(Hour))/binary, - "-", + ":", (maybe_add_zero(Minute))/binary, - "-", - (maybe_add_zero(Seconds))/binary + ":", + (maybe_add_zero(Seconds))/binary, + ".", + (microsecond_part(MicroSec))/binary >>. maybe_add_zero(Val) when Val < 10 -> @@ -1178,6 +1183,56 @@ maybe_add_zero(Val) when Val < 10 -> maybe_add_zero(Val) -> integer_to_binary(Val). +microsecond_part(0) -> + <<"0">>; +microsecond_part(Val) when Val < 10 -> + <<"00000", (integer_to_binary(Val))/binary>>; +microsecond_part(Val) when Val < 100 -> + <<"0000", (integer_to_binary(Val))/binary>>; +microsecond_part(Val) when Val < 1000 -> + <<"000", (integer_to_binary(Val))/binary>>; +microsecond_part(Val) when Val < 10000 -> + <<"00", (integer_to_binary(Val))/binary>>; +microsecond_part(Val) when Val < 100000 -> + <<"0", (integer_to_binary(Val))/binary>>; +microsecond_part(Val) -> + <<(integer_to_binary(Val))/binary>>. + +-spec parse_timestamp(Timestamp :: integer()) -> {timestamp_sec(), float() | integer()}. +parse_timestamp(Timestamp) -> + parse_timestamp(Timestamp, fractional). + +parse_timestamp(Timestamp, Opts) -> + if + Timestamp < 100000000000 -> + %% seconds + {Timestamp, 0}; + Timestamp < 100000000000000 -> + %% milliseconds + TimestampSec = Timestamp div 1000, + {TimestampSec, remainder_timestamp(Opts, 1000, Timestamp)}; + Timestamp < 100000000000000000 -> + %% microseconds + TimestampSec = Timestamp div 1000000, + {TimestampSec, remainder_timestamp(Opts, 1000000, Timestamp)}; + true -> + error({unsupported_time_unit, Timestamp}) + end. + +remainder_timestamp(fractional, Ratio, Timestamp) -> + %% 1 millisec -> 0.001 sec + %% 1 microsec -> 0.000001 sec + (Timestamp rem Ratio) / Ratio; +remainder_timestamp(integer, Ratio, Timestamp) -> + case Ratio of + 1000000 -> + %% microseconds as is + Timestamp rem 1000000; + 1000 -> + %% milliseconds to microseconds + (Timestamp rem 1000) * 1000 + end. + json_encode(null) -> null; json_encode(MetaData) -> @@ -1234,7 +1289,8 @@ marshal_event(Event) -> (<<"process_id">>, ProcessId, Acc) -> Acc#{process_id => ProcessId}; (<<"task_id">>, TaskId, Acc) -> Acc#{task_id => TaskId}; (<<"event_id">>, EventId, Acc) -> Acc#{event_id => EventId}; - (<<"timestamp">>, Ts, Acc) -> Acc#{timestamp => Ts}; + %% TODO: to micro after processors? + (<<"timestamp">>, Ts, Acc) -> Acc#{timestamp => prg_utils:to_seconds(Ts)}; (<<"metadata">>, MetaData, Acc) -> Acc#{metadata => MetaData}; (<<"payload">>, Payload, Acc) -> Acc#{payload => Payload}; (_, _, Acc) -> Acc @@ -1250,17 +1306,19 @@ marshal_trace(Trace) -> (<<"task_id">>, TaskId, Acc) -> Acc#{task_id => TaskId}; (<<"task_type">>, TaskType, Acc) -> Acc#{task_type => TaskType}; (<<"status">>, TaskStatus, Acc) -> Acc#{task_status => TaskStatus}; - (<<"scheduled_time">>, ScheduledTs, Acc) -> Acc#{scheduled => ScheduledTs}; - (<<"running_time">>, RunningTs, Acc) -> Acc#{running => RunningTs}; - (<<"finished_time">>, FinishedTs, Acc) -> Acc#{finished => FinishedTs}; + (<<"scheduled_time">>, ScheduledTs, Acc) -> Acc#{scheduled => prg_utils:to_microseconds(ScheduledTs)}; + (<<"running_time">>, RunningTs, Acc) -> Acc#{running => prg_utils:to_microseconds(RunningTs)}; + (<<"finished_time">>, FinishedTs, Acc) -> Acc#{finished => prg_utils:to_microseconds(FinishedTs)}; (<<"args">>, Args, Acc) -> Acc#{args => Args}; (<<"metadata">>, Meta, Acc) -> Acc#{task_metadata => Meta}; + (<<"context">>, Context, Acc) -> Acc#{context => Context}; (<<"idempotency_key">>, Key, Acc) -> Acc#{idempotency_key => Key}; (<<"response">>, Response, Acc) -> Acc#{response => binary_to_term(Response)}; (<<"last_retry_interval">>, Interval, Acc) -> Acc#{retry_interval => Interval}; (<<"attempts_count">>, Attempts, Acc) -> Acc#{retry_attempts => Attempts}; (<<"event_id">>, EventId, Acc) -> Acc#{event_id => EventId}; - (<<"event_timestamp">>, Ts, Acc) -> Acc#{event_timestamp => Ts}; + %% TODO: to micro after processors? + (<<"event_timestamp">>, Ts, Acc) -> Acc#{event_timestamp => prg_utils:to_seconds(Ts)}; (<<"event_metadata">>, Meta, Acc) -> Acc#{event_metadata => Meta}; (<<"event_payload">>, Payload, Acc) -> Acc#{event_payload => Payload}; (_, _, Acc) -> Acc diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index 459a33b..5b8bea6 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -308,9 +308,9 @@ simple_call_with_range_test(C) -> -spec call_replace_timer_test(_) -> _. call_replace_timer_test(C) -> %% steps: - %% 1. init -> [event1], timer 2s + remove - %% 2. call -> [], timer 0s (new timer cancel remove) - %% 3. timeout -> [event2], undefined + %% 1. init -> [event1], timer 2s + remove + %% 2. call -> [event2, event3], timer 0s (new timer cancel remove) + %% 3. timeout -> [event4], undefined _ = mock_processor(call_replace_timer_test), Id = gen_id(), {ok, ok} = progressor:init(#{ns => ?NS(C), id => Id, args => <<"init_args">>}), @@ -322,73 +322,71 @@ call_replace_timer_test(C) -> process_id := Id, status := <<"running">>, history := [ - #{ - event_id := 1, - metadata := #{<<"format_version">> := 1}, - payload := _Pl1, - timestamp := _Ts1 - }, - #{ - event_id := 2, - metadata := #{<<"format_version">> := 1}, - payload := _Pl2, - timestamp := _Ts2 - } + #{event_id := 1}, + #{event_id := 2}, + #{event_id := 3}, + #{event_id := 4} ] }} = progressor:get(#{ns => ?NS(C), id => Id}), {ok, [ #{ - task_id := _, args := <<"init_args">>, - task_type := <<"init">>, - task_status := <<"finished">>, - task_metadata := #{<<"range">> := #{}}, - retry_interval := 0, - retry_attempts := 0, - scheduled := _, running := _, finished := _, - response := {ok, ok}, - event_id := 1, - event_timestamp := _, - event_metadata := #{<<"format_version">> := 1}, - event_payload := _ - }, - #{ + events := + [ + #{ + event_id := 1, + event_timestamp := _, + event_metadata := #{<<"format_version">> := 1}, + event_payload := _ + } + ], task_id := _, - task_type := <<"remove">>, - task_status := <<"cancelled">>, + task_type := <<"init">>, + task_status := <<"finished">>, scheduled := _, retry_interval := 0, - retry_attempts := 0 + retry_attempts := 0, + task_metadata := #{<<"range">> := #{}} }, #{ - task_id := _, args := <<"call_args">>, + running := _, + finished := _, + events := + [ + #{event_id := _}, + #{event_id := _} + ], + task_id := _, task_type := <<"call">>, task_status := <<"finished">>, + scheduled := _, retry_interval := 0, retry_attempts := 0, - task_metadata := #{<<"range">> := #{}}, - scheduled := _, - running := _, - finished := _, - response := {ok, <<"response">>} + task_metadata := #{<<"range">> := #{}} }, #{ + running := _, + finished := _, + events := + [#{event_id := 4}], task_id := _, task_type := <<"timeout">>, task_status := <<"finished">>, + scheduled := _, retry_interval := 0, - retry_attempts := 0, + retry_attempts := 0 + }, + #{ + events := [], + task_id := _, + task_type := <<"remove">>, + task_status := <<"cancelled">>, scheduled := _, - %% TODO need fix for running time!!! - finished := _, - response := {ok, ok}, - event_id := 2, - event_timestamp := _, - event_metadata := #{<<"format_version">> := 1}, - event_payload := _ + retry_interval := 0, + retry_attempts := 0 } ]} = progressor:trace(#{ns => ?NS(C), id => Id}), unmock_processor(), @@ -1137,16 +1135,16 @@ mock_processor(call_replace_timer_test = TestCase) -> %% call when process suspended (wait timeout) Result = #{ response => <<"response">>, - events => [], + events => [event(2), event(3)], action => #{set_timer => erlang:system_time(second)} }, Self ! 2, {ok, Result}; ({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) -> %% timeout after call processing (remove action was cancelled by call action) - ?assertEqual(1, erlang:length(History)), + ?assertEqual(3, erlang:length(History)), Result = #{ - events => [event(2)] + events => [event(4)] }, Self ! 3, {ok, Result}