-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrabbit_queue_forwarder.erl
More file actions
72 lines (57 loc) · 2.11 KB
/
rabbit_queue_forwarder.erl
File metadata and controls
72 lines (57 loc) · 2.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
% For experimental use only!
-module(rabbit_queue_forwarder).
-include_lib("stdlib/include/qlc.hrl").
-export([get_queue_pid/1, get_queue_pid/2, set_queue_pid/2, set_queue_pid/3]).
-export([qpid_lookup_server/0, forward_queue/2, show_table/1]).
-include("/usr/lib/erlang/lib/rabbitmq_server-1.6.0/include/rabbit.hrl").
% -----------------------------------------------------------
get_queue_pid(VHostPath, Queue) ->
R = #resource{virtual_host = VHostPath, kind = queue, name = Queue},
F = fun() ->
mnesia:read(rabbit_durable_queue, R, read)
end,
{_, [Q]} = mnesia:transaction(F),
Q#amqqueue.pid.
get_queue_pid(Queue) -> get_queue_pid(<<"/">>, Queue).
set_queue_pid(VHostPath, Queue, NewPid) ->
R = #resource{virtual_host = VHostPath, kind = queue, name = Queue},
F = fun() ->
[Q1] = mnesia:read(rabbit_queue, R, write),
NewQ1 = Q1#amqqueue{pid=NewPid},
mnesia:write(rabbit_queue, NewQ1, write),
[Q] = mnesia:read(rabbit_durable_queue, R, write),
NewQ = Q#amqqueue{pid=NewPid},
mnesia:write(rabbit_durable_queue, NewQ, write)
end,
mnesia:transaction(F).
set_queue_pid(Queue, NewPid) -> set_queue_pid(<<"/">>, Queue, NewPid).
% -----------------------------------------------------------
qpid_lookup_server() ->
register(qpid_lookup_server, self()),
qpid_lookup_server_loop().
qpid_lookup_server_loop() ->
receive
{ qpid_lookup, From, Q } ->
io:format("Message from ~p for queue ~p~n", [From, Q]),
Qpid = get_queue_pid(Q),
io:format("Returning ~p~n", [Qpid]),
From ! { qpid_lookup_resp, Q, Qpid },
qpid_lookup_server_loop()
end.
forward_queue(Queue, DestNode) ->
{ qpid_lookup_server, DestNode } ! { qpid_lookup, self(), Queue },
receive
{ qpid_lookup_resp, Queue, NewPid } ->
io:format("Received qpid_lookup_resp: NewPid=~p~n", [NewPid]),
set_queue_pid(Queue, NewPid),
{ok, show_table(rabbit_queue)}
after 5000 ->
{fail, timeout}
end.
show_table(Tab) ->
mnesia:transaction(
fun() -> qlc:e(
qlc:q([X || X <- mnesia:table(Tab)])
) end
).
%% vim: expandtab