Skip to content

Commit 8a90671

Browse files
authored
Merge pull request #4599 from esl/max-worker-queue-len
Implement worker queue limit for outgoing pools
2 parents 580a111 + 551ed6b commit 8a90671

File tree

11 files changed

+148
-123
lines changed

11 files changed

+148
-123
lines changed

doc/configuration/outgoing-connections.md

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,17 @@ Number of workers to be started by the pool.
5757

5858
Number of milliseconds after which a call to the pool will time out.
5959

60+
### `outgoing_pools.*.*.max_worker_queue_len`
61+
* **Syntax:** non-negative integer
62+
* **Default:** not set
63+
* **Example:** `max_worker_queue_len = 1000`
64+
65+
Maximum number of requests waiting in the incoming message queue of a worker. By default there is no such limit.
66+
When the queues of all workers have reached this length, further incoming requests are dropped. Note that setting the limit to `0` therefore causes every request to be dropped.
67+
68+
!!! Note
69+
This option is applicable only to the `best_worker` strategy. Using it for other strategies is not allowed.
70+
6071
## Connection options
6172

6273
Options specific to a pool connection are defined in a subsection starting with `[outgoing_pools.*.*.connection]`.
@@ -338,13 +349,6 @@ Sets the RabbitMQ Virtual Host. The host needs to exist, as it is **not** create
338349

339350
Enables/disables one-to-one publishers confirms.
340351

341-
### `outgoing_pools.rabbit.*.connection.max_worker_queue_len`
342-
* **Syntax:** non-negative integer or `"infinity"`
343-
* **Default:** `1000`
344-
* **Example:** `max_worker_queue_len = "infinity"`
345-
346-
Sets a limit of messages in a worker's mailbox above which the worker starts dropping the messages. If a worker message queue length reaches the limit, messages from the head of the queue are dropped until the queue length is again below the limit. Use `infinity` to disable.
347-
348352
---
349353
To enable TLS, you need to include the [TLS section](#tls-options) in the connection options.
350354

doc/migrations/6.5.0_6.x.x.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
## Configuration options
2+
3+
### RabbitMQ connection pool
4+
5+
The RabbitMQ-specific connection option `max_worker_queue_len` has been replaced by a more generic [worker pool option](../configuration/outgoing-connections.md#outgoing_poolsmax_worker_queue_len) with the same name.
6+
Unlike the removed option, the new one ensures that the queue limit is never exceeded.
7+
Make sure you use the new option instead of the removed one.

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ nav:
219219
- '6.3.1 to 6.3.2': 'migrations/6.3.1_6.3.2.md'
220220
- '6.3.3 to 6.4.0': 'migrations/6.3.3_6.4.0.md'
221221
- '6.4.0 to 6.5.0': 'migrations/6.4.0_6.5.0.md'
222+
- '6.5.0 to 6.x.x': 'migrations/6.5.0_6.x.x.md'
222223
- 'MAM MUC migration helper': 'migrations/jid-from-mam-muc-script.md'
223224
- 'Contributions to the Ecosystem': 'Contributions.md'
224225
- 'MongooseIM History': 'History.md'

src/config/mongoose_config_spec.erl

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,9 @@ wpool(ExtraDefaults) ->
452452
<<"strategy">> => #option{type = atom,
453453
validate = {enum, wpool_strategy_values()}},
454454
<<"call_timeout">> => #option{type = integer,
455-
validate = positive}
455+
validate = positive},
456+
<<"max_worker_queue_len">> => #option{type = integer,
457+
validate = non_negative}
456458
},
457459
defaults = maps:merge(#{<<"workers">> => 10,
458460
<<"strategy">> => best_worker,
@@ -544,8 +546,6 @@ outgoing_pool_connection(<<"rabbit">>) ->
544546
<<"virtual_host">> => #option{type = binary,
545547
validate = non_empty},
546548
<<"confirms_enabled">> => #option{type = boolean},
547-
<<"max_worker_queue_len">> => #option{type = int_or_infinity,
548-
validate = non_negative},
549549
<<"tls">> => tls([client])
550550
},
551551
include = always,
@@ -554,8 +554,7 @@ outgoing_pool_connection(<<"rabbit">>) ->
554554
<<"username">> => <<"guest">>,
555555
<<"password">> => <<"guest">>,
556556
<<"virtual_host">> => <<"/">>,
557-
<<"confirms_enabled">> => false,
558-
<<"max_worker_queue_len">> => 1000}
557+
<<"confirms_enabled">> => false}
559558
};
560559
outgoing_pool_connection(<<"rdbms">>) ->
561560
#section{
@@ -1048,7 +1047,7 @@ check_auth_method(Method, Opts) ->
10481047

10491048
process_pool([Tag, Type | _], AllOpts = #{scope := ScopeIn, connection := Connection}) ->
10501049
Scope = pool_scope(ScopeIn),
1051-
Opts = maps:without([scope, host, connection], AllOpts),
1050+
Opts = verify_pool_strategy(maps:without([scope, host, connection], AllOpts)),
10521051
#{type => b2a(Type),
10531052
scope => Scope,
10541053
tag => b2a(Tag),
@@ -1059,9 +1058,17 @@ process_host_config_pool([Tag, Type, _Pools, {host, HT} | _], AllOpts = #{connec
10591058
#{type => b2a(Type),
10601059
scope => HT,
10611060
tag => b2a(Tag),
1062-
opts => maps:remove(connection, AllOpts),
1061+
opts => verify_pool_strategy(maps:remove(connection, AllOpts)),
10631062
conn_opts => Connection}.
10641063

1064+
%% Checks that `max_worker_queue_len' is only combined with the `best_worker'
%% strategy.
%%
%% Returns the given pool options unchanged when they are acceptable.
%% Raises an error (a map with `what', `text' and `pool_options' keys) when
%% `max_worker_queue_len' is present together with any other strategy.
verify_pool_strategy(PoolOpts) ->
    case PoolOpts of
        #{strategy := best_worker} ->
            %% The only strategy for which the queue limit is allowed
            PoolOpts;
        #{strategy := _OtherStrategy, max_worker_queue_len := _} ->
            error(#{what => invalid_worker_pool_strategy_option,
                    text => <<"max_worker_queue_len can only be set for the best_worker strategy">>,
                    pool_options => PoolOpts});
        _ ->
            %% No queue limit configured, so there is nothing to verify
            PoolOpts
    end.
1071+
10651072
pool_scope(host) -> host_type;
10661073
pool_scope(host_type) -> host_type;
10671074
pool_scope(global) -> global.

src/mongoose_rabbit_worker.erl

Lines changed: 14 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@
4444
username := binary(),
4545
password := binary(),
4646
virtual_host := binary(),
47-
confirms_enabled := boolean(),
48-
max_worker_queue_len := non_neg_integer() | infinity}.
47+
confirms_enabled := boolean()}.
4948

5049
-type publish_result() :: boolean() | timeout | {channel_exception, any(), any()}.
5150

@@ -71,20 +70,25 @@ init(State) ->
7170
{ok, establish_rabbit_connection(State)}.
7271

7372
-spec handle_call({amqp_call, mongoose_amqp:method()}, gen_server:from(), state()) ->
74-
{reply, request_dropped | {ok | error | exit | throw, mongoose_amqp:method() | atom()},
75-
state()}.
76-
handle_call(Req, From, State) ->
77-
maybe_handle_request(fun do_handle_call/3, [Req, From, State],
78-
{reply, request_dropped, State}).
73+
{reply, {ok | error | exit | throw, mongoose_amqp:method() | atom()}, state()}.
74+
handle_call({amqp_call, Method}, _From, State = #{channel := Channel}) ->
75+
try amqp_channel:call(Channel, Method) of
76+
Res ->
77+
{reply, {ok, Res}, State}
78+
catch
79+
Error:Reason ->
80+
{reply, {Error, Reason}, maybe_restart_rabbit_connection(State)}
81+
end.
7982

8083
-spec handle_cast({amqp_publish, mongoose_amqp:method(), mongoose_amqp:message()}, state()) ->
8184
{noreply, state()}.
82-
handle_cast(Req, State) ->
83-
maybe_handle_request(fun do_handle_cast/2, [Req, State], {noreply, State}).
85+
handle_cast({amqp_publish, Method, Payload}, State) ->
86+
handle_amqp_publish(Method, Payload, State).
8487

8588
-spec handle_info(term(), state()) -> {noreply, state()}.
8689
handle_info(Req, State) ->
87-
maybe_handle_request(fun do_handle_info/2, [Req, State], {noreply, State}).
90+
?UNEXPECTED_INFO(Req),
91+
{noreply, State}.
8892

8993
-spec terminate(term(), state()) -> ok.
9094
terminate(_Reason, #{connection := Connection, channel := Channel,
@@ -96,27 +100,6 @@ terminate(_Reason, #{connection := Connection, channel := Channel,
96100
%%% Internal functions
97101
%%%===================================================================
98102

99-
-spec do_handle_call({amqp_call, mongoose_amqp:method()}, gen_server:from(), state()) ->
100-
{reply, {ok | error | exit | throw, mongoose_amqp:method() | atom()}, state()}.
101-
do_handle_call({amqp_call, Method}, _From, State = #{channel := Channel}) ->
102-
try amqp_channel:call(Channel, Method) of
103-
Res ->
104-
{reply, {ok, Res}, State}
105-
catch
106-
Error:Reason ->
107-
{reply, {Error, Reason}, maybe_restart_rabbit_connection(State)}
108-
end.
109-
110-
-spec do_handle_cast({amqp_publish, mongoose_amqp:method(), mongoose_amqp:message()}, state()) ->
111-
{noreply, state()}.
112-
do_handle_cast({amqp_publish, Method, Payload}, State) ->
113-
handle_amqp_publish(Method, Payload, State).
114-
115-
-spec do_handle_info(term(), state()) -> {noreply, state()}.
116-
do_handle_info(Req, State) ->
117-
?UNEXPECTED_INFO(Req),
118-
{noreply, State}.
119-
120103
-spec handle_amqp_publish(mongoose_amqp:method(), mongoose_amqp:message(), state()) ->
121104
{noreply, state()}.
122105
handle_amqp_publish(Method, Payload, State = #{host_type := HostType, pool_tag := PoolTag}) ->
@@ -229,26 +212,3 @@ maybe_enable_confirms(Channel, #{confirms_enabled := true}) ->
229212
ok;
230213
maybe_enable_confirms(_Channel, #{}) ->
231214
ok.
232-
233-
-spec maybe_handle_request(Callback :: function(), Args :: [term()], Reply :: term()) -> term().
234-
maybe_handle_request(Callback, Args, Reply) ->
235-
#{opts := #{max_worker_queue_len := Limit}} = lists:last(Args),
236-
case is_msq_queue_max_limit_reached(Limit) of
237-
false ->
238-
apply(Callback, Args);
239-
true ->
240-
?LOG_WARNING(#{what => rabbit_worker_request_dropped,
241-
reason => queue_message_length_limit_reached,
242-
limit => Limit}),
243-
Reply
244-
end.
245-
246-
-spec is_msq_queue_max_limit_reached(Limit :: infinity | non_neg_integer()) -> boolean().
247-
is_msq_queue_max_limit_reached(infinity) -> false;
248-
is_msq_queue_max_limit_reached(Limit) ->
249-
case process_info(self(), message_queue_len) of
250-
{_, QueueLen} when QueueLen > Limit ->
251-
true;
252-
_Else ->
253-
false
254-
end.

src/wpool/mongoose_wpool.erl

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ get_worker(PoolType, HostType) ->
234234
get_worker(PoolType, HostType, Tag) ->
235235
case get_pool(PoolType, HostType, Tag) of
236236
{ok, #mongoose_wpool{strategy = Strategy} = Pool} ->
237-
Worker = wpool_pool:Strategy(make_pool_name(Pool)),
237+
Worker = get_wpool_worker(make_pool_name(Pool), Strategy),
238238
{ok, whereis(Worker)};
239239
Err ->
240240
Err
@@ -381,7 +381,14 @@ expand_pools(Pools, PerHostType, HostTypes) ->
381381
prepare_pool_map(Pool = #{scope := HT, opts := Opts}) ->
382382
%% Rename "scope" field to "host_type" and change wpool opts to a KV list
383383
Pool1 = maps:remove(scope, Pool),
384-
Pool1#{host_type => HT, opts => maps:to_list(Opts)}.
384+
Pool1#{host_type => HT, opts => maps:to_list(prepare_pool_opts(Opts))}.
385+
386+
%% Translates the `max_worker_queue_len' option into a custom strategy fun.
%%
%% When the options use the `best_worker' strategy and carry a
%% `max_worker_queue_len' value, that key is removed and `strategy' is replaced
%% by a one-argument fun that calls best_worker_with_max_queue_len/2 with the
%% configured limit. Any other options are returned untouched.
-spec prepare_pool_opts(pool_map_in()) -> pool_map_in().
prepare_pool_opts(Opts) ->
    case Opts of
        #{strategy := best_worker, max_worker_queue_len := Limit} ->
            LimitedStrategy =
                fun(PoolName) -> best_worker_with_max_queue_len(PoolName, Limit) end,
            maps:remove(max_worker_queue_len, Opts#{strategy := LimitedStrategy});
        _ ->
            Opts
    end.
385392

386393
-spec get_unique_types([pool_map_in()], [pool_map_in()]) -> [pool_type()].
387394
get_unique_types(Pools, HostTypeSpecific) ->
@@ -395,6 +402,12 @@ get_pool(PoolType, HostType, Tag) ->
395402
[Pool] -> {ok, Pool}
396403
end.
397404

405+
%% Selects a worker from the named pool using the given strategy.
%%
%% The strategy is either a one-argument fun, which is called with the pool
%% name, or an atom naming a `wpool_pool' function, which is called with the
%% pool name. Any other strategy value fails with `function_clause'.
-spec get_wpool_worker(wpool:name(), wpool:strategy()) -> proc_name().
get_wpool_worker(Pool, PickWorker) when is_function(PickWorker, 1) ->
    PickWorker(Pool);
get_wpool_worker(Pool, StrategyName) when is_atom(StrategyName) ->
    wpool_pool:StrategyName(Pool).
410+
398411
-spec instrumentation(pool_type(), host_type_or_global(), tag()) -> [mongoose_instrument:spec()].
399412
instrumentation(PoolType, HostType, Tag) ->
400413
CallbackModule = make_callback_module_name(PoolType),
@@ -404,3 +417,13 @@ instrumentation(PoolType, HostType, Tag) ->
404417
false ->
405418
[]
406419
end.
420+
421+
%% Picks a worker with `wpool_pool:best_worker/1' and enforces a queue limit.
%%
%% Returns the registered name of the chosen worker. If that worker's message
%% queue length is greater than or equal to `MaxQueueLen', the calling process
%% exits with `no_available_workers' instead.
%%
%% `MaxQueueLen' may be 0 (the config validator accepts any non-negative
%% integer); every live worker then counts as full and each call exits.
%%
%% If the queue length cannot be read (the name is no longer registered or the
%% process has died), the worker name is returned as-is.
%% NOTE(review): this assumes the caller handles a dead worker the same way it
%% would for the plain `best_worker' strategy - confirm.
-spec best_worker_with_max_queue_len(wpool:name(), non_neg_integer()) -> atom().
best_worker_with_max_queue_len(Name, MaxQueueLen) ->
    Worker = wpool_pool:best_worker(Name),
    case worker_queue_len(Worker) of
        %% is_integer/1 is required: the atom `undefined' compares greater
        %% than any number in Erlang term order.
        QueueLen when is_integer(QueueLen), QueueLen >= MaxQueueLen ->
            exit(no_available_workers);
        _ ->
            Worker
    end.

%% Returns the message queue length of a registered process, or `undefined'
%% when the name is not registered or the process is no longer alive.
-spec worker_queue_len(atom()) -> non_neg_integer() | undefined.
worker_queue_len(Worker) ->
    case whereis(Worker) of
        undefined ->
            %% process_info/2 raises badarg for a non-pid argument
            undefined;
        Pid ->
            case process_info(Pid, message_queue_len) of
                {message_queue_len, QueueLen} -> QueueLen;
                undefined -> undefined
            end
    end.

test/common/config_parser_helper.erl

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -362,9 +362,8 @@ options("outgoing_pools") ->
362362
root_dn => <<"cn=admin,dc=example,dc=com">>,
363363
servers => ["ldap-server.example.com"]}},
364364
#{type => rabbit, scope => host_type, tag => event_pusher,
365-
opts => #{workers => 20},
366-
conn_opts => #{confirms_enabled => true,
367-
max_worker_queue_len => 100}},
365+
opts => #{workers => 20, max_worker_queue_len => 100},
366+
conn_opts => #{confirms_enabled => true}},
368367
#{type => rdbms,
369368
opts => #{workers => 5},
370369
conn_opts => #{query_timeout => 5000, keepalive_interval => 30,
@@ -829,8 +828,7 @@ default_pool_conn_opts(rabbit) ->
829828
username => <<"guest">>,
830829
password => <<"guest">>,
831830
virtual_host => <<"/">>,
832-
confirms_enabled => false,
833-
max_worker_queue_len => 1000};
831+
confirms_enabled => false};
834832
default_pool_conn_opts(redis) ->
835833
#{host => "127.0.0.1",
836834
port => 6379,

test/config_parser_SUITE.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,14 +1105,12 @@ pool_rabbit_connection(_Config) ->
11051105
?cfg(P ++ [password], <<"pass">>, T(#{<<"password">> => <<"pass">>})),
11061106
?cfg(P ++ [virtual_host], <<"vh">>, T(#{<<"virtual_host">> => <<"vh">>})),
11071107
?cfg(P ++ [confirms_enabled], true, T(#{<<"confirms_enabled">> => true})),
1108-
?cfg(P ++ [max_worker_queue_len], 100, T(#{<<"max_worker_queue_len">> => 100})),
11091108
?err(T(#{<<"host">> => <<>>})),
11101109
?err(T(#{<<"port">> => 123456})),
11111110
?err(T(#{<<"username">> => <<>>})),
11121111
?err(T(#{<<"password">> => <<>>})),
11131112
?err(T(#{<<"virtual_host">> => <<>>})),
1114-
?err(T(#{<<"confirms_enabled">> => <<"yes">>})),
1115-
?err(T(#{<<"max_worker_queue_len">> => -1})).
1113+
?err(T(#{<<"confirms_enabled">> => <<"yes">>})).
11161114

11171115
pool_rabbit_connection_tls(_Config) ->
11181116
P = [outgoing_pools, 1, conn_opts, tls],
@@ -1152,9 +1150,13 @@ test_pool_opts(Type, Required) ->
11521150
?cfg(P, default_config([outgoing_pools, Type, default, opts]), T(Required)),
11531151
?cfg(P ++ [workers], 11, T(Required#{<<"workers">> => 11})),
11541152
?cfg(P ++ [strategy], random_worker, T(Required#{<<"strategy">> => <<"random_worker">>})),
1153+
?cfg(P ++ [max_worker_queue_len], 1000, T(Required#{<<"max_worker_queue_len">> => 1000})),
11551154
?cfg(P ++ [call_timeout], 999, T(Required#{<<"call_timeout">> => 999})),
11561155
?err(T(Required#{<<"workers">> => 0})),
11571156
?err(T(Required#{<<"strategy">> => <<"worst_worker">>})),
1157+
?err(T(Required#{<<"max_worker_queue_len">> => -1})),
1158+
?err(T(Required#{<<"strategy">> => <<"random_worker">>,
1159+
<<"max_worker_queue_len">> => 1000})), % this opt is only for best_worker
11581160
?err(T(Required#{<<"call_timeout">> => 0})).
11591161

11601162
test_just_tls_client(P, T) ->

test/config_parser_SUITE_data/outgoing_pools.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@
4747
[outgoing_pools.rabbit.event_pusher]
4848
scope = "host_type"
4949
workers = 20
50+
max_worker_queue_len = 100
5051

5152
[outgoing_pools.rabbit.event_pusher.connection]
5253
host = "localhost"
5354
port = 5672
5455
username = "guest"
5556
password = "guest"
5657
confirms_enabled = true
57-
max_worker_queue_len = 100
5858

5959
[outgoing_pools.ldap.default]
6060
scope = "host_type"

test/mongoose_rabbit_worker_SUITE.erl

Lines changed: 1 addition & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,7 @@
3333
all() ->
3434
[
3535
no_request_in_worker_queue_is_lost_when_amqp_call_fails,
36-
worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails,
37-
worker_processes_msgs_when_queue_msg_len_limit_is_not_reached,
38-
worker_drops_msgs_when_queue_msg_len_limit_is_reached
36+
worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails
3937
].
4038

4139
%%--------------------------------------------------------------------
@@ -125,46 +123,6 @@ worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails(Config) ->
125123
?assert(is_pid(Channel3)),
126124
?assertNotMatch(ConnectionAndChannel2, ConnectionAndChannel3).
127125

128-
129-
worker_processes_msgs_when_queue_msg_len_limit_is_not_reached(Config) ->
130-
%% given
131-
Worker = proplists:get_value(worker_pid, Config),
132-
Ref = make_ref(),
133-
Lock = lock_fun(),
134-
SendBack = send_back_fun(),
135-
136-
%% when
137-
gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}),
138-
gen_server:cast(Worker, {amqp_publish, {SendBack, [self(), Ref]}, ok}),
139-
[gen_server:cast(Worker, {amqp_publish, ok, ok})
140-
|| _ <- lists:seq(1, ?MAX_QUEUE_LEN-1)],
141-
142-
%% unlock the worker
143-
Worker ! Ref,
144-
145-
%% then
146-
?assertReceivedMatch(Ref, 100).
147-
148-
worker_drops_msgs_when_queue_msg_len_limit_is_reached(Config) ->
149-
%% given
150-
Worker = proplists:get_value(worker_pid, Config),
151-
Ref = make_ref(),
152-
Lock = lock_fun(),
153-
SendBack = send_back_fun(),
154-
155-
%% when
156-
gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}),
157-
gen_server:cast(Worker, {amqp_publish, {SendBack, [self(), Ref]}, ok}),
158-
[gen_server:cast(Worker, {amqp_publish, ok, ok})
159-
|| _ <- lists:seq(1, ?MAX_QUEUE_LEN+1)],
160-
161-
%% unlock the worker
162-
Worker ! Ref,
163-
164-
%% then
165-
?assertError({assertReceivedMatch_failed, _},
166-
?assertReceivedMatch(Ref, 100)).
167-
168126
%%--------------------------------------------------------------------
169127
%% Helpers
170128
%%--------------------------------------------------------------------

0 commit comments

Comments
 (0)