Skip to content

Commit 341306c

Browse files
committed
QQ: Message expiration scans all priorities
Expiration is now done for all priorities using a separate command that performs a "shallow" expiry run. I.e. each priority will have messages dropped until it encounters the next non expired one. This means that it can still leave expired messages on queue if they are preceeded by message that are not yet or never will be expired. This is consistent with existing expiration logic.
1 parent 5a56c37 commit 341306c

File tree

7 files changed

+368
-118
lines changed

7 files changed

+368
-118
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 171 additions & 104 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@
175175

176176
-record(enqueuer,
177177
{next_seqno = 1 :: msg_seqno(),
178-
% out of order enqueues - sorted list
179178
unused = ?NIL,
180179
status = up :: up | suspected_down,
181180
%% it is useful to have a record of when this was blocked
@@ -250,7 +249,7 @@
250249
% rabbit_fifo_index can be slow when calculating the smallest
251250
% index when there are large gaps but should be faster than gb_trees
252251
% for normal appending operations as it's backed by a map
253-
unused_0 = ?NIL,
252+
last_command_time = 0,
254253
unused_1 = ?NIL,
255254
% consumers need to reflect consumer state at time of snapshot
256255
consumers = #{} :: #{consumer_key() => consumer()},

deps/rabbit/src/rabbit_fifo_pq.erl

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
from_lqueue/1,
2121
indexes/1,
2222
get_lowest_index/1,
23-
overview/1
23+
overview/1,
24+
take_while/2,
25+
any_priority_next/2,
26+
fold_priorities_next/3
2427
]).
2528

2629
-define(STATE, pq).
@@ -156,8 +159,75 @@ overview(#?STATE{len = Len,
156159
num_active_priorities => map_size(Buckets),
157160
lowest_index => get_lowest_index(State)}.
158161

162+
-spec take_while(fun ((msg()) -> boolean()), state()) ->
163+
{[msg()], state()}.
164+
take_while(Fun, #?STATE{len = Len,
165+
buckets = Buckets0} = State)
166+
when is_function(Fun) ->
167+
{Buckets, Acc} = maps:fold(
168+
fun (P, Q0, {B0, Items0}) ->
169+
case take_while(Q0, Fun, Items0) of
170+
{?EMPTY, Items} ->
171+
{maps:remove(P, B0), Items};
172+
{Q, Items} ->
173+
{B0#{P => Q}, Items}
174+
end
175+
end,
176+
{Buckets0, []},
177+
maps:iterator(Buckets0, ordered)),
178+
179+
%% TODO: optimise updates
180+
%% update bitmap
181+
Bitmap = maps:fold(fun (P, _Q, B) -> B bor (1 bsl P) end, 0, Buckets),
182+
183+
{lists:reverse(Acc),
184+
State#?STATE{len = Len - length(Acc),
185+
buckets = Buckets,
186+
bitmap = Bitmap}}.
187+
188+
-spec any_priority_next(fun ((msg()) -> boolean()), state()) ->
189+
boolean().
190+
any_priority_next(Fun, #?STATE{buckets = Buckets0})
191+
when is_function(Fun) ->
192+
maps_any(Fun, maps:next(maps:iterator(Buckets0))).
193+
194+
-spec fold_priorities_next(fun ((msg(), Acc) -> Acc), Acc, state()) ->
195+
Acc when Acc :: term().
196+
fold_priorities_next(Fun, Acc, #?STATE{buckets = Buckets0})
197+
when is_function(Fun) ->
198+
maps:fold(fun (_P, Q, A) ->
199+
Fun(peek(Q), A)
200+
end, Acc, Buckets0).
201+
159202
%% INTERNAL
160203

204+
maps_any(_Fun, none) ->
205+
false;
206+
maps_any(Fun, {_, Q, I}) ->
207+
case Fun(peek(Q)) of
208+
true ->
209+
true;
210+
false ->
211+
maps_any(Fun, maps:next(I))
212+
end.
213+
214+
take_while(?EMPTY, _Fun, Acc) ->
215+
{?EMPTY, Acc};
216+
take_while(Q, Fun, Acc) ->
217+
case peek(Q) of
218+
empty ->
219+
{Q, Acc};
220+
Msg ->
221+
case Fun(Msg) of
222+
true ->
223+
take_while(drop(Q), Fun, [Msg | Acc]);
224+
false ->
225+
{Q, Acc}
226+
end
227+
end.
228+
229+
230+
161231
%% invariant, if the queue is non empty so is the Out (right) list.
162232
in(X, ?EMPTY) ->
163233
{1, [], [X]};

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1987,7 +1987,7 @@ make_ra_conf(Q, ServerId, Membership, MacVersion)
19871987
Membership, MacVersion).
19881988

19891989
make_ra_conf(Q, ServerId, TickTimeout,
1990-
_SnapshotInterval, CheckpointInterval,
1990+
_SnapshotInterval, _CheckpointInterval,
19911991
Membership, MacVersion) ->
19921992
QName = amqqueue:get_name(Q),
19931993
#resource{name = QNameBin} = QName,
@@ -1998,8 +1998,9 @@ make_ra_conf(Q, ServerId, TickTimeout,
19981998
Formatter = {?MODULE, format_ra_event, [QName]},
19991999
LogCfg = #{uid => UId,
20002000
min_snapshot_interval => 0,
2001-
min_checkpoint_interval => CheckpointInterval,
2002-
max_checkpoints => 3},
2001+
% min_checkpoint_interval => CheckpointInterval,
2002+
% max_checkpoints => 3,
2003+
major_compaction_strategy => {num_minors, 32}},
20032004
rabbit_misc:maps_put_truthy(membership, Membership,
20042005
#{cluster_name => ClusterName,
20052006
id => ServerId,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2147,6 +2147,8 @@ dead_letter_policy(Config) ->
21472147
%% Test that messages are at most once dead letter in the correct order
21482148
%% for reason 'maxlen'.
21492149
at_most_once_dead_letter_order_maxlen(Config) ->
2150+
check_quorum_queues_v8_compat(Config),
2151+
21502152
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
21512153

21522154
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
@@ -2261,16 +2263,18 @@ at_most_once_dead_letter_order_delivery_limit(Config) ->
22612263
#'basic.publish'{routing_key = QQ},
22622264
#amqp_msg{payload = <<"m2">>}),
22632265

2264-
ok = subscribe(Ch, QQ, false),
2266+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
2267+
ok = subscribe(Ch2, QQ, false),
22652268
receive {_, #amqp_msg{payload = P1}} ->
22662269
?assertEqual(<<"m1">>, P1)
22672270
end,
22682271
receive {_, #amqp_msg{payload = P2}} ->
22692272
?assertEqual(<<"m2">>, P2)
22702273
end,
2271-
ok = amqp_channel:call(Ch, #'basic.nack'{delivery_tag = 0,
2272-
multiple = true,
2273-
requeue = true}),
2274+
amqp_channel:close(Ch2),
2275+
% ok = amqp_channel:call(Ch, #'basic.nack'{delivery_tag = 0,
2276+
% multiple = true,
2277+
% requeue = true}),
22742278

22752279
wait_for_consensus(DLQ, Config),
22762280
wait_for_messages_ready(Servers, ra_name(DLQ), 2),
@@ -5337,7 +5341,7 @@ check_quorum_queues_v8_compat(Config) ->
53375341
true ->
53385342
ok;
53395343
false ->
5340-
throw({skip, "test will only work on QQ machine version > 8"})
5344+
throw({skip, "test will only work on QQ machine version >= 8"})
53415345
end.
53425346

53435347
lists_interleave([], _List) ->

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1620,8 +1620,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(Config)
16201620
],
16211621
{State1, _} = run_log(Config, State0, Entries),
16221622
Effects = rabbit_fifo:state_enter(leader, State1),
1623-
%% 2 effects for each consumer process (channel process), 1 effect for the node,
1624-
?assertEqual(2 * 3 + 1 + 1 + 1, length(Effects)).
1623+
ct:pal("Efx ~p", [Effects]),
1624+
%% 2 effects for each consumer process (channel process),
1625+
%% 1 effect for the node,
1626+
%% 1 for decorators
1627+
?assertEqual(2 * 3 + 1 + 1, length(Effects)).
16251628

16261629
single_active_consumer_state_enter_eol_include_waiting_consumers_test(Config) ->
16271630
Resource = rabbit_misc:r("/", queue, ?FUNCTION_NAME_B),
@@ -3223,6 +3226,39 @@ modify_test(Config) ->
32233226

32243227
ok.
32253228

3229+
priorities_expire_test(Config) ->
3230+
State0 = init(#{name => ?FUNCTION_NAME,
3231+
queue_resource => rabbit_misc:r("/", queue,
3232+
?FUNCTION_NAME_B)}),
3233+
Pid1 = spawn(fun() -> ok end),
3234+
3235+
Entries =
3236+
[
3237+
{?LINE, make_enqueue(Pid1, 1,
3238+
mk_mc(<<"p1">>, #'P_basic'{priority = 9,
3239+
expiration = <<"100">>}))},
3240+
{?LINE, make_enqueue(Pid1, 2,
3241+
mk_mc(<<"p1">>, #'P_basic'{priority = 9,
3242+
expiration = <<"100000">>}))},
3243+
{?LINE, make_enqueue(Pid1, 3,
3244+
mk_mc(<<"p7">>, #'P_basic'{priority = 7,
3245+
expiration = <<"100">>}))},
3246+
{?LINE, make_enqueue(Pid1, 4,
3247+
mk_mc(<<"p7">>, #'P_basic'{priority = 7,
3248+
expiration = <<"100000">>}))},
3249+
{?LINE, make_enqueue(Pid1, 5,
3250+
mk_mc(<<"p7b">>, #'P_basic'{priority = 3}))},
3251+
3252+
{?LINE + 101, {timeout, {expire_msgs, shallow}}},
3253+
3254+
?ASSERT(_, fun(State) ->
3255+
?assertMatch(#{num_messages := 3},
3256+
rabbit_fifo:overview(State))
3257+
end)
3258+
],
3259+
{_State2, _} = run_log(Config, State0, Entries),
3260+
ok.
3261+
32263262
%% Utility
32273263
%%
32283264

@@ -3232,6 +3268,7 @@ apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).
32323268
init_aux(Conf) -> rabbit_fifo:init_aux(Conf).
32333269
handle_aux(S, T, C, A, A2) -> rabbit_fifo:handle_aux(S, T, C, A, A2).
32343270
make_checkout(C, S, M) -> rabbit_fifo:make_checkout(C, S, M).
3271+
make_enqueue(P, S, M) -> rabbit_fifo:make_enqueue(P, S, M).
32353272

32363273
cid(A) when is_atom(A) ->
32373274
atom_to_binary(A, utf8).
@@ -3242,10 +3279,13 @@ single_active_invariant( #rabbit_fifo{consumers = Cons}) ->
32423279
end, Cons)).
32433280

32443281
mk_mc(Body) ->
3282+
mk_mc(Body, #'P_basic'{}).
3283+
3284+
mk_mc(Body, BasicProps) ->
32453285
mc_amqpl:from_basic_message(
32463286
#basic_message{routing_keys = [<<"">>],
32473287
exchange_name = #resource{name = <<"x">>,
32483288
kind = exchange,
32493289
virtual_host = <<"v">>},
3250-
content = #content{properties = #'P_basic'{},
3290+
content = #content{properties = BasicProps,
32513291
payload_fragments_rev = [Body]}}).

deps/rabbit/test/rabbit_fifo_pq_SUITE.erl

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ all() ->
1717
all_tests() ->
1818
[
1919
basics,
20+
take_while,
21+
any_priority_next,
2022
property
2123
].
2224

@@ -80,6 +82,73 @@ basics(_Config) ->
8082
empty = rabbit_fifo_pq:out(Q6),
8183
ok.
8284

85+
take_while(_Config) ->
86+
Q1 = lists:foldl(
87+
fun ({P, I}, Q) ->
88+
rabbit_fifo_pq:in(P, I, Q)
89+
end, rabbit_fifo_pq:new(),
90+
[
91+
{1, ?MSG(1)}, {1, ?MSG(2)}, {1, ?MSG(3)},
92+
{2, ?MSG(1)}, {2, ?MSG(2)}, {2, ?MSG(3)},
93+
{3, ?MSG(1)}, {3, ?MSG(2)}, {3, ?MSG(3)},
94+
{4, ?MSG(1)}, {4, ?MSG(2)}, {4, ?MSG(3)},
95+
{5, ?MSG(1, 10)}, {5, ?MSG(2, 20)}, {5, ?MSG(3, 30)}
96+
]),
97+
98+
{Taken, Q2} = rabbit_fifo_pq:take_while(fun (?MSG(I, _)) ->
99+
I < 3
100+
end, Q1),
101+
?assertMatch([
102+
?MSG(1, 10), ?MSG(2, 20),
103+
?MSG(1, 1), ?MSG(2, 2),
104+
?MSG(1, 1), ?MSG(2, 2),
105+
?MSG(1, 1), ?MSG(2, 2),
106+
?MSG(1, 1), ?MSG(2, 2)
107+
], Taken),
108+
109+
110+
?assertEqual(5, rabbit_fifo_pq:len(Q2)),
111+
?assertEqual(10, length(Taken)),
112+
{?MSG(3, 30), Q3} = rabbit_fifo_pq:out(Q2),
113+
{?MSG(3), Q4} = rabbit_fifo_pq:out(Q3),
114+
{?MSG(3), Q5} = rabbit_fifo_pq:out(Q4),
115+
{?MSG(3), Q6} = rabbit_fifo_pq:out(Q5),
116+
{?MSG(3), _Q7} = rabbit_fifo_pq:out(Q6),
117+
118+
119+
{_Taken2, Q} = rabbit_fifo_pq:take_while(fun (?MSG(_, _)) ->
120+
true
121+
end, Q2),
122+
123+
ct:pal("Q ~p", [Q]),
124+
125+
ok.
126+
127+
any_priority_next(_Config) ->
128+
Q0 = rabbit_fifo_pq:new(),
129+
130+
?assertNot(rabbit_fifo_pq:any_priority_next(fun (_) -> true end, Q0)),
131+
132+
Q1 = lists:foldl(fun ({P, I}, Q) ->
133+
rabbit_fifo_pq:in(P, I, Q)
134+
end, Q0,
135+
[
136+
{1, ?MSG(1)}, {1, ?MSG(2)}, {1, ?MSG(3)},
137+
{2, ?MSG(1)}, {2, ?MSG(2)}, {2, ?MSG(3)},
138+
{3, ?MSG(2)}, {3, ?MSG(3)},
139+
{4, ?MSG(1)}, {4, ?MSG(2)}, {4, ?MSG(3)},
140+
{5, ?MSG(1)}, {5, ?MSG(2)}, {5, ?MSG(3)}
141+
]),
142+
143+
?assert(rabbit_fifo_pq:any_priority_next(fun (?MSG(I, _)) ->
144+
I > 1
145+
end, Q1)),
146+
?assertNot(rabbit_fifo_pq:any_priority_next(fun (?MSG(I, _)) ->
147+
I > 6
148+
end, Q1)),
149+
150+
ok.
151+
83152
hi_is_prioritised(_Config) ->
84153
Q0 = rabbit_fifo_q:new(),
85154
%% when `hi' has a lower index than the next 'no' then it is still

0 commit comments

Comments
 (0)