Skip to content

Commit b909a81

Browse files
committed
WIP: consumer timeouts in queue
1 parent 341306c commit b909a81

File tree

4 files changed

+91
-55
lines changed

4 files changed

+91
-55
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ update_config(Conf, State) ->
219219
competing
220220
end,
221221
Cfg = State#?STATE.cfg,
222+
DefConsumerTimeout = maps:get(consumer_timeout, Conf, 1800),
222223

223224
LastActive = maps:get(created, Conf, undefined),
224225
State#?STATE{cfg = Cfg#cfg{dead_letter_handler = DLH,
@@ -228,7 +229,8 @@ update_config(Conf, State) ->
228229
consumer_strategy = ConsumerStrategy,
229230
delivery_limit = DeliveryLimit,
230231
expires = Expires,
231-
msg_ttl = MsgTTL},
232+
msg_ttl = MsgTTL,
233+
default_consumer_timeout_s = DefConsumerTimeout},
232234
last_active = LastActive}.
233235

234236
% msg_ids are scoped per consumer
@@ -677,7 +679,7 @@ live_indexes(#?STATE{cfg = #cfg{},
677679
DlxIndexes, Returns),
678680
maps:fold(fun (_Cid, #consumer{checked_out = Ch}, Acc0) ->
679681
maps:fold(
680-
fun (_MsgId, Msg, Acc) ->
682+
fun (_MsgId, ?C_MSG(Msg), Acc) ->
681683
[get_msg_idx(Msg) | Acc]
682684
end, Acc0, Ch)
683685
end, RtnIndexes, Consumers).
@@ -704,8 +706,11 @@ snapshot_installed(_Meta, #?MODULE{cfg = #cfg{},
704706
Acc) ->
705707
case node(Pid) == node() of
706708
true ->
707-
Iter = maps:iterator(Checked, reversed),
708-
Acc#{{Tag, Pid} => maps:to_list(Iter)};
709+
Iter = maps:iterator(Checked, ordered),
710+
Msgs = maps:fold(fun (K, ?C_MSG(M), Ac0) ->
711+
[{K, M} | Ac0]
712+
end, [], Iter),
713+
Acc#{{Tag, Pid} => Msgs};
709714
false ->
710715
Acc
711716
end
@@ -715,8 +720,14 @@ snapshot_installed(_Meta, #?MODULE{cfg = #cfg{},
715720
convert_v7_to_v8(#{system_time := Ts} = _Meta, StateV7) ->
716721
%% the structure is intact for now
717722
Cons0 = element(#?STATE.consumers, StateV7),
718-
Cons = maps:map(fun (_CKey, #consumer{status = suspected_down} = C) ->
719-
C#consumer{status = {suspected_down, up}};
723+
%% TODO: use default for now
724+
Timeout = Ts + 1_800_000,
725+
Cons = maps:map(fun (_CKey, #consumer{status = suspected_down,
726+
checked_out = Ch0} = C) ->
727+
Ch = maps:map(fun (_, M) -> ?C_MSG(Timeout, M) end,
728+
Ch0),
729+
C#consumer{status = {suspected_down, up},
730+
checked_out = Ch};
720731
(_CKey, C) ->
721732
C
722733
end, Cons0),
@@ -732,6 +743,7 @@ convert_v7_to_v8(#{system_time := Ts} = _Meta, StateV7) ->
732743
StateV8#?STATE{discarded_bytes = 0,
733744
messages = Pq,
734745
consumers = Cons,
746+
next_consumer_timeout = Timeout,
735747
last_command_time = Ts}.
736748

737749
purge_node(Meta, Node, State, Effects) ->
@@ -939,7 +951,7 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) ->
939951
case find_consumer(CKey, Consumers) of
940952
{_CKey, #consumer{checked_out = Checked}} ->
941953
[begin
942-
Msg = maps:get(K, Checked),
954+
?C_MSG(Msg) = maps:get(K, Checked),
943955
I = get_msg_idx(Msg),
944956
H = get_msg_header(Msg),
945957
{K, {I, H}}
@@ -974,9 +986,10 @@ which_module(8) -> ?MODULE.
974986
gc = #aux_gc{} :: #aux_gc{},
975987
tick_pid :: undefined | pid(),
976988
cache = #{} :: map(),
977-
last_checkpoint :: tuple() | #snapshot{},
978-
bytes_in = 0 :: non_neg_integer(),
979-
bytes_out = 0 :: non_neg_integer()}).
989+
last_checkpoint :: tuple() | #snapshot{}
990+
% bytes_in = 0 :: non_neg_integer(),
991+
% bytes_out = 0 :: non_neg_integer()
992+
}).
980993

981994
init_aux(Name) when is_atom(Name) ->
982995
%% TODO: catch specific exception throw if table already exists
@@ -1001,9 +1014,10 @@ handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux)
10011014
gc = element(5, AuxV3),
10021015
tick_pid = element(6, AuxV3),
10031016
cache = element(7, AuxV3),
1004-
last_checkpoint = element(8, AuxV3),
1005-
bytes_in = element(9, AuxV3),
1006-
bytes_out = 0},
1017+
last_checkpoint = element(8, AuxV3)
1018+
% bytes_in = element(9, AuxV3),
1019+
% bytes_out = 0
1020+
},
10071021
handle_aux(RaftState, Tag, Cmd, AuxV4, RaAux);
10081022
handle_aux(leader, cast, eval,
10091023
#?AUX{last_decorators_state = LastDec,
@@ -1048,14 +1062,14 @@ handle_aux(_RaftState, cast, eval,
10481062
{Check, Effects} = do_snapshot(EffMacVer, Ts, Check0, RaAux,
10491063
DiscardedBytes, false),
10501064
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
1051-
handle_aux(_RaftState, cast, {bytes_in, {MetaSize, BodySize}},
1052-
#?AUX{bytes_in = Bytes} = Aux0,
1053-
RaAux) ->
1054-
{no_reply, Aux0#?AUX{bytes_in = Bytes + MetaSize + BodySize}, RaAux, []};
1055-
handle_aux(_RaftState, cast, {bytes_out, BodySize},
1056-
#?AUX{bytes_out = Bytes} = Aux0,
1057-
RaAux) ->
1058-
{no_reply, Aux0#?AUX{bytes_out = Bytes + BodySize}, RaAux, []};
1065+
% handle_aux(_RaftState, cast, {bytes_in, {MetaSize, BodySize}},
1066+
% #?AUX{bytes_in = Bytes} = Aux0,
1067+
% RaAux) ->
1068+
% {no_reply, Aux0#?AUX{bytes_in = Bytes + MetaSize + BodySize}, RaAux, []};
1069+
% handle_aux(_RaftState, cast, {bytes_out, BodySize},
1070+
% #?AUX{bytes_out = Bytes} = Aux0,
1071+
% RaAux) ->
1072+
% {no_reply, Aux0#?AUX{bytes_out = Bytes + BodySize}, RaAux, []};
10591073
handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
10601074
consumer_key = Key} = Ret, Corr, Pid},
10611075
Aux0, RaAux0) ->
@@ -1066,7 +1080,7 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
10661080
{ConsumerKey, #consumer{checked_out = Checked}} ->
10671081
{RaAux, ToReturn} =
10681082
maps:fold(
1069-
fun (MsgId, Msg, {RA0, Acc}) ->
1083+
fun (MsgId, ?C_MSG(Msg), {RA0, Acc}) ->
10701084
Idx = get_msg_idx(Msg),
10711085
Header = get_msg_header(Msg),
10721086
%% it is possible this is not found if the consumer
@@ -1113,7 +1127,7 @@ handle_aux(_, _, {get_checked_out, ConsumerKey, MsgIds}, Aux0, RaAux0) ->
11131127
#{ConsumerKey := #consumer{checked_out = Checked}} ->
11141128
{RaState, IdMsgs} =
11151129
maps:fold(
1116-
fun (MsgId, Msg, {S0, Acc}) ->
1130+
fun (MsgId, ?C_MSG(Msg), {S0, Acc}) ->
11171131
Idx = get_msg_idx(Msg),
11181132
Header = get_msg_header(Msg),
11191133
%% it is possible this is not found if the consumer
@@ -1201,6 +1215,11 @@ handle_aux(leader, _, {dlx, setup}, Aux, RaAux) ->
12011215
{no_reply, Aux, RaAux};
12021216
handle_aux(_, _, {dlx, teardown, Pid}, Aux, RaAux) ->
12031217
terminate_dlx_worker(Pid),
1218+
{no_reply, Aux, RaAux};
1219+
handle_aux(_, _, Unhandled, Aux, RaAux) ->
1220+
#?STATE{cfg = #cfg{resource = QR}} = ra_aux:machine_state(RaAux),
1221+
?LOG_DEBUG("~ts: rabbit_fifo: unhandled aux command ~P",
1222+
[rabbit_misc:rs(QR), Unhandled, 10]),
12041223
{no_reply, Aux, RaAux}.
12051224

12061225

@@ -1782,8 +1801,8 @@ return(Meta, ConsumerKey,
17821801
lists:foldl(
17831802
fun(MsgId, Acc = {S0, E0}) ->
17841803
case Checked of
1785-
#{MsgId := Msg} ->
1786-
return_one(Meta, MsgId, Msg, IncrDelCount, Anns,
1804+
#{MsgId := CMsg} ->
1805+
return_one(Meta, MsgId, CMsg, IncrDelCount, Anns,
17871806
S0, E0, ConsumerKey);
17881807
#{} ->
17891808
Acc
@@ -1806,7 +1825,7 @@ complete(Meta, ConsumerKey, [MsgId],
18061825
messages_total = Tot} = State0,
18071826
Effects) ->
18081827
case maps:take(MsgId, Checked0) of
1809-
{Msg, Checked} ->
1828+
{?C_MSG(Msg), Checked} ->
18101829
Hdr = get_msg_header(Msg),
18111830
SettledSize = get_header(size, Hdr),
18121831
Con = Con0#consumer{checked_out = Checked,
@@ -1828,7 +1847,7 @@ complete(Meta, ConsumerKey, MsgIds,
18281847
= lists:foldl(
18291848
fun (MsgId, {S0, Ch0}) ->
18301849
case maps:take(MsgId, Ch0) of
1831-
{Msg, Ch} ->
1850+
{?C_MSG(Msg), Ch} ->
18321851
Hdr = get_msg_header(Msg),
18331852
S = get_header(size, Hdr) + S0,
18341853
{S, Ch};
@@ -1964,7 +1983,7 @@ annotate_msg(Header, Msg0) ->
19641983
Msg0
19651984
end.
19661985

1967-
return_one(Meta, MsgId, Msg0, DeliveryFailed, Anns,
1986+
return_one(Meta, MsgId, ?C_MSG(Msg0), DeliveryFailed, Anns,
19681987
#?STATE{returns = Returns,
19691988
consumers = Consumers,
19701989
dlx = DlxState0,
@@ -2005,8 +2024,8 @@ return_one(Meta, MsgId, Msg0, DeliveryFailed, Anns,
20052024
return_all(Meta, #?STATE{consumers = Cons} = State0, Effects0, ConsumerKey,
20062025
#consumer{checked_out = Checked} = Con, DeliveryFailed) ->
20072026
State = State0#?STATE{consumers = Cons#{ConsumerKey => Con}},
2008-
maps:fold(fun (MsgId, Msg, {S, E}) ->
2009-
return_one(Meta, MsgId, Msg, DeliveryFailed, #{},
2027+
maps:fold(fun (MsgId, CMsg, {S, E}) ->
2028+
return_one(Meta, MsgId, CMsg, DeliveryFailed, #{},
20102029
S, E, ConsumerKey)
20112030
end, {State, Effects0}, maps:iterator(Checked, ordered)).
20122031

@@ -2248,7 +2267,10 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
22482267
credit = Credit,
22492268
delivery_count = DelCnt0,
22502269
cfg = Cfg} = Con0 ->
2251-
Checked = maps:put(Next, Msg, Checked0),
2270+
Timeout = (Ts div 1000) + State0#?STATE.cfg#cfg.default_consumer_timeout_s,
2271+
Checked = maps:put(Next,
2272+
?C_MSG(Timeout * 1000, Msg),
2273+
Checked0),
22522274
DelCnt = case credit_api_v2(Cfg) of
22532275
true -> add(DelCnt0, 1);
22542276
false -> DelCnt0 + 1
@@ -2950,7 +2972,7 @@ smallest_raft_index(#?STATE{messages = Messages,
29502972
min(get_msg_idx(Msg), Acc)
29512973
end, Min0, Returns),
29522974
Min2 = maps:fold(fun (_Cid, #consumer{checked_out = Ch}, Acc0) ->
2953-
maps:fold(fun (_MsgId, Msg, Acc) ->
2975+
maps:fold(fun (_MsgId, ?C_MSG(Msg), Acc) ->
29542976
min(get_msg_idx(Msg), Acc)
29552977
end, Acc0, Ch)
29562978
end, Min1, Consumers),
@@ -3185,7 +3207,7 @@ discard(Meta, MsgIds, ConsumerKey,
31853207
case maps:get(Id, Checked, undefined) of
31863208
undefined ->
31873209
false;
3188-
Msg0 ->
3210+
?C_MSG(Msg0) ->
31893211
{true, incr_msg_headers(Msg0, DelFailed, Anns)}
31903212
end
31913213
end, MsgIds),

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
%% Raw message data is always stored on disk.
1313
-define(MSG(Index, Header), ?TUPLE(Index, Header)).
1414

15+
-define(C_MSG(Timeout, Msg), {Timeout, Msg}).
16+
-define(C_MSG(Msg), {_, Msg}).
1517
-define(NIL, []).
1618

1719
-define(IS_HEADER(H),
@@ -82,6 +84,9 @@
8284

8385
-type msg() :: packed_msg() | optimised_tuple(ra:index(), msg_header()).
8486

87+
%% a consumer message
88+
-type c_msg() :: {LockExpiration :: milliseconds(), msg()}.
89+
8590
-type delivery_msg() :: {msg_id(), {msg_header(), raw_msg()}}.
8691
%% A tuple consisting of the message id, and the headered message.
8792

@@ -115,7 +120,8 @@
115120
username => binary(),
116121
prefetch => non_neg_integer(),
117122
args => list(),
118-
priority => non_neg_integer()
123+
priority => 0..255,
124+
timeout => milliseconds()
119125
}.
120126
%% static meta data associated with a consumer
121127

@@ -139,7 +145,9 @@
139145
-define(LOW_LIMIT, 0.8).
140146
-define(DELIVERY_CHUNK_LIMIT_B, 128_000).
141147

148+
-type seconds() :: non_neg_integer().
142149
-type milliseconds() :: non_neg_integer().
150+
143151
-record(consumer_cfg,
144152
{meta = #{} :: consumer_meta(),
145153
pid :: pid(),
@@ -157,9 +165,13 @@
157165

158166
-record(consumer,
159167
{cfg = #consumer_cfg{},
160-
status = up :: consumer_status() | {suspected_down, consumer_status()},
168+
status = up :: consumer_status() |
169+
{suspected_down, consumer_status()} |
170+
%% a message has been pending for longer than the
171+
%% consumer timeout
172+
{timeout, consumer_status()},
161173
next_msg_id = 0 :: msg_id(),
162-
checked_out = #{} :: #{msg_id() => msg()},
174+
checked_out = #{} :: #{msg_id() => c_msg()},
163175
%% max number of messages that can be sent
164176
%% decremented for each delivery
165177
credit = 0 :: non_neg_integer(),
@@ -200,7 +212,7 @@
200212
delivery_limit :: option(non_neg_integer()),
201213
expires :: option(milliseconds()),
202214
msg_ttl :: option(milliseconds()),
203-
unused_2 = ?NIL,
215+
default_consumer_timeout_s = 1800 :: seconds(),
204216
unused_3 = ?NIL
205217
}).
206218

@@ -250,7 +262,7 @@
250262
% index when there are large gaps but should be faster than gb_trees
251263
% for normal appending operations as it's backed by a map
252264
last_command_time = 0,
253-
unused_1 = ?NIL,
265+
next_consumer_timeout = undefined :: option(milliseconds()),
254266
% consumers need to reflect consumer state at time of snapshot
255267
consumers = #{} :: #{consumer_key() => consumer()},
256268
% consumers that require further service are queued here
@@ -273,14 +285,15 @@
273285
queue_resource := rabbit_types:r('queue'),
274286
dead_letter_handler => dead_letter_handler(),
275287
become_leader_handler => applied_mfa(),
276-
checkpoint_min_indexes => non_neg_integer(),
277-
checkpoint_max_indexes => non_neg_integer(),
288+
% checkpoint_min_indexes => non_neg_integer(),
289+
% checkpoint_max_indexes => non_neg_integer(),
278290
max_length => non_neg_integer(),
279291
max_bytes => non_neg_integer(),
280292
overflow_strategy => drop_head | reject_publish,
281293
single_active_consumer_on => boolean(),
282294
delivery_limit => non_neg_integer() | -1,
283295
expires => non_neg_integer(),
284296
msg_ttl => non_neg_integer(),
285-
created => non_neg_integer()
297+
created => non_neg_integer(),
298+
consumer_timeout => seconds()
286299
}.

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1787,7 +1787,8 @@ consumer_message_is_delevered_after_snapshot(Config) ->
17871787
%% then purge
17881788
#'queue.purge_ok'{} = amqp_channel:call(Ch0, #'queue.purge'{queue = QQ}),
17891789

1790-
MacVer = lists:min([V || {ok, V} <- erpc:multicall(Nodes, rabbit_fifo, version, [])]),
1790+
MacVer = lists:min([V || {ok, V} <-
1791+
erpc:multicall(Nodes, rabbit_fifo, version, [])]),
17911792
ct:pal("machine version is ~b", [MacVer]),
17921793

17931794
%% only await snapshot if all members have at least machine version 8

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3189,10 +3189,10 @@ modify_test(Config) ->
31893189
fun (#rabbit_fifo{consumers =
31903190
#{CK1 := #consumer{checked_out = Ch}}}) ->
31913191
?assertMatch(
3192-
?MSG(_, #{acquired_count := 1,
3193-
anns := #{<<"x-opt-blah">> := <<"blah1">>}} = H)
3192+
?C_MSG(?MSG(_, #{acquired_count := 1,
3193+
anns := #{<<"x-opt-blah">> := <<"blah1">>}} = H))
31943194
when not is_map_key(delivery_count, H),
3195-
maps:get(1, Ch))
3195+
maps:get(1, Ch))
31963196
end),
31973197
%% delivery_failed = true does increment delivery_count
31983198
{?LINE, rabbit_fifo:make_modify(CK1, [1], true, false,
@@ -3203,9 +3203,9 @@ modify_test(Config) ->
32033203
fun (#rabbit_fifo{consumers =
32043204
#{CK1 := #consumer{checked_out = Ch}}}) ->
32053205
?assertMatch(
3206-
?MSG(_, #{delivery_count := 1,
3207-
acquired_count := 2,
3208-
anns := #{<<"x-opt-blah">> := <<"blah2">>}}),
3206+
?C_MSG(?MSG(_, #{delivery_count := 1,
3207+
acquired_count := 2,
3208+
anns := #{<<"x-opt-blah">> := <<"blah2">>}})),
32093209
maps:get(2, Ch))
32103210
end),
32113211
%% delivery_failed = true and undeliverable_here = true is the same as discard
@@ -3214,13 +3214,13 @@ modify_test(Config) ->
32143214
?ASSERT(#rabbit_fifo{consumers = #{CK1 := #consumer{next_msg_id = 3,
32153215
checked_out = Ch}}}
32163216
when map_size(Ch) == 0,
3217-
fun (#rabbit_fifo{dlx = #rabbit_fifo_dlx{discards = Discards}}) ->
3218-
?assertMatch([[_|
3219-
?MSG(_, #{delivery_count := 2,
3220-
acquired_count := 3,
3221-
anns := #{<<"x-opt-blah">> := <<"blah3">>}})]],
3222-
lqueue:to_list(Discards))
3223-
end)
3217+
fun (#rabbit_fifo{dlx = #rabbit_fifo_dlx{discards = Discards}}) ->
3218+
?assertMatch([[_|
3219+
?MSG(_, #{delivery_count := 2,
3220+
acquired_count := 3,
3221+
anns := #{<<"x-opt-blah">> := <<"blah3">>}})]],
3222+
lqueue:to_list(Discards))
3223+
end)
32243224
],
32253225
{_S1, _} = run_log(Config, S0, Entries, fun single_active_invariant/1),
32263226

0 commit comments

Comments
 (0)