From 63be5d177a641219ecf0d93f60194e7f20eee5bc Mon Sep 17 00:00:00 2001 From: ttt161 Date: Mon, 16 Mar 2026 17:32:48 +0300 Subject: [PATCH 1/6] worker: move continuation task from args to state --- src/prg_worker.erl | 44 ++++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/src/prg_worker.erl b/src/prg_worker.erl index 75b69bb..ae83cf9 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -16,11 +16,11 @@ -export([handle_continue/2]). -export([process_task/3]). --export([continuation_task/3]). +-export([continuation_task/1]). -export([next_task/1]). -export([process_scheduled_task/3]). --record(prg_worker_state, {ns_id, ns_opts, process, sidecar_pid}). +-record(prg_worker_state, {ns_id, ns_opts, process, sidecar_pid, continuation}). -define(DEFAULT_RANGE, #{direction => forward}). %% 1 second @@ -39,9 +39,9 @@ process_task(Worker, TaskHeader, #{process_id := _ProcessId, task_id := _TaskId} = Task) -> gen_server:cast(Worker, {process_task, TaskHeader, Task, otel_ctx:get_current()}). --spec continuation_task(pid(), task_header(), task()) -> ok. -continuation_task(Worker, TaskHeader, Task) -> - gen_server:cast(Worker, {continuation_task, TaskHeader, Task, otel_ctx:get_current()}). +-spec continuation_task(pid()) -> ok. +continuation_task(Worker) -> + gen_server:cast(Worker, {continuation_task, otel_ctx:get_current()}). -spec next_task(pid()) -> ok. 
next_task(Worker) -> @@ -96,8 +96,8 @@ handle_cast( NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), {noreply, NewState}; handle_cast( - {continuation_task, TaskHeader, Task, OtelCtx}, - #prg_worker_state{ns_opts = #{process_step_timeout := TimeoutSec}} = State + {continuation_task, OtelCtx}, + #prg_worker_state{ns_opts = #{process_step_timeout := TimeoutSec}, continuation = {TaskHeader, Task}} = State ) -> _ = otel_ctx:attach(OtelCtx), Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, @@ -130,16 +130,20 @@ handle_cast( ok = next_task(self()), {noreply, State} end; -handle_cast(next_task, #prg_worker_state{sidecar_pid = CurrentPid}) -> +handle_cast(next_task, #prg_worker_state{sidecar_pid = CurrentPid} = State) -> %% kill sidecar and restart to clear memory true = erlang:unlink(CurrentPid), true = erlang:exit(CurrentPid, kill), - exit(normal). + {stop, normal, State#prg_worker_state{continuation = undefined}}. +%exit(normal). handle_info(_Info, #prg_worker_state{} = State) -> {noreply, State}. -terminate(_Reason, #prg_worker_state{} = _State) -> +terminate(_Reason, #prg_worker_state{continuation = undefined} = _State) -> + ok; +terminate(_Reason, #prg_worker_state{continuation = _Continuation} = _State) -> + %% TODO: replace task in schedule ok. 
code_change(_OldVsn, #prg_worker_state{} = State, _Extra) -> @@ -280,9 +284,10 @@ success_and_continue(Intent, TaskHeader, Task, Deadline, State) -> State#prg_worker_state{process = undefined}; {ok, [ContinuationTask | _]} -> NewHistory = maps:get(history, Process) ++ Events, - ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask), + ok = continuation_task(self()), State#prg_worker_state{ - process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)} + process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)}, + continuation = {create_header(ContinuationTask), ContinuationTask} } end. @@ -331,9 +336,10 @@ success_and_suspend(Intent, TaskHeader, Task, Deadline, State) -> State#prg_worker_state{process = undefined}; {ok, [ContinuationTask | _]} -> NewHistory = maps:get(history, Process) ++ Events, - ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask), + ok = continuation_task(self()), State#prg_worker_state{ - process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)} + process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)}, + continuation = {create_header(ContinuationTask), ContinuationTask} } end. 
@@ -385,9 +391,10 @@ success_and_unlock( ), _ = maybe_reply(TaskHeader, Response), NewHistory = maps:get(history, Process) ++ Events, - ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask), + ok = continuation_task(self()), State#prg_worker_state{ - process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)} + process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)}, + continuation = {create_header(ContinuationTask), ContinuationTask} }; _ -> {ok, []} = prg_worker_sidecar:complete_and_unlock( @@ -446,9 +453,10 @@ success_and_unlock(Intent, TaskHeader, Task, Deadline, State) -> end; {ok, [#{status := <<"running">>} = ContinuationTask | _]} -> NewHistory = maps:get(history, Process) ++ Events, - ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask), + ok = continuation_task(self()), State#prg_worker_state{ - process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)} + process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)}, + continuation = {create_header(ContinuationTask), ContinuationTask} } end. 
From 0780fbf4d98b04fbf1a5bde0f04017eb19917c40 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Mon, 16 Mar 2026 19:40:30 +0300 Subject: [PATCH 2/6] epg_migrator integration --- priv/migrations/0000000000000-create.erl | 159 ++++++++++++++++++ priv/migrations/0000000000001-expand-id.erl | 35 ++++ .../0000000000002-add-previous-status.erl | 43 +++++ .../0000000000003-add-init-status.erl | 22 +++ priv/migrations/0000000000004-fix-indexes.erl | 57 +++++++ rebar.config | 1 + rebar.lock | 7 + src/progressor.app.src | 1 + src/storage/postgres/prg_pg_backend.erl | 14 +- 9 files changed, 338 insertions(+), 1 deletion(-) create mode 100644 priv/migrations/0000000000000-create.erl create mode 100644 priv/migrations/0000000000001-expand-id.erl create mode 100644 priv/migrations/0000000000002-add-previous-status.erl create mode 100644 priv/migrations/0000000000003-add-init-status.erl create mode 100644 priv/migrations/0000000000004-fix-indexes.erl diff --git a/priv/migrations/0000000000000-create.erl b/priv/migrations/0000000000000-create.erl new file mode 100644 index 0000000..258cbef --- /dev/null +++ b/priv/migrations/0000000000000-create.erl @@ -0,0 +1,159 @@ +-module('0000000000000-create'). + +-export([perform/2]). + +-spec perform(_, _) -> _. 
+perform(Connection, MigrationOpts) -> + NsId = proplists:get_value(namespace, MigrationOpts), + #{ + processes := ProcessesTable, + tasks := TaskTable, + schedule := ScheduleTable, + running := RunningTable, + events := EventsTable + } = prg_pg_utils:tables(NsId), + {ok, _, [{IsProcessStatusExists}]} = epg_pool:query( + Connection, + "select exists (select 1 from pg_type where typname = 'process_status')" + ), + _ = + case IsProcessStatusExists of + true -> + ok; + false -> + {ok, _, _} = epg_pool:query( + Connection, + "CREATE TYPE process_status AS ENUM ('running', 'error')" + ) + end, + %% create type task_status if not exists + {ok, _, [{IsTaskStatusExists}]} = epg_pool:query( + Connection, + "select exists (select 1 from pg_type where typname = 'task_status')" + ), + _ = + case IsTaskStatusExists of + true -> + ok; + false -> + {ok, _, _} = epg_pool:query( + Connection, + "CREATE TYPE task_status AS ENUM " + "('waiting', 'running', 'blocked', 'error', 'finished', 'cancelled')" + ) + end, + %% create type task_type if not exists + {ok, _, [{IsTaskTypeExists}]} = epg_pool:query( + Connection, + "select exists (select 1 from pg_type where typname = 'task_type')" + ), + _ = + case IsTaskTypeExists of + true -> + ok; + false -> + {ok, _, _} = epg_pool:query( + Connection, + "CREATE TYPE task_type AS ENUM ('init', 'timeout', 'call', 'notify', 'repair', 'remove')" + ) + end, + %% create processes table + {ok, _, _} = epg_pool:query( + Connection, + "CREATE TABLE IF NOT EXISTS " ++ ProcessesTable ++ + " (" + "process_id VARCHAR(80) PRIMARY KEY, " + "status process_status NOT NULL, " + "detail TEXT, " + "aux_state BYTEA, " + "created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), " + "metadata JSONB)" + ), + %% create tasks table + {ok, _, _} = epg_pool:query( + Connection, + "CREATE TABLE IF NOT EXISTS " ++ TaskTable ++ + " (" + "task_id BIGSERIAL PRIMARY KEY, " + "process_id VARCHAR(80) NOT NULL, " + "task_type task_type NOT NULL, " + "status task_status NOT 
NULL, " + "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, " + "running_time TIMESTAMP WITH TIME ZONE, " + "finished_time TIMESTAMP WITH TIME ZONE, " + "args BYTEA, " + "metadata JSONB, " + "idempotency_key VARCHAR(80) UNIQUE, " + "response BYTEA, " + "blocked_task BIGINT REFERENCES " ++ TaskTable ++ + " (task_id), " + "last_retry_interval INTEGER NOT NULL, " + "attempts_count SMALLINT NOT NULL, " + "context BYTEA, " + "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ " (process_id))" + ), + %% create constraint for process error cause + {ok, _, _} = epg_pool:query( + Connection, + "ALTER TABLE " ++ ProcessesTable ++ + " ADD COLUMN IF NOT EXISTS corrupted_by BIGINT REFERENCES " ++ TaskTable ++ "(task_id)" + ), + + %% create schedule table + {ok, _, _} = epg_pool:query( + Connection, + "CREATE TABLE IF NOT EXISTS " ++ ScheduleTable ++ + " (" + "task_id BIGINT PRIMARY KEY, " + "process_id VARCHAR(80) NOT NULL, " + "task_type task_type NOT NULL, " + "status task_status NOT NULL, " + "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, " + "args BYTEA, " + "metadata JSONB, " + "last_retry_interval INTEGER NOT NULL, " + "attempts_count SMALLINT NOT NULL, " + "context BYTEA, " + "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ + " (process_id), " + "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))" + ), + + %% create running table + {ok, _, _} = epg_pool:query( + Connection, + "CREATE TABLE IF NOT EXISTS " ++ RunningTable ++ + " (" + "process_id VARCHAR(80) PRIMARY KEY, " + "task_id BIGINT NOT NULL, " + "task_type task_type NOT NULL, " + "status task_status NOT NULL, " + "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, " + "running_time TIMESTAMP WITH TIME ZONE NOT NULL, " + "args BYTEA, " + "metadata JSONB, " + "last_retry_interval INTEGER NOT NULL, " + "attempts_count SMALLINT NOT NULL, " + "context BYTEA, " + "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ + " (process_id), " + "FOREIGN KEY (task_id) REFERENCES " ++ 
TaskTable ++ " (task_id))"
+    ),
+
+    %% create events table
+    {ok, _, _} = epg_pool:query(
+        Connection,
+        "CREATE TABLE IF NOT EXISTS " ++ EventsTable ++
+            " ("
+            "process_id VARCHAR(80) NOT NULL, "
+            "task_id BIGINT NOT NULL, "
+            "event_id SMALLINT NOT NULL, "
+            "timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(), "
+            "metadata JSONB, "
+            "payload BYTEA NOT NULL, "
+            "PRIMARY KEY (process_id, event_id), "
+            "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++
+            " (process_id), "
+            "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))"
+    ),
+    ok.
diff --git a/priv/migrations/0000000000001-expand-id.erl b/priv/migrations/0000000000001-expand-id.erl
new file mode 100644
index 0000000..0dee9d5
--- /dev/null
+++ b/priv/migrations/0000000000001-expand-id.erl
@@ -0,0 +1,35 @@
+-module('0000000000001-expand-id').
+
+-export([perform/2]).
+
+-spec perform(_, _) -> _.
+perform(Connection, MigrationOpts) ->
+    NsId = proplists:get_value(namespace, MigrationOpts),
+    #{
+        processes := ProcessesTable,
+        tasks := TaskTable,
+        schedule := ScheduleTable,
+        running := RunningTable,
+        events := EventsTable
+    } = prg_pg_utils:tables(NsId),
+    lists:foreach(
+        fun(T) ->
+            TableStr = string:replace(T, "\"", "'", all),
+            {ok, _, [{VarSize}]} = epg_pool:query(
+                Connection,
+                "SELECT character_maximum_length FROM information_schema.columns "
+                "WHERE table_name = " ++ TableStr ++ " AND column_name = 'process_id'"
+            ),
+            case VarSize < 256 of
+                true ->
+                    {ok, _, _} = epg_pool:query(
+                        Connection,
+                        "ALTER TABLE " ++ T ++ " ALTER COLUMN process_id TYPE VARCHAR(256)"
+                    );
+                false ->
+                    skip
+            end
+        end,
+        [ProcessesTable, TaskTable, ScheduleTable, RunningTable, EventsTable]
+    ),
+    ok.
diff --git a/priv/migrations/0000000000002-add-previous-status.erl b/priv/migrations/0000000000002-add-previous-status.erl new file mode 100644 index 0000000..b778c14 --- /dev/null +++ b/priv/migrations/0000000000002-add-previous-status.erl @@ -0,0 +1,43 @@ +-module('0000000000002-add-previous-status'). + +-export([perform/2]). + +-spec perform(_, _) -> _. +perform(Connection, MigrationOpts) -> + NsId = proplists:get_value(namespace, MigrationOpts), + #{ + processes := ProcessesTable + } = prg_pg_utils:tables(NsId), + ProcessesTableStr = string:replace(ProcessesTable, "\"", "'", all), + {ok, _, [{IsPrevStatusExists}]} = epg_pool:query( + Connection, + "SELECT exists (SELECT 1 FROM information_schema.columns WHERE table_schema = 'public' " + " AND table_name = " ++ ProcessesTableStr ++ " AND column_name = 'previous_status')" + ), + _ = + case IsPrevStatusExists of + true -> + ok; + false -> + %% create columns + {ok, _, _} = epg_pool:query( + Connection, + "ALTER TABLE " ++ ProcessesTable ++ + " ADD COLUMN previous_status process_status, " + " ADD COLUMN status_changed_at TIMESTAMP WITH TIME ZONE" + ), + %% set values + {ok, _} = epg_pool:query( + Connection, + "UPDATE " ++ ProcessesTable ++ + " SET previous_status = status, status_changed_at = created_at" + ), + %% set NOT NULL constraint + {ok, _, _} = epg_pool:query( + Connection, + "ALTER TABLE " ++ ProcessesTable ++ + " ALTER COLUMN previous_status SET NOT NULL," + " ALTER COLUMN status_changed_at SET NOT NULL" + ) + end, + ok. diff --git a/priv/migrations/0000000000003-add-init-status.erl b/priv/migrations/0000000000003-add-init-status.erl new file mode 100644 index 0000000..4c42988 --- /dev/null +++ b/priv/migrations/0000000000003-add-init-status.erl @@ -0,0 +1,22 @@ +-module('0000000000003-add-init-status'). + +-export([perform/2]). + +-spec perform(_, _) -> _. 
+perform(Connection, _MigrationOpts) -> + {ok, _, [{IsInitStatusExists}]} = epg_pool:query( + Connection, + "select exists (SELECT 1 FROM pg_enum WHERE " + " enumtypid = 'process_status'::regtype and enumlabel = 'init')" + ), + _ = + case IsInitStatusExists of + true -> + ok; + false -> + {ok, _, _} = epg_pool:query( + Connection, + "ALTER TYPE process_status ADD VALUE 'init'" + ) + end, + ok. diff --git a/priv/migrations/0000000000004-fix-indexes.erl b/priv/migrations/0000000000004-fix-indexes.erl new file mode 100644 index 0000000..953a08e --- /dev/null +++ b/priv/migrations/0000000000004-fix-indexes.erl @@ -0,0 +1,57 @@ +-module('0000000000004-fix-indexes'). + +-export([perform/2]). + +-spec perform(_, _) -> _. +perform(Connection, MigrationOpts) -> + NsId = proplists:get_value(namespace, MigrationOpts), + #{ + tasks := TaskTable, + schedule := ScheduleTable, + running := RunningTable, + events := EventsTable + } = prg_pg_utils:tables(NsId), + {ok, _, _} = drop_index(Connection, "process_idx"), + {ok, _, _} = drop_index(Connection, "task_idx"), + {ok, _, _} = create_index(Connection, EventsTable, "process_idx", "(process_id)"), + {ok, _, _} = create_index(Connection, TaskTable, "process_idx", "(process_id)"), + {ok, _, _} = create_index(Connection, ScheduleTable, "process_idx", "(process_id)"), + {ok, _, _} = create_index(Connection, RunningTable, "task_idx", "(task_id)"), + ok. + +drop_index(Connection, IndexName) -> + {ok, _, [{IsIndexExists}]} = epg_pool:query( + Connection, + "SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = $1)", + [IndexName] + ), + case IsIndexExists of + true -> + epg_pool:query(Connection, "DROP INDEX " ++ IndexName); + false -> + {ok, [], []} + end. + +create_index(Connection, Table, Index, Fields) -> + create_index(Connection, Table, Index, " HASH ", Fields). 
+ +create_index(Connection, Table, Index, IndexType, Fields) -> + %% unwrap table name and wrap index name + IndexName = "\"" ++ string:replace(Table, "\"", "", all) ++ "_" ++ Index ++ "\"", + %% re-wrap for using in WHERE section + IndexNameStr = string:replace(IndexName, "\"", "'", all), + {ok, _, [{IsIndexExists}]} = epg_pool:query( + Connection, + "SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = " ++ IndexNameStr ++ " )" + ), + case IsIndexExists of + true -> + {ok, [], []}; + false -> + epg_pool:query( + Connection, + "CREATE INDEX " ++ IndexName ++ + " on " ++ Table ++ + " USING " ++ IndexType ++ " " ++ Fields + ) + end. diff --git a/rebar.config b/rebar.config index ed4bee4..9999520 100644 --- a/rebar.config +++ b/rebar.config @@ -6,6 +6,7 @@ {thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {tag, "v1.0.0"}}}, {mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}}, {epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.5"}}}, + {epg_migrator, {git, "https://github.com/valitydev/epg_migrator.git", {branch, "fx/fix-advisory-lock"}}}, {opentelemetry_api, "1.4.0"} ]}. 
diff --git a/rebar.lock b/rebar.lock index 22f9a4d..1750003 100644 --- a/rebar.lock +++ b/rebar.lock @@ -9,10 +9,15 @@ {git,"https://github.com/valitydev/epg_connector.git", {ref,"939a0d4ab3f7561a79b45381bbe13029d9263006"}}, 0}, + {<<"epg_migrator">>, + {git,"https://github.com/valitydev/epg_migrator.git", + {ref,"200027d5163fbdc9ffe9cc83250fe9493fe10332"}}, + 0}, {<<"epgsql">>, {git,"https://github.com/epgsql/epgsql.git", {ref,"28e9f84c95065a51e92baeb37d2cf1687fc4b9ce"}}, 1}, + {<<"erlydtl">>,{pkg,<<"erlydtl">>,<<"0.14.0">>},1}, {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},2}, {<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},1}, {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.10">>},1}, @@ -32,6 +37,7 @@ {pkg_hash,[ {<<"brod">>, <<"51F4DFF17ED43A806558EBD62CC88E7B35AED336D1BA1F3DE2D010F463D49736">>}, {<<"crc32cer">>, <<"B550DA6D615FEB72A882D15D020F8F7DEE72DFB2CB1BCDF3B1EE8DC2AFD68CFC">>}, + {<<"erlydtl">>, <<"964B2DC84F8C17ACFAA69C59BA129EF26AC45D2BA898C3C6AD9B5BDC8BA13CED">>}, {<<"jsone">>, <<"347FF1FA700E182E1F9C5012FA6D737B12C854313B9AE6954CA75D3987D6C06D">>}, {<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}, {<<"kafka_protocol">>, <<"F917B6C90C8DF0DE2B40A87D6B9AE1CFCE7788E91A65818E90E40CF76111097A">>}, @@ -42,6 +48,7 @@ {pkg_hash_ext,[ {<<"brod">>, <<"88584FDEBA746AA6729E2A1826416C10899954F68AF93659B3C2F38A2DCAA27C">>}, {<<"crc32cer">>, <<"A39B8F0B1990AC1BF06C3A247FC6A178B740CDFC33C3B53688DC7DD6B1855942">>}, + {<<"erlydtl">>, <<"D80EC044CD8F58809C19D29AC5605BE09E955040911B644505E31E9DD8143431">>}, {<<"jsone">>, <<"08560B78624A12E0B5E7EC0271EC8CA38EF51F63D84D84843473E14D9B12618C">>}, {<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}, {<<"kafka_protocol">>, <<"DF680A3706EAD8695F8B306897C0A33E8063C690DA9308DB87B462CFD7029D04">>}, diff --git a/src/progressor.app.src b/src/progressor.app.src index be762b3..25b59c8 100644 --- a/src/progressor.app.src +++ b/src/progressor.app.src @@ -9,6 +9,7 @@ jsx, 
prometheus, epg_connector, + epg_migrator, thrift, mg_proto, brod, diff --git a/src/storage/postgres/prg_pg_backend.erl b/src/storage/postgres/prg_pg_backend.erl index 6ec9c86..94c7ffa 100644 --- a/src/storage/postgres/prg_pg_backend.erl +++ b/src/storage/postgres/prg_pg_backend.erl @@ -682,7 +682,19 @@ complete_and_unlock(PgOpts, NsId, TaskResult, ProcessUpdates, Events) -> -spec db_init(pg_opts(), namespace_id()) -> ok. db_init(PgOpts, NsId) -> - prg_pg_migration:db_init(PgOpts, NsId). + Pool = get_pool(internal, PgOpts), + {ok, Pools} = application:get_env(epg_connector, pools), + {ok, Databases} = application:get_env(epg_connector, databases), + #{database := DbRef} = maps:get(Pool, Pools), + DbOpts = maps:get(DbRef, Databases), + + PrivDir = code:priv_dir(progressor), + MigrationsDir = filename:join([PrivDir, "migrations"]), + MigrationOpts = [{namespace, NsId}], + Realm = erlang:atom_to_binary(NsId), + + {ok, _} = epg_migrator:perform(Realm, DbOpts, MigrationOpts, MigrationsDir), + ok. %-ifdef(TEST). From 0ee259764ea09084153682777817c25a5f3b4b98 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Mon, 16 Mar 2026 22:19:40 +0300 Subject: [PATCH 3/6] add reschedule function --- src/prg_storage.erl | 5 +++++ src/prg_worker.erl | 19 +++++++++++++---- src/storage/postgres/prg_pg_backend.erl | 28 +++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/prg_storage.erl b/src/prg_storage.erl index 3712344..440e3e3 100644 --- a/src/prg_storage.erl +++ b/src/prg_storage.erl @@ -25,6 +25,7 @@ -export([complete_and_error/4]). -export([remove_process/3]). -export([capture_task/3]). +-export([reschedule_task/3]). %% shared functions -export([get_task/3]). @@ -146,6 +147,10 @@ remove_process(#{client := Handler, options := HandlerOpts}, NsId, ProcessId) -> capture_task(#{client := Handler, options := HandlerOpts}, NsId, TaskId) -> Handler:capture_task(HandlerOpts, NsId, TaskId). 
+-spec reschedule_task(storage_opts(), namespace_id(), task()) -> ok | no_return(). +reschedule_task(#{client := Handler, options := HandlerOpts}, NsId, Task) -> + Handler:reschedule_task(HandlerOpts, NsId, Task). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Shared functions (recipient required) %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/prg_worker.erl b/src/prg_worker.erl index ae83cf9..de358c0 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -135,15 +135,26 @@ handle_cast(next_task, #prg_worker_state{sidecar_pid = CurrentPid} = State) -> true = erlang:unlink(CurrentPid), true = erlang:exit(CurrentPid, kill), {stop, normal, State#prg_worker_state{continuation = undefined}}. -%exit(normal). handle_info(_Info, #prg_worker_state{} = State) -> {noreply, State}. -terminate(_Reason, #prg_worker_state{continuation = undefined} = _State) -> +terminate(_Reason, #prg_worker_state{continuation = {{TaskType, _}, Task}} = State) when + TaskType =:= timeout; + TaskType =:= remove +-> + #prg_worker_state{ + ns_id = NsId, + ns_opts = #{storage := StorageOpts} = _NsOpts + } = State, + try + prg_storage:reschedule_task(StorageOpts, NsId, Task) + catch + Class:Term:Trace -> + logger:error("reschedule task error: ~p", [[Class, Term, Trace]]) + end, ok; -terminate(_Reason, #prg_worker_state{continuation = _Continuation} = _State) -> - %% TODO: replace task in schedule +terminate(_Reason, #prg_worker_state{} = _State) -> ok. code_change(_OldVsn, #prg_worker_state{} = State, _Extra) -> diff --git a/src/storage/postgres/prg_pg_backend.erl b/src/storage/postgres/prg_pg_backend.erl index 94c7ffa..0cbb706 100644 --- a/src/storage/postgres/prg_pg_backend.erl +++ b/src/storage/postgres/prg_pg_backend.erl @@ -28,6 +28,7 @@ -export([complete_and_error/4]). -export([remove_process/3]). -export([capture_task/3]). +-export([reschedule_task/3]). %% shared functions -export([get_task/4]). 
@@ -371,6 +372,26 @@ capture_task(PgOpts, NsId, TaskId) -> ), to_maps(Columns, Rows, fun marshal_task/1). +-spec reschedule_task(pg_opts(), namespace_id(), task()) -> ok | no_return(). +reschedule_task(PgOpts, NsId, #{task_id := TaskId} = Task) -> + Pool = get_pool(internal, PgOpts), + #{ + schedule := ScheduleTable, + running := RunningTable + } = prg_pg_utils:tables(NsId), + ScheduledTask = maps:merge( + maps:without([running_time], Task), + #{status => <<"waiting">>} + ), + epg_pool:transaction( + Pool, + fun(Connection) -> + {ok, _} = do_delete_running(Connection, RunningTable, TaskId), + {ok, _, _, _} = do_save_schedule(Connection, ScheduleTable, ScheduledTask) + end + ), + ok. + -spec search_calls(pg_opts(), namespace_id(), pos_integer()) -> [task()]. search_calls(PgOpts, NsId, Limit) -> Pool = get_pool(scan, PgOpts), @@ -883,6 +904,13 @@ do_save_running(Connection, Table, Task, Returning) -> ] ). +do_delete_running(Connection, Table, TaskId) -> + epg_pool:query( + Connection, + "DELETE FROM " ++ Table ++ " WHERE task_id = $1", + [TaskId] + ). + do_save_schedule(Connection, Table, Task) -> do_save_schedule(Connection, Table, Task, "task_id"). From 8dfd469b7e3f37eee004459450da07415c1f116e Mon Sep 17 00:00:00 2001 From: ttt161 Date: Mon, 16 Mar 2026 22:25:59 +0300 Subject: [PATCH 4/6] cleanup --- src/storage/postgres/prg_pg_backend.erl | 22 +- src/storage/postgres/prg_pg_migration.erl | 338 ---------------------- 2 files changed, 21 insertions(+), 339 deletions(-) delete mode 100644 src/storage/postgres/prg_pg_migration.erl diff --git a/src/storage/postgres/prg_pg_backend.erl b/src/storage/postgres/prg_pg_backend.erl index 0cbb706..ee3ca84 100644 --- a/src/storage/postgres/prg_pg_backend.erl +++ b/src/storage/postgres/prg_pg_backend.erl @@ -721,7 +721,27 @@ db_init(PgOpts, NsId) -> -spec cleanup(_, _) -> _. cleanup(PgOpts, NsId) -> - prg_pg_migration:cleanup(PgOpts, NsId). 
+ Pool = get_pool(internal, PgOpts), + #{ + processes := ProcessesTable, + tasks := TaskTable, + schedule := ScheduleTable, + running := RunningTable, + events := EventsTable + } = prg_pg_utils:tables(NsId), + epg_pool:transaction( + Pool, + fun(Connection) -> + {ok, _, _} = epg_pool:query(Connection, "ALTER TABLE " ++ ProcessesTable ++ " DROP COLUMN corrupted_by"), + {ok, _, _} = epg_pool:query(Connection, "DROP TABLE " ++ EventsTable), + {ok, _, _} = epg_pool:query(Connection, "DROP TABLE " ++ RunningTable), + {ok, _, _} = epg_pool:query(Connection, "DROP TABLE " ++ ScheduleTable), + {ok, _, _} = epg_pool:query(Connection, "DROP TABLE " ++ TaskTable), + {ok, _, _} = epg_pool:query(Connection, "DROP TABLE " ++ ProcessesTable), + _ = epg_pool:query(Connection, "DROP TYPE task_type, task_status, process_status") + end + ), + ok. %-endif. diff --git a/src/storage/postgres/prg_pg_migration.erl b/src/storage/postgres/prg_pg_migration.erl deleted file mode 100644 index 821c0dd..0000000 --- a/src/storage/postgres/prg_pg_migration.erl +++ /dev/null @@ -1,338 +0,0 @@ --module(prg_pg_migration). - --include_lib("progressor/include/progressor.hrl"). - --export([db_init/2]). --export([cleanup/2]). - -%-define(TBL_PROC(NS), "\"" ++ erlang:atom_to_list(NsId) ++ "_processes" ++ "\""). -%-define(TBL_PROC_STR(NS), "'" ++ erlang:atom_to_list(NsId) ++ "_processes" ++ "'"). - --spec db_init(prg_pg_backend:pg_opts(), namespace_id()) -> ok. 
-db_init(#{pool := Pool}, NsId) -> - #{ - processes := ProcessesTable, - tasks := TaskTable, - schedule := ScheduleTable, - running := RunningTable, - events := EventsTable - } = prg_pg_utils:tables(NsId), - {ok, _, _} = epg_pool:transaction( - Pool, - fun(Connection) -> - %% create type process_status if not exists - {ok, _, [{IsProcessStatusExists}]} = epg_pool:query( - Connection, - "select exists (select 1 from pg_type where typname = 'process_status')" - ), - _ = - case IsProcessStatusExists of - true -> - ok; - false -> - {ok, _, _} = epg_pool:query( - Connection, - "CREATE TYPE process_status AS ENUM ('running', 'error')" - ) - end, - %% create type task_status if not exists - {ok, _, [{IsTaskStatusExists}]} = epg_pool:query( - Connection, - "select exists (select 1 from pg_type where typname = 'task_status')" - ), - _ = - case IsTaskStatusExists of - true -> - ok; - false -> - {ok, _, _} = epg_pool:query( - Connection, - "CREATE TYPE task_status AS ENUM " - "('waiting', 'running', 'blocked', 'error', 'finished', 'cancelled')" - ) - end, - %% create type task_type if not exists - {ok, _, [{IsTaskTypeExists}]} = epg_pool:query( - Connection, - "select exists (select 1 from pg_type where typname = 'task_type')" - ), - _ = - case IsTaskTypeExists of - true -> - ok; - false -> - {ok, _, _} = epg_pool:query( - Connection, - "CREATE TYPE task_type AS ENUM ('init', 'timeout', 'call', 'notify', 'repair', 'remove')" - ) - end, - %% create processes table - {ok, _, _} = epg_pool:query( - Connection, - "CREATE TABLE IF NOT EXISTS " ++ ProcessesTable ++ - " (" - "process_id VARCHAR(80) PRIMARY KEY, " - "status process_status NOT NULL, " - "detail TEXT, " - "aux_state BYTEA, " - "created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), " - "metadata JSONB)" - ), - %% create tasks table - {ok, _, _} = epg_pool:query( - Connection, - "CREATE TABLE IF NOT EXISTS " ++ TaskTable ++ - " (" - "task_id BIGSERIAL PRIMARY KEY, " - "process_id VARCHAR(80) NOT NULL, " - 
"task_type task_type NOT NULL, " - "status task_status NOT NULL, " - "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, " - "running_time TIMESTAMP WITH TIME ZONE, " - "finished_time TIMESTAMP WITH TIME ZONE, " - "args BYTEA, " - "metadata JSONB, " - "idempotency_key VARCHAR(80) UNIQUE, " - "response BYTEA, " - "blocked_task BIGINT REFERENCES " ++ TaskTable ++ - " (task_id), " - "last_retry_interval INTEGER NOT NULL, " - "attempts_count SMALLINT NOT NULL, " - "context BYTEA, " - "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ " (process_id))" - ), - %% create constraint for process error cause - {ok, _, _} = epg_pool:query( - Connection, - "ALTER TABLE " ++ ProcessesTable ++ - " ADD COLUMN IF NOT EXISTS corrupted_by BIGINT REFERENCES " ++ TaskTable ++ "(task_id)" - ), - - %% create schedule table - {ok, _, _} = epg_pool:query( - Connection, - "CREATE TABLE IF NOT EXISTS " ++ ScheduleTable ++ - " (" - "task_id BIGINT PRIMARY KEY, " - "process_id VARCHAR(80) NOT NULL, " - "task_type task_type NOT NULL, " - "status task_status NOT NULL, " - "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, " - "args BYTEA, " - "metadata JSONB, " - "last_retry_interval INTEGER NOT NULL, " - "attempts_count SMALLINT NOT NULL, " - "context BYTEA, " - "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ - " (process_id), " - "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))" - ), - - %% create running table - {ok, _, _} = epg_pool:query( - Connection, - "CREATE TABLE IF NOT EXISTS " ++ RunningTable ++ - " (" - "process_id VARCHAR(80) PRIMARY KEY, " - "task_id BIGINT NOT NULL, " - "task_type task_type NOT NULL, " - "status task_status NOT NULL, " - "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, " - "running_time TIMESTAMP WITH TIME ZONE NOT NULL, " - "args BYTEA, " - "metadata JSONB, " - "last_retry_interval INTEGER NOT NULL, " - "attempts_count SMALLINT NOT NULL, " - "context BYTEA, " - "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ - " 
(process_id), " - "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))" - ), - - %% create events table - {ok, _, _} = epg_pool:query( - Connection, - "CREATE TABLE IF NOT EXISTS " ++ EventsTable ++ - " (" - "process_id VARCHAR(80) NOT NULL, " - "task_id BIGINT NOT NULL, " - "event_id SMALLINT NOT NULL, " - "timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(), " - "metadata JSONB, " - "payload BYTEA NOT NULL, " - "PRIMARY KEY (process_id, event_id), " - "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ - " (process_id), " - "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))" - ), - %% DELETED VIA MIGRATION 4 - %% create indexes - %{ok, _, _} = epg_pool:query( - % Connection, - % "CREATE INDEX IF NOT EXISTS process_idx on " ++ EventsTable ++ " USING HASH (process_id)" - %), - %{ok, _, _} = epg_pool:query( - % Connection, - % "CREATE INDEX IF NOT EXISTS process_idx on " ++ TaskTable ++ " USING HASH (process_id)" - %), - %{ok, _, _} = epg_pool:query( - % Connection, - %% "CREATE INDEX IF NOT EXISTS process_idx on " ++ ScheduleTable ++ " USING HASH (process_id)" - %), - %{ok, _, _} = epg_pool:query( - % Connection, - % "CREATE INDEX IF NOT EXISTS task_idx on " ++ RunningTable ++ " USING HASH (task_id)" - %), - - %% MIGRATIONS - %% MIGRATION 1 - %% migrate process_id to varchar 256 - ok = lists:foreach( - fun(T) -> - TableStr = string:replace(T, "\"", "'", all), - {ok, _, [{VarSize}]} = epg_pool:query( - Connection, - "SELECT character_maximum_length FROM information_schema.columns " - "WHERE table_name = " ++ TableStr ++ " AND column_name = 'process_id'" - ), - case VarSize < 256 of - true -> - {ok, _, _} = epg_pool:query( - Connection, - "ALTER TABLE " ++ T ++ "ALTER COLUMN process_id TYPE VARCHAR(256)" - ); - false -> - skip - end - end, - [ProcessesTable, TaskTable, ScheduleTable, RunningTable, EventsTable] - ), - %% MIGRATION 2 - %% add previous_status, status_changed_at to processes table and set values - ProcessesTableStr = 
string:replace(ProcessesTable, "\"", "'", all), - {ok, _, [{IsPrevStatusExists}]} = epg_pool:query( - Connection, - "SELECT exists (SELECT 1 FROM information_schema.columns WHERE table_schema = 'public' " - " AND table_name = " ++ ProcessesTableStr ++ " AND column_name = 'previous_status')" - ), - _ = - case IsPrevStatusExists of - true -> - ok; - false -> - %% create columns - {ok, _, _} = epg_pool:query( - Connection, - "ALTER TABLE " ++ ProcessesTable ++ - " ADD COLUMN previous_status process_status, " - " ADD COLUMN status_changed_at TIMESTAMP WITH TIME ZONE" - ), - %% set values - {ok, _} = epg_pool:query( - Connection, - "UPDATE " ++ ProcessesTable ++ - " SET previous_status = status, status_changed_at = created_at" - ), - %% set NOT NULL constraint - {ok, _, _} = epg_pool:query( - Connection, - "ALTER TABLE " ++ ProcessesTable ++ - " ALTER COLUMN previous_status SET NOT NULL," - " ALTER COLUMN status_changed_at SET NOT NULL" - ) - end, - - %% MIGRATION 3 - %% Expand prosess_status enumeration - {ok, _, [{IsInitStatusExists}]} = epg_pool:query( - Connection, - "select exists (SELECT 1 FROM pg_enum WHERE " - " enumtypid = 'process_status'::regtype and enumlabel = 'init')" - ), - _ = - case IsInitStatusExists of - true -> - ok; - false -> - {ok, _, _} = epg_pool:query( - Connection, - "ALTER TYPE process_status ADD VALUE 'init'" - ) - end, - - %% MIGRATION 4 - %% Drop wrong indexes if exists, create new indexes - {ok, _, _} = drop_index(Connection, "process_idx"), - {ok, _, _} = drop_index(Connection, "task_idx"), - {ok, _, _} = create_index(Connection, EventsTable, "process_idx", "(process_id)"), - {ok, _, _} = create_index(Connection, TaskTable, "process_idx", "(process_id)"), - {ok, _, _} = create_index(Connection, ScheduleTable, "process_idx", "(process_id)"), - {ok, _, _} = create_index(Connection, RunningTable, "task_idx", "(task_id)"), - - %%% END - {ok, [], []} - end - ), - ok. - --spec cleanup(_, _) -> _. 
-cleanup(#{pool := Pool}, NsId) -> - #{ - processes := ProcessesTable, - tasks := TaskTable, - schedule := ScheduleTable, - running := RunningTable, - events := EventsTable - } = prg_pg_utils:tables(NsId), - epg_pool:transaction( - Pool, - fun(Connection) -> - {ok, _, _} = epg_pool:query(Connection, "ALTER TABLE " ++ ProcessesTable ++ " DROP COLUMN corrupted_by"), - {ok, _, _} = epg_pool:query(Connection, "DROP TABLE " ++ EventsTable), - {ok, _, _} = epg_pool:query(Connection, "DROP TABLE " ++ RunningTable), - {ok, _, _} = epg_pool:query(Connection, "DROP TABLE " ++ ScheduleTable), - {ok, _, _} = epg_pool:query(Connection, "DROP TABLE " ++ TaskTable), - {ok, _, _} = epg_pool:query(Connection, "DROP TABLE " ++ ProcessesTable), - _ = epg_pool:query(Connection, "DROP TYPE task_type, task_status, process_status") - end - ), - ok. - -%% - -drop_index(Connection, IndexName) -> - {ok, _, [{IsIndexExists}]} = epg_pool:query( - Connection, - "SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = $1)", - [IndexName] - ), - case IsIndexExists of - true -> - epg_pool:query(Connection, "DROP INDEX " ++ IndexName); - false -> - {ok, [], []} - end. - -create_index(Connection, Table, Index, Fields) -> - create_index(Connection, Table, Index, " HASH ", Fields). - -create_index(Connection, Table, Index, IndexType, Fields) -> - %% unwrap table name and wrap index name - IndexName = "\"" ++ string:replace(Table, "\"", "", all) ++ "_" ++ Index ++ "\"", - %% re-wrap for using in WHERE section - IndexNameStr = string:replace(IndexName, "\"", "'", all), - {ok, _, [{IsIndexExists}]} = epg_pool:query( - Connection, - "SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = " ++ IndexNameStr ++ " )" - ), - case IsIndexExists of - true -> - {ok, [], []}; - false -> - epg_pool:query( - Connection, - "CREATE INDEX " ++ IndexName ++ - " on " ++ Table ++ - " USING " ++ IndexType ++ " " ++ Fields - ) - end. 
From 3b6abfb2e6dd94e508646c67837f7d5993c0da42 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Wed, 18 Mar 2026 17:40:48 +0300 Subject: [PATCH 5/6] fix issues --- rebar.config | 2 +- rebar.lock | 2 +- src/prg_storage.erl | 2 -- src/prg_test_utils.erl | 24 ++++++++++++++++++++++++ src/progressor.erl | 21 --------------------- 5 files changed, 26 insertions(+), 25 deletions(-) create mode 100644 src/prg_test_utils.erl diff --git a/rebar.config b/rebar.config index 9999520..dab9cc1 100644 --- a/rebar.config +++ b/rebar.config @@ -6,7 +6,7 @@ {thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {tag, "v1.0.0"}}}, {mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}}, {epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.5"}}}, - {epg_migrator, {git, "https://github.com/valitydev/epg_migrator.git", {branch, "fx/fix-advisory-lock"}}}, + {epg_migrator, {git, "https://github.com/valitydev/epg_migrator.git", {branch, "main"}}}, {opentelemetry_api, "1.4.0"} ]}. diff --git a/rebar.lock b/rebar.lock index 1750003..b239ed3 100644 --- a/rebar.lock +++ b/rebar.lock @@ -11,7 +11,7 @@ 0}, {<<"epg_migrator">>, {git,"https://github.com/valitydev/epg_migrator.git", - {ref,"200027d5163fbdc9ffe9cc83250fe9493fe10332"}}, + {ref,"8633c43fb4d022355c1498c0effa9ecf4098a67f"}}, 0}, {<<"epgsql">>, {git,"https://github.com/epgsql/epgsql.git", diff --git a/src/prg_storage.erl b/src/prg_storage.erl index 440e3e3..f1e5a9e 100644 --- a/src/prg_storage.erl +++ b/src/prg_storage.erl @@ -37,9 +37,7 @@ %% Init operations -export([db_init/2]). -%-ifdef(TEST). -export([cleanup/2]). -%-endif. %%%%%%%%%%%%%%%%%%%%%%%% %% API handler functions diff --git a/src/prg_test_utils.erl b/src/prg_test_utils.erl new file mode 100644 index 0000000..17151a9 --- /dev/null +++ b/src/prg_test_utils.erl @@ -0,0 +1,24 @@ +-module(prg_test_utils). + +-export([cleanup/1]). + +%% @doc Deletes all database records for testing purposes only. 
+%% +%% This function truncates/resets the database tables to a clean state. +%% It is designed exclusively for use in test setups and teardowns +%% (between test cases and suites) to ensure test isolation. +%% +%% IMPORTANT: This function is NOT intended for production use. +%% Calling it in a production environment will result in IRREVERSIBLE +%% DATA LOSS and severe application disruption. +%% +%% The function is exported to allow test frameworks (e.g., Common Test, EUnit) +%% to access it, but it should be considered a private interface for testing. +%% +%% @end +-spec cleanup(_) -> _. +cleanup(#{ns := NsId} = _Opts) -> + {ok, NSs} = application:get_env(progressor, namespaces), + NsOpts = maps:get(NsId, NSs), + #{storage := StorageOpts} = prg_utils:make_ns_opts(NsId, NsOpts), + ok = prg_storage:cleanup(StorageOpts, NsId). diff --git a/src/progressor.erl b/src/progressor.erl index 6272795..0351b10 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -20,10 +20,6 @@ %% Internal API -export([reply/2]). -%-ifdef(TEST). --export([cleanup/1]). -%-endif. - -type request() :: #{ ns := namespace_id(), id := id(), @@ -146,23 +142,6 @@ health_check_namespace(NsId) -> #{ns => NsId} ). -%-ifdef(TEST). - --spec cleanup(_) -> _. -cleanup(Opts) -> - prg_utils:pipe( - [ - fun add_ns_opts/1, - fun cleanup_storage/1 - ], - Opts - ). - -cleanup_storage(#{ns := NsId, ns_opts := #{storage := StorageOpts}}) -> - ok = prg_storage:cleanup(StorageOpts, NsId). - -%-endif. 
- %% Internal functions add_ns_opts(#{ns := NsId} = Opts) -> From e98ca390e78ced325a7b275d2d3f523982d79dec Mon Sep 17 00:00:00 2001 From: ttt161 Date: Wed, 18 Mar 2026 22:31:11 +0300 Subject: [PATCH 6/6] add reschedule logs --- src/prg_worker.erl | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/prg_worker.erl b/src/prg_worker.erl index de358c0..8939522 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -145,13 +145,19 @@ terminate(_Reason, #prg_worker_state{continuation = {{TaskType, _}, Task}} = Sta -> #prg_worker_state{ ns_id = NsId, - ns_opts = #{storage := StorageOpts} = _NsOpts + ns_opts = #{storage := StorageOpts}, + continuation = {_, #{task_id := TaskId}}, + process = #{process_id := ProcessId} } = State, - try - prg_storage:reschedule_task(StorageOpts, NsId, Task) + try prg_storage:reschedule_task(StorageOpts, NsId, Task) of + ok -> + logger:warning("process ~p reschedule task ~p when terminate", [ProcessId, TaskId]) catch Class:Term:Trace -> - logger:error("reschedule task error: ~p", [[Class, Term, Trace]]) + logger:error( + "process ~p reschedule task ~p error: ~p", + [ProcessId, TaskId, {Class, Term, Trace}] + ) end, ok; terminate(_Reason, #prg_worker_state{} = _State) ->