diff --git a/src/prg_notifier.erl b/src/prg_notifier.erl index c57c57a..542e0b3 100644 --- a/src/prg_notifier.erl +++ b/src/prg_notifier.erl @@ -132,8 +132,9 @@ serialize_content(Array) when is_list(Array) -> serialize_content(Arg) -> erlang:error(badarg, [Arg]). -serialize_timestamp(TimestampSec) -> - Str = calendar:system_time_to_rfc3339(TimestampSec, [{unit, second}, {offset, "Z"}]), +serialize_timestamp(Timestamp) -> + TsMicro = prg_utils:to_microseconds(Timestamp), + Str = calendar:system_time_to_rfc3339(TsMicro, [{unit, microsecond}, {offset, "Z"}]), erlang:list_to_binary(Str). %% lifecycle serialization diff --git a/src/prg_worker.erl b/src/prg_worker.erl index 80abd79..75b69bb 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -487,12 +487,7 @@ error_and_retry({error, Reason} = Response, TaskHeader, Task, Deadline, State) - process = #{process_id := ProcessId} = Process, sidecar_pid = Pid } = State, - TaskResult = #{ - task_id => TaskId, - response => term_to_binary(Response), - finished_time => erlang:system_time(microsecond), - status => <<"error">> - }, + TaskResult = task_result(Task, <<"error">>, Response), _ = case check_retryable(TaskHeader, Task, RetryPolicy, Reason) of not_retryable -> diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index 5b8bea6..8d30359 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -844,7 +844,7 @@ put_process_with_timeout_test(C) -> status => <<"running">>, history => [event(1)] }, - action => #{set_timer => erlang:system_time(second) + 1} + action => #{set_timer => erlang:system_time(microsecond) + 1000000} }, {ok, ok} = progressor:put(#{ns => ?NS(C), id => Id, args => Args}), timer:sleep(?AWAIT_TIMEOUT(C)), @@ -942,7 +942,7 @@ put_process_with_remove_test(C) -> status => <<"running">>, history => [event(1)] }, - action => #{set_timer => erlang:system_time(second) + 1, remove => true} + action => #{set_timer => erlang:system_time(microsecond) + 1000000, remove => true} }, {ok, ok} = progressor:put(#{ns => ?NS(C), id => Id, args => Args}), timer:sleep(?AWAIT_TIMEOUT(C)), @@ -987,7 +987,7 @@ mock_processor(simple_timers_test = TestCase) -> events => [event(1)], metadata => #{<<"k">> => <<"v">>}, %% postponed timer - action => #{set_timer => erlang:system_time(second) + 2}, + action => #{set_timer => erlang:system_time(microsecond) + 2000000}, aux_state => erlang:term_to_binary(<<"aux_state1">>) }, Self ! 1, @@ -996,7 +996,7 @@ mock_processor(simple_timers_test = TestCase) -> Result = #{ events => [event(2)], %% continuation timer - action => #{set_timer => erlang:system_time(second)}, + action => #{set_timer => erlang:system_time(microsecond)}, aux_state => erlang:term_to_binary(<<"aux_state2">>) }, Self ! 2, @@ -1017,7 +1017,7 @@ mock_processor(simple_call_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(second) + 2} + action => #{set_timer => erlang:system_time(microsecond) + 2000000} }, Self ! 1, {ok, Result}; @@ -1047,7 +1047,7 @@ mock_processor(reschedule_after_call_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(second) + 2} + action => #{set_timer => erlang:system_time(microsecond) + 2000000} }, Self ! 1, {ok, Result}; @@ -1106,7 +1106,7 @@ mock_processor(simple_call_with_range_test = TestCase) -> Result = #{ response => <<"response">>, events => [event(6)], - action => #{set_timer => erlang:system_time(second)} + action => #{set_timer => erlang:system_time(microsecond)} }, Self ! 3, {ok, Result}; @@ -1127,7 +1127,7 @@ mock_processor(call_replace_timer_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(second) + 2, remove => true} + action => #{set_timer => erlang:system_time(microsecond) + 2000000, remove => true} }, Self ! 1, {ok, Result}; @@ -1136,7 +1136,7 @@ mock_processor(call_replace_timer_test = TestCase) -> Result = #{ response => <<"response">>, events => [event(2), event(3)], - action => #{set_timer => erlang:system_time(second)} + action => #{set_timer => erlang:system_time(microsecond)} }, Self ! 2, {ok, Result}; @@ -1157,7 +1157,7 @@ mock_processor(call_unset_timer_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(second) + 2} + action => #{set_timer => erlang:system_time(microsecond) + 2000000} }, Self ! 1, {ok, Result}; @@ -1187,7 +1187,7 @@ mock_processor(postponed_call_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [], - action => #{set_timer => erlang:system_time(second)} + action => #{set_timer => erlang:system_time(microsecond)} }, Self ! 1, {ok, Result}; @@ -1195,7 +1195,7 @@ mock_processor(postponed_call_test = TestCase) -> timer:sleep(3000), Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(second)} + action => #{set_timer => erlang:system_time(microsecond)} }, Self ! 2, {ok, Result}; @@ -1223,7 +1223,7 @@ mock_processor(postponed_call_to_suspended_process_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [], - action => #{set_timer => erlang:system_time(second)} + action => #{set_timer => erlang:system_time(microsecond)} }, Self ! 1, {ok, Result}; @@ -1271,7 +1271,7 @@ mock_processor(simple_repair_after_non_retriable_error_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [], - action => #{set_timer => erlang:system_time(second)} + action => #{set_timer => erlang:system_time(microsecond)} }, Self ! 1, {ok, Result}; @@ -1282,7 +1282,7 @@ mock_processor(simple_repair_after_non_retriable_error_test = TestCase) -> %% timeout via simple repair Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(second)} + action => #{set_timer => erlang:system_time(microsecond)} }, Self ! 3, {ok, Result}; @@ -1302,7 +1302,7 @@ mock_processor(repair_after_non_retriable_error_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [], - action => #{set_timer => erlang:system_time(second)} + action => #{set_timer => erlang:system_time(microsecond)} }, Self ! 1, {ok, Result}; @@ -1331,7 +1331,7 @@ mock_processor(error_after_max_retries_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [], - action => #{set_timer => erlang:system_time(second)} + action => #{set_timer => erlang:system_time(microsecond)} }, Self ! 1, {ok, Result}; @@ -1391,7 +1391,7 @@ mock_processor(remove_by_timer_test = TestCase) -> MockProcessor = fun({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [event(1), event(2)], - action => #{set_timer => erlang:system_time(second) + 2, remove => true} + action => #{set_timer => erlang:system_time(microsecond) + 2000000, remove => true} }, {ok, Result} end, @@ -1403,7 +1403,7 @@ mock_processor(remove_without_timer_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(second) + 2} + action => #{set_timer => erlang:system_time(microsecond) + 2000000} }, Self ! 1, {ok, Result}; @@ -1471,7 +1471,7 @@ expect_steps_counter(ExpectedSteps, CurrentStep) -> event(Id) -> #{ event_id => Id, - timestamp => erlang:system_time(second), + timestamp => erlang:system_time(microsecond), metadata => #{<<"format_version">> => 1}, %% msg_pack compatibility for kafka payload => erlang:term_to_binary({bin, crypto:strong_rand_bytes(8)})