Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ dispatcher() ->
{"/users/:user", rabbit_mgmt_wm_user, []},
{"/users/:user/permissions", rabbit_mgmt_wm_permissions_user, []},
{"/users/:user/topic-permissions", rabbit_mgmt_wm_topic_permissions_user, []},
{"/users/:user/queues", rabbit_mgmt_wm_user_queues, []},
{"/user-limits/:user/:name", rabbit_mgmt_wm_user_limit, []},
{"/user-limits", rabbit_mgmt_wm_user_limits, []},
{"/user-limits/:user", rabbit_mgmt_wm_user_limits, []},
Expand Down
122 changes: 122 additions & 0 deletions deps/rabbitmq_management/src/rabbit_mgmt_wm_user_queues.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_mgmt_wm_user_queues).

-export([init/2, to_json/2, content_types_provided/2, is_authorized/2,
resource_exists/2, basic/1]).
-export([variances/2]).

-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").

-define(BASIC_COLUMNS,
["vhost",
"name",
"node",
"durable",
"auto_delete",
"exclusive",
"owner_pid",
"arguments",
"type",
"pid",
"state"]).

-define(DEFAULT_SORT, ["vhost", "name"]).

%%--------------------------------------------------------------------

init(Req, _InitialState) ->
{cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}.

variances(Req, Context) ->
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.

content_types_provided(ReqData, Context) ->
{rabbit_mgmt_util:responder_map(to_json), ReqData, Context}.

resource_exists(ReqData, Context) ->
%% just checking that the vhost requested exists
{case rabbit_mgmt_util:all_or_one_vhost(ReqData, fun (_) -> [] end) of
vhost_not_found -> false;
_ -> true
end, ReqData, Context}.

to_json(ReqData, Context) ->
try
Basic = basic_owner_and_vhost_filtered(ReqData, Context),
Data = rabbit_mgmt_util:augment_resources(Basic, ?DEFAULT_SORT,
?BASIC_COLUMNS, ReqData,
Context, augment()),
rabbit_mgmt_util:reply(Data, ReqData, Context)
catch
{error, invalid_range_parameters, Reason} ->
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData,
Context)
end.

is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized_vhost(ReqData, Context).

%%--------------------------------------------------------------------
%% Exported functions

basic(ReqData) ->
%% rabbit_nodes:list_running/1 is a potentially slow function that performs
%% a cluster wide query with a reasonably long (10s) timeout.
%% TODO: replace with faster approximate function
Running = rabbit_nodes:list_running(),
Ctx = #{running_nodes => Running},
FmtQ = fun (Q) -> rabbit_mgmt_format:queue(Q, Ctx) end,
User = rabbit_mgmt_util:id(user, ReqData),
list_queues(ReqData, Running, FmtQ, FmtQ, User).

list_queues(ReqData, Running, FormatRunningFun, FormatDownFun, User) ->
[begin
Pid = amqqueue:get_pid(Q),
%% only queues whose leader pid is a on a non running node
%% are considered "down", all other states should be passed
%% as they are and the queue type impl will decide how to
%% emit them.
case not rabbit_amqqueue:is_local_to_node_set(Pid, Running) of
false ->
FormatRunningFun(Q);
true ->
FormatDownFun(amqqueue:set_state(Q, down))
end
end || Q <- all_queues(ReqData, User)].


%%--------------------------------------------------------------------
%% Private helpers

augment() ->
fun(Basic, ReqData) ->
case rabbit_mgmt_util:disable_stats(ReqData) of
false ->
Stats = case rabbit_mgmt_util:columns(ReqData) of
all -> basic;
_ -> detailed
end,
rabbit_mgmt_db:augment_queues(Basic,
rabbit_mgmt_util:range_ceil(ReqData),
Stats);
true ->
Basic
end
end.

basic_owner_and_vhost_filtered(ReqData, Context) ->
rabbit_mgmt_util:filter_vhost(basic(ReqData), ReqData, Context).

all_queues(ReqData, User) ->
rabbit_mgmt_util:all_or_one_vhost(ReqData, fun (VHost) -> list_all_for_user(VHost, User) end).

list_all_for_user(VHost, User) ->
All = rabbit_amqqueue:list_all(VHost),
[Q || Q <- All,
maps:get(user, amqqueue:get_options(Q)) =:= User].
34 changes: 34 additions & 0 deletions deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ all_tests() -> [
queue_pagination_test,
queue_pagination_columns_test,
queues_pagination_permissions_test,
user_queues_test,
samples_range_test,
sorting_test,
format_output_test,
Expand Down Expand Up @@ -2948,6 +2949,39 @@ queues_pagination_permissions_test(Config) ->
http_delete(Config, "/users/non-admin", {group, '2xx'}),
passed.

user_queues_test(Config) ->
%% "alice" has no access, "bob" and "carlos" have access.
http_put(Config, "/users/alice", [{password, <<"alice">>},
{tags, <<>>}], {group, '2xx'}),
http_put(Config, "/users/bob", [{password, <<"bob">>},
{tags, <<"management">>}], {group, '2xx'}),
http_put(Config, "/users/carlos", [{password, <<"carlos">>},
{tags, <<"management">>}], {group, '2xx'}),

Perms = [{configure, <<".*">>},
{write, <<".*">>},
{read, <<".*">>}],
http_put(Config, "/permissions/%2F/bob", Perms, {group, '2xx'}),
http_put(Config, "/permissions/%2F/carlos", Perms, {group, '2xx'}),

QArgs = #{},
http_put(Config, "/queues/%2F/bobq", QArgs, "bob","bob", {group, '2xx'}),
http_put(Config, "/queues/%2F/carlosq", QArgs, "carlos","carlos", {group, '2xx'}),

http_get(Config, "/users/bob/queues", "alice", "alice", ?NOT_AUTHORISED),
http_get(Config, "/users/carlos/queues", "alice", "alice", ?NOT_AUTHORISED),
[#{name := <<"bobq">>}] = http_get(Config, "/users/bob/queues", "bob", "bob", ?OK),
[#{name := <<"carlosq">>}] = http_get(Config, "/users/carlos/queues", "bob", "bob", ?OK),
[#{name := <<"bobq">>}] = http_get(Config, "/users/bob/queues", "carlos", "carlos", ?OK),
[#{name := <<"carlosq">>}] = http_get(Config, "/users/carlos/queues", "carlos", "carlos", ?OK),

http_delete(Config, "/queues/%2F/bobq","bob","bob", {group, '2xx'}),
http_delete(Config, "/queues/%2F/carlosq","carlos","carlos", {group, '2xx'}),
http_delete(Config, "/users/alice", {group, '2xx'}),
http_delete(Config, "/users/bob", {group, '2xx'}),
http_delete(Config, "/users/carlos", {group, '2xx'}),
passed.

samples_range_test(Config) ->
{Conn, Ch} = open_connection_and_channel(Config),

Expand Down
Loading