-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmessage_queue_watcher.ex
More file actions
114 lines (92 loc) · 3.11 KB
/
message_queue_watcher.ex
File metadata and controls
114 lines (92 loc) · 3.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
defmodule PostgresqlMessageQueue.Messaging.MessageQueueWatcher do
@moduledoc """
Watches the message queue for new messages (see
`PostgresqlMessageQueue.Messaging.store_message_in_message_queue/1`). When new messages arrive,
notifies the MessageQueueProcessor process for the relevant queues.
"""
alias __MODULE__, as: Self
alias PostgresqlMessageQueue.Messaging.MessageQueueProcessor
alias PostgresqlMessageQueue.Persistence.Repo
alias PostgresqlMessageQueue.Persistence.NotificationListener
use GenServer
require Logger
defmodule State do
  @moduledoc false
  # Server state of the watcher: the set of queue names that currently have a
  # pending "notify me on the next new message" subscription.

  alias __MODULE__, as: Self

  @enforce_keys [:notify_queues]
  defstruct [:notify_queues]

  @type t :: %Self{notify_queues: MapSet.t(String.t())}

  # Builds the initial state, in which no queue is subscribed.
  @spec new() :: Self.t()
  def new, do: %Self{notify_queues: MapSet.new()}
end
# Client
@doc """
Starts the watcher process, linked to the caller and registered under this
module's name.

`opts` is handed to `init/1`, which currently ignores it.

Returns the result of `GenServer.start_link/3`. Besides `{:ok, pid}` this can
be `{:error, {:already_started, pid}}` when a watcher is already registered
under the module name, `{:error, reason}` when `init/1` fails, or `:ignore`.
"""
@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(opts \\ []) do
  GenServer.start_link(Self, opts, name: Self)
end
@doc """
Subscribes `queue` for a notification about newly inserted messages.

The request is a synchronous call to the registered watcher process. Returns
`:new_subscription` when the queue was not subscribed before, and
`:already_subscribed` when a subscription for it is still pending.

The subscription is one-shot: the watcher drops it once it has forwarded a
notification for the queue.
"""
@spec notify_on_new_message(String.t()) :: :new_subscription | :already_subscribed
def notify_on_new_message(queue) when is_binary(queue) do
  request = {:notify_on_new_message, queue}
  GenServer.call(Self, request)
end
# Server
# Registers this process with the repo's notification listener for the
# "message_queue_messages_inserted" channel, then starts with an empty
# subscription set. A failed `listen` crashes the start-up via the `:ok` match.
@impl GenServer
def init(_opts) do
  listener = Repo.NotificationListener
  :ok = NotificationListener.listen(listener, "message_queue_messages_inserted")

  initial_state = State.new()
  {:ok, initial_state}
end
# Handles a subscription request for `queue`.
#
# Replies `:already_subscribed` and leaves the state untouched when the queue
# is already in the subscription set; otherwise adds it and replies
# `:new_subscription`.
@impl GenServer
def handle_call({:notify_on_new_message, queue}, _from, %State{} = state) do
  subscribed = state.notify_queues

  if MapSet.member?(subscribed, queue) do
    {:reply, :already_subscribed, state}
  else
    {:reply, :new_subscription, add_notify_queue(state, queue)}
  end
end
# Handles messages delivered to the watcher's mailbox.
#
# First clause: a notification on the "message_queue_messages_inserted"
# channel whose payload is a queue name. If that queue has a pending
# subscription, the queue's processor is asked to check for new messages and
# the subscription is removed; notifications for unsubscribed queues are
# ignored.
#
# Second clause: any other message. Defining `handle_info/2` replaces the
# default that `use GenServer` provides, so without this catch-all a stray
# message would crash the watcher with a FunctionClauseError and every pending
# subscription would be lost. The message is logged and the state is kept.
@impl GenServer
def handle_info(
      %NotificationListener.Notification{
        channel: "message_queue_messages_inserted",
        payload: queue_name
      },
      %State{} = state
    ) do
  state =
    if queue_name in state.notify_queues do
      Logger.info(log_prefix() <> "Notifying of new message(s) in queue: #{queue_name}")
      :ok = MessageQueueProcessor.check_for_new_messages(queue_name)
      # Subscriptions are one-shot: drop it now that the processor was told.
      remove_notify_queue(state, queue_name)
    else
      state
    end

  {:noreply, state}
end

def handle_info(message, %State{} = state) do
  Logger.error(log_prefix() <> "Received unexpected message: #{inspect(message)}")
  {:noreply, state}
end
# Logs the new subscription, together with the queues that were subscribed
# before it, and returns the state with `queue` added to the subscription set.
@spec add_notify_queue(State.t(), String.t()) :: State.t()
defp add_notify_queue(%State{} = state, queue) when is_binary(queue) do
  existing = state.notify_queues

  # Human-readable list of the queues subscribed before this call.
  existing_summary = if Enum.empty?(existing), do: "(none)", else: Enum.join(existing, ", ")

  message = "Subscribing queue: #{queue}. Already subscribed: #{existing_summary}."
  Logger.info(log_prefix() <> message)

  %State{state | notify_queues: MapSet.put(existing, queue)}
end
# Returns the state with `queue` removed from the subscription set. Removing a
# queue that is not in the set leaves the set unchanged.
@spec remove_notify_queue(State.t(), String.t()) :: State.t()
defp remove_notify_queue(%State{} = state, queue) when is_binary(queue) do
  remaining = MapSet.delete(state.notify_queues, queue)
  %State{state | notify_queues: remaining}
end
# Prefix for this module's log lines: the short module name followed by the
# pid of the calling process in square brackets.
@spec log_prefix() :: String.t()
defp log_prefix do
  pid_text = inspect(self())
  "Messaging.MessageQueueWatcher [" <> pid_text <> "] "
end
end