Skip to content

Commit 4e8ad48

Browse files
Tests for 'federation disable_tls_peer_verification_for_all_upstreams'
1 parent f61c3a3 commit 4e8ad48

File tree

1 file changed

+280
-0
lines changed

1 file changed

+280
-0
lines changed
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
// Copyright (C) 2023-2025 RabbitMQ Core Team (teamrabbitmq@gmail.com)
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod test_helpers;
16+
17+
use crate::test_helpers::*;
18+
use predicates::prelude::*;
19+
20+
#[test]
21+
fn test_disable_tls_peer_verification_for_all_upstreams_basic()
22+
-> Result<(), Box<dyn std::error::Error>> {
23+
let vh = "test_disable_tls_peer_verification_for_all_upstreams_basic";
24+
let upstream_name = "test_basic_upstream";
25+
26+
delete_vhost(vh).ok();
27+
run_succeeds(["declare", "vhost", "--name", vh]);
28+
29+
let amqps_endpoint = format!("amqps://localhost:5671/{}", vh);
30+
31+
run_succeeds([
32+
"-V",
33+
vh,
34+
"federation",
35+
"declare_upstream_for_exchanges",
36+
"--name",
37+
upstream_name,
38+
"--uri",
39+
&amqps_endpoint,
40+
"--exchange-name",
41+
"x.fanout",
42+
"--queue-type",
43+
"classic",
44+
]);
45+
46+
run_succeeds(["parameters", "list_all"]).stdout(predicate::str::contains(upstream_name));
47+
48+
run_succeeds([
49+
"federation",
50+
"disable_tls_peer_verification_for_all_upstreams",
51+
]);
52+
53+
run_succeeds(["parameters", "list_all"])
54+
.stdout(predicate::str::contains(upstream_name))
55+
.stdout(predicate::str::contains("verify=verify_none"));
56+
57+
delete_vhost(vh).expect("failed to delete a virtual host");
58+
59+
Ok(())
60+
}
61+
62+
#[test]
63+
fn test_disable_tls_peer_verification_for_all_upstreams_with_existing_verify_param()
64+
-> Result<(), Box<dyn std::error::Error>> {
65+
let vh = "test_disable_tls_peer_verification_for_all_upstreams_with_existing_verify_param";
66+
let upstream_name = "test_existing_upstream";
67+
68+
delete_vhost(vh).ok();
69+
run_succeeds(["declare", "vhost", "--name", vh]);
70+
71+
let amqps_endpoint = format!("amqps://localhost:5671/{}", vh);
72+
let source_uri = format!(
73+
"{}?key1=abc&verify=verify_peer&cacertfile=/path/to/ca_bundle.pem&key2=def&certfile=/path/to/client.pem&keyfile=/path/to/client.key&server_name_indication=example.com&custom_param=value123&another_param=xyz&heartbeat=60",
74+
amqps_endpoint
75+
);
76+
77+
run_succeeds([
78+
"-V",
79+
vh,
80+
"federation",
81+
"declare_upstream_for_exchanges",
82+
"--name",
83+
upstream_name,
84+
"--uri",
85+
&source_uri,
86+
"--exchange-name",
87+
"x.fanout",
88+
"--queue-type",
89+
"classic",
90+
]);
91+
await_metric_emission(500);
92+
93+
run_succeeds([
94+
"federation",
95+
"disable_tls_peer_verification_for_all_upstreams",
96+
]);
97+
98+
run_succeeds(["parameters", "list_all"])
99+
.stdout(predicate::str::contains(upstream_name))
100+
.stdout(predicate::str::contains("verify=verify_none"))
101+
.stdout(predicate::str::contains("key1=abc"))
102+
.stdout(predicate::str::contains("key2=def"))
103+
.stdout(predicate::str::contains(
104+
"cacertfile=/path/to/ca_bundle.pem",
105+
))
106+
.stdout(predicate::str::contains("certfile=/path/to/client.pem"))
107+
.stdout(predicate::str::contains("keyfile=/path/to/client.key"))
108+
.stdout(predicate::str::contains(
109+
"server_name_indication=example.com",
110+
))
111+
.stdout(predicate::str::contains("custom_param=value123"))
112+
.stdout(predicate::str::contains("another_param=xyz"))
113+
.stdout(predicate::str::contains("heartbeat=60"));
114+
115+
delete_vhost(vh).expect("failed to delete a virtual host");
116+
117+
Ok(())
118+
}
119+
120+
#[test]
121+
fn test_disable_tls_peer_verification_for_all_upstreams_queue_federation_basic()
122+
-> Result<(), Box<dyn std::error::Error>> {
123+
let vh = "test_disable_tls_peer_verification_for_all_upstreams_queue_federation_basic";
124+
let upstream_name = "test_queue_upstream";
125+
126+
delete_vhost(vh).ok();
127+
run_succeeds(["declare", "vhost", "--name", vh]);
128+
129+
let amqps_endpoint = format!("amqps://localhost:5671/{}", vh);
130+
131+
run_succeeds([
132+
"-V",
133+
vh,
134+
"federation",
135+
"declare_upstream_for_queues",
136+
"--name",
137+
upstream_name,
138+
"--uri",
139+
&amqps_endpoint,
140+
"--queue-name",
141+
"test.queue",
142+
"--consumer-tag",
143+
"test-consumer",
144+
]);
145+
146+
run_succeeds(["parameters", "list_all"]).stdout(predicate::str::contains(upstream_name));
147+
148+
run_succeeds([
149+
"federation",
150+
"disable_tls_peer_verification_for_all_upstreams",
151+
]);
152+
153+
run_succeeds(["parameters", "list_all"])
154+
.stdout(predicate::str::contains(upstream_name))
155+
.stdout(predicate::str::contains("verify=verify_none"));
156+
157+
delete_vhost(vh).expect("failed to delete a virtual host");
158+
159+
Ok(())
160+
}
161+
162+
#[test]
163+
fn test_disable_tls_peer_verification_for_all_upstreams_queue_federation_with_params()
164+
-> Result<(), Box<dyn std::error::Error>> {
165+
let vh = "test_disable_tls_peer_verification_for_all_upstreams_queue_federation_with_params";
166+
let upstream_name = "test_queue_upstream_with_params";
167+
168+
delete_vhost(vh).ok();
169+
run_succeeds(["declare", "vhost", "--name", vh]);
170+
171+
let amqps_endpoint = format!("amqps://localhost:5671/{}", vh);
172+
let source_uri = format!(
173+
"{}?queue_param=test123&verify=verify_peer&cacertfile=/etc/ssl/certs/ca.pem&consumer_tag_param=custom&prefetch=100&ack_mode=on-confirm",
174+
amqps_endpoint
175+
);
176+
177+
run_succeeds([
178+
"-V",
179+
vh,
180+
"federation",
181+
"declare_upstream_for_queues",
182+
"--name",
183+
upstream_name,
184+
"--uri",
185+
&source_uri,
186+
"--queue-name",
187+
"federated.queue",
188+
"--consumer-tag",
189+
"queue-consumer",
190+
]);
191+
await_metric_emission(500);
192+
193+
run_succeeds([
194+
"federation",
195+
"disable_tls_peer_verification_for_all_upstreams",
196+
]);
197+
198+
run_succeeds(["parameters", "list_all"])
199+
.stdout(predicate::str::contains(upstream_name))
200+
.stdout(predicate::str::contains("verify=verify_none"))
201+
.stdout(predicate::str::contains("queue_param=test123"))
202+
.stdout(predicate::str::contains("cacertfile=/etc/ssl/certs/ca.pem"))
203+
.stdout(predicate::str::contains("consumer_tag_param=custom"))
204+
.stdout(predicate::str::contains("prefetch=100"))
205+
.stdout(predicate::str::contains("ack_mode=on-confirm"));
206+
207+
delete_vhost(vh).expect("failed to delete a virtual host");
208+
209+
Ok(())
210+
}
211+
212+
#[test]
213+
fn test_disable_tls_peer_verification_for_all_upstreams_mixed_federation()
214+
-> Result<(), Box<dyn std::error::Error>> {
215+
let vh = "test_disable_tls_peer_verification_for_all_upstreams_mixed_federation";
216+
let exchange_upstream_name = "exchange_upstream";
217+
let queue_upstream_name = "queue_upstream";
218+
219+
delete_vhost(vh).ok();
220+
run_succeeds(["declare", "vhost", "--name", vh]);
221+
222+
let amqps_endpoint = format!("amqps://localhost:5671/{}", vh);
223+
let exchange_uri = format!(
224+
"{}?exchange_param=value1&verify=verify_peer&certfile=/path/to/client.pem",
225+
amqps_endpoint
226+
);
227+
let queue_uri = format!(
228+
"{}?queue_param=value2&verify=verify_peer&keyfile=/path/to/client.key",
229+
amqps_endpoint
230+
);
231+
232+
run_succeeds([
233+
"-V",
234+
vh,
235+
"federation",
236+
"declare_upstream_for_exchanges",
237+
"--name",
238+
exchange_upstream_name,
239+
"--uri",
240+
&exchange_uri,
241+
"--exchange-name",
242+
"x.federated",
243+
"--queue-type",
244+
"classic",
245+
]);
246+
247+
run_succeeds([
248+
"-V",
249+
vh,
250+
"federation",
251+
"declare_upstream_for_queues",
252+
"--name",
253+
queue_upstream_name,
254+
"--uri",
255+
&queue_uri,
256+
"--queue-name",
257+
"q.federated",
258+
"--consumer-tag",
259+
"mixed-consumer",
260+
]);
261+
await_metric_emission(500);
262+
263+
run_succeeds([
264+
"federation",
265+
"disable_tls_peer_verification_for_all_upstreams",
266+
]);
267+
268+
run_succeeds(["parameters", "list_all"])
269+
.stdout(predicate::str::contains(exchange_upstream_name))
270+
.stdout(predicate::str::contains(queue_upstream_name))
271+
.stdout(predicate::str::contains("exchange_param=value1"))
272+
.stdout(predicate::str::contains("queue_param=value2"))
273+
.stdout(predicate::str::contains("certfile=/path/to/client.pem"))
274+
.stdout(predicate::str::contains("keyfile=/path/to/client.key"))
275+
.stdout(predicate::str::contains("verify=verify_none"));
276+
277+
delete_vhost(vh).expect("failed to delete a virtual host");
278+
279+
Ok(())
280+
}

0 commit comments

Comments
 (0)