Skip to content

Commit 738757d

Browse files
committed
PubSub convenience functions
1 parent fb4fb93 commit 738757d

File tree

5 files changed

+79
-62
lines changed

5 files changed

+79
-62
lines changed

dev/user_admin.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ defmodule DemoWeb.UserAdmin do
7272
|> Ecto.Changeset.change(encrypted_password: :crypto.strong_rand_bytes(16) |> Base.encode16())
7373
|> Demo.Repo.update()
7474

75-
LiveAdmin.PubSub.broadcast(session.id, {:job, %{pid: self(), progress: i/count, label: "Regenerating passwords"}})
75+
LiveAdmin.PubSub.update_job(session.id, self(), progress: i/count, label: "Regenerating passwords")
7676
end)
7777

7878
{:ok, "updated"}

lib/live_admin/components/container.ex

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -114,45 +114,38 @@ defmodule LiveAdmin.Components.Container do
114114
try do
115115
case apply(m, f, [Resource.query(resource, search, config) | args]) do
116116
{:ok, message} ->
117-
LiveAdmin.PubSub.broadcast(
117+
LiveAdmin.PubSub.announce(
118118
session.id,
119-
{:announce,
120-
%{
121-
message:
122-
trans("Task %{name} succeeded: '%{message}'",
123-
inter: [name: name, message: message]
124-
)
125-
}, type: :success}
119+
:success,
120+
trans("Task %{name} succeeded: '%{message}'",
121+
inter: [name: name, message: message]
122+
)
126123
)
127124

128125
{:error, message} ->
129-
LiveAdmin.PubSub.broadcast(
126+
LiveAdmin.PubSub.announce(
130127
session.id,
131-
{:announce,
132-
%{
133-
message:
134-
trans("Task %{name} failed: '%{message}'",
135-
inter: [name: name, message: message]
136-
),
137-
type: :error
138-
}}
128+
:error,
129+
trans("Task %{name} failed: '%{message}'",
130+
inter: [name: name, message: message]
131+
)
139132
)
140133
end
141134
rescue
142135
error ->
143136
Logger.error(inspect(error))
144137

145-
LiveAdmin.PubSub.broadcast(
138+
LiveAdmin.PubSub.announce(
146139
session.id,
147-
{:announce,
148-
%{message: trans("Task %{name} failed", inter: [name: name]), type: :error}}
140+
:error,
141+
trans("Task %{name} failed", inter: [name: name])
149142
)
150143
after
151-
LiveAdmin.PubSub.broadcast(session.id, {:job, %{pid: self(), progress: 1}})
144+
LiveAdmin.PubSub.update_job(session.id, self(), progress: 1)
152145
end
153146
end)
154147

155-
LiveAdmin.PubSub.broadcast(session.id, {:job, %{pid: task.pid, progress: 0, label: name}})
148+
LiveAdmin.PubSub.update_job(session.id, task.pid, progress: 0, label: name)
156149

157150
{:noreply, push_navigate(socket, to: route_with_params(socket.assigns))}
158151
end

lib/live_admin/components/nav/jobs.ex

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,11 @@ defmodule LiveAdmin.Components.Nav.Jobs do
7878
end
7979

8080
@impl true
81-
def handle_info(_, socket), do: {:noreply, socket}
81+
def handle_info(data, socket) do
82+
Logger.warning("Unhandled broadcast: #{inspect(data)}")
83+
84+
{:noreply, socket}
85+
end
8286

8387
@impl true
8488
def render(assigns) do

lib/live_admin/components/resource/index.ex

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -586,49 +586,50 @@ defmodule LiveAdmin.Components.Container.Index do
586586
fn ->
587587
pid = self()
588588

589-
LiveAdmin.PubSub.broadcast(session.id, {:job, %{pid: pid, progress: 0, label: label}})
589+
LiveAdmin.PubSub.update_job(session.id, pid, progress: 0, label: label)
590590

591591
records = Resource.all(ids, resource, prefix, repo)
592592

593-
{type, message} =
594-
records
595-
|> Enum.with_index()
596-
|> Enum.reduce(0, fn {record, i}, failed_count ->
597-
try do
598-
case apply(mod, func, [record | args]) do
599-
{:ok, _} -> failed_count
600-
{:error, _} -> failed_count + 1
601-
end
602-
rescue
603-
_ -> failed_count + 1
604-
after
605-
LiveAdmin.PubSub.broadcast(
606-
session.id,
607-
{:job, %{pid: pid, progress: (i + 1) / length(records)}}
608-
)
593+
records
594+
|> Enum.with_index()
595+
|> Enum.reduce(0, fn {record, i}, failed_count ->
596+
try do
597+
case apply(mod, func, [record | args]) do
598+
{:ok, _} -> failed_count
599+
{:error, _} -> failed_count + 1
609600
end
610-
end)
611-
|> case do
612-
0 ->
613-
{:success,
614-
trans("%{name} action run successfully on %{count} records",
615-
inter: [name: name, count: length(records)]
616-
)}
617-
618-
error_count ->
619-
{:error,
620-
trans(
621-
"%{name} action failed on %{error_count} records (%{success_count} succeeeded)",
622-
inter: [
623-
name: name,
624-
error_count: error_count,
625-
success_count: length(records) - error_count
626-
]
627-
)}
601+
rescue
602+
_ -> failed_count + 1
603+
after
604+
LiveAdmin.PubSub.update_job(session.id, pid, progress: (i + 1) / length(records))
628605
end
606+
end)
607+
|> case do
608+
0 ->
609+
LiveAdmin.PubSub.announce(
610+
session.id,
611+
:success,
612+
trans("%{name} action run successfully on %{count} records",
613+
inter: [name: name, count: length(records)]
614+
)
615+
)
616+
617+
error_count ->
618+
LiveAdmin.PubSub.announce(
619+
session.id,
620+
:error,
621+
trans(
622+
"%{name} action failed on %{error_count} records (%{success_count} succeeeded)",
623+
inter: [
624+
name: name,
625+
error_count: error_count,
626+
success_count: length(records) - error_count
627+
]
628+
)
629+
)
630+
end
629631

630-
LiveAdmin.PubSub.broadcast(session.id, {:job, %{pid: pid, progress: 1}})
631-
LiveAdmin.PubSub.broadcast(session.id, {:announce, %{message: message, type: type}})
632+
LiveAdmin.PubSub.update_job(session.id, pid, progress: 1)
632633
end,
633634
timeout: :infinity
634635
)

lib/live_admin/pub_sub.ex

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ defmodule LiveAdmin.PubSub do
1010
"""
1111

1212
@type session_id :: String.t()
13+
@type status :: :error | :success | :info
14+
@type message :: String.t()
15+
@type data :: Keyword.t()
1316

1417
@spec broadcast(session_id, {atom(), map()}) :: :ok
1518
@spec broadcast({atom(), map()}) :: :ok
@@ -22,8 +25,24 @@ defmodule LiveAdmin.PubSub do
2225
if(session_id, do: "session:#{session_id}", else: "all"),
2326
event
2427
)
28+
end
2529

26-
:ok
30+
@spec announce(session_id, status, message) :: :ok
31+
@spec announce(status, message) :: :ok
32+
@doc """
33+
Add a message with status to alerts
34+
"""
35+
def announce(session_id \\ nil, status, message) do
36+
broadcast(session_id, {:announce, %{message: message, type: status}})
37+
end
38+
39+
@spec update_job(session_id, pid, data) :: :ok
40+
@spec update_job(pid, data) :: :ok
41+
@doc """
42+
Update job progress
43+
"""
44+
def update_job(session_id \\ nil, pid, data) do
45+
broadcast(session_id, {:job, Enum.into(data, %{pid: pid})})
2746
end
2847

2948
@spec subscribe(session_id) :: :ok

0 commit comments

Comments
 (0)