Skip to content

Commit f60aa47

Browse files
Introduce 'shovels enable_tls_peer_verification_for_all_{source,destination}_uris'
1 parent cd03ad8 commit f60aa47

22 files changed

+971
-507
lines changed

.config/nextest.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,18 @@ test-group = 'sequential'
5151
filter = 'test(deprecated_features)'
5252
priority = 10
5353
test-group = 'sequential'
54+
55+
[[profile.default.overrides]]
56+
filter = 'binary(federation_upstream_uri_modification_tests)'
57+
priority = 25
58+
test-group = 'sequential'
59+
60+
[[profile.default.overrides]]
61+
filter = 'binary(shovel_source_uri_modification_tests)'
62+
priority = 24
63+
test-group = 'sequential'
64+
65+
[[profile.default.overrides]]
66+
filter = 'binary(shovel_destination_uri_modification_tests)'
67+
priority = 23
68+
test-group = 'sequential'

src/cli.rs

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2959,7 +2959,7 @@ pub fn get_subcommands(pre_flight_settings: PreFlightSettings) -> [Command; 1] {
29592959
)].map(|cmd| cmd.infer_long_args(pre_flight_settings.infer_long_options))
29602960
}
29612961

2962-
pub fn shovel_subcommands(pre_flight_settings: PreFlightSettings) -> [Command; 7] {
2962+
pub fn shovel_subcommands(pre_flight_settings: PreFlightSettings) -> [Command; 9] {
29632963
let idempotently_arg = Arg::new("idempotently")
29642964
.long("idempotently")
29652965
.value_parser(value_parser!(bool))
@@ -3138,6 +3138,76 @@ pub fn shovel_subcommands(pre_flight_settings: PreFlightSettings) -> [Command; 7
31383138
.after_help(color_print::cformat!(
31393139
r#"<bold>Doc guides</bold>:
31403140
3141+
* {}
3142+
* {}
3143+
* {}"#,
3144+
SHOVEL_GUIDE_URL,
3145+
TLS_GUIDE_URL,
3146+
"https://www.rabbitmq.com/docs/shovel#tls-connections"
3147+
));
3148+
3149+
let enable_tls_peer_verification_source_cmd = Command::new("enable_tls_peer_verification_for_all_source_uris")
3150+
.about("Enables TLS peer verification for all shovel source URIs with provided [RabbitMQ node-local] certificate paths.")
3151+
.long_about("Enables TLS peer verification for all shovel source URIs by updating their 'verify' parameter and adding [RabbitMQ node-local] certificate and private key file paths.")
3152+
.arg(
3153+
Arg::new("node_local_ca_certificate_bundle_path")
3154+
.long("node-local-ca-certificate-bundle-path")
3155+
.help("Path to the CA certificate bundle file on the target RabbitMQ node(s)")
3156+
.required(true)
3157+
.value_name("PATH")
3158+
)
3159+
.arg(
3160+
Arg::new("node_local_client_certificate_file_path")
3161+
.long("node-local-client-certificate-file-path")
3162+
.help("Path to the client certificate file on the target RabbitMQ node(s)")
3163+
.required(true)
3164+
.value_name("PATH")
3165+
)
3166+
.arg(
3167+
Arg::new("node_local_client_private_key_file_path")
3168+
.long("node-local-client-private-key-file-path")
3169+
.help("Path to the client private key file on the target RabbitMQ node(s)")
3170+
.required(true)
3171+
.value_name("PATH")
3172+
)
3173+
.after_help(color_print::cformat!(
3174+
r#"<bold>Doc guides</bold>:
3175+
3176+
* {}
3177+
* {}
3178+
* {}"#,
3179+
SHOVEL_GUIDE_URL,
3180+
TLS_GUIDE_URL,
3181+
"https://www.rabbitmq.com/docs/shovel#tls-connections"
3182+
));
3183+
3184+
let enable_tls_peer_verification_dest_cmd = Command::new("enable_tls_peer_verification_for_all_destination_uris")
3185+
.about("Enables TLS peer verification for all shovel destination URIs with provided [RabbitMQ node-local] certificate paths.")
3186+
.long_about("Enables TLS peer verification for all shovel destination URIs by updating their 'verify' parameter and adding [RabbitMQ node-local] certificate and private key file paths.")
3187+
.arg(
3188+
Arg::new("node_local_ca_certificate_bundle_path")
3189+
.long("node-local-ca-certificate-bundle-path")
3190+
.help("Path to the CA certificate bundle file on the target RabbitMQ node(s)")
3191+
.required(true)
3192+
.value_name("PATH")
3193+
)
3194+
.arg(
3195+
Arg::new("node_local_client_certificate_file_path")
3196+
.long("node-local-client-certificate-file-path")
3197+
.help("Path to the client certificate file on the target RabbitMQ node(s)")
3198+
.required(true)
3199+
.value_name("PATH")
3200+
)
3201+
.arg(
3202+
Arg::new("node_local_client_private_key_file_path")
3203+
.long("node-local-client-private-key-file-path")
3204+
.help("Path to the client private key file on the target RabbitMQ node(s)")
3205+
.required(true)
3206+
.value_name("PATH")
3207+
)
3208+
.after_help(color_print::cformat!(
3209+
r#"<bold>Doc guides</bold>:
3210+
31413211
* {}
31423212
* {}
31433213
* {}"#,
@@ -3154,6 +3224,8 @@ pub fn shovel_subcommands(pre_flight_settings: PreFlightSettings) -> [Command; 7
31543224
delete_cmd,
31553225
disable_tls_peer_verification_cmd,
31563226
disable_tls_peer_verification_dest_cmd,
3227+
enable_tls_peer_verification_source_cmd,
3228+
enable_tls_peer_verification_dest_cmd,
31573229
]
31583230
.map(|cmd| cmd.infer_long_args(pre_flight_settings.infer_long_options))
31593231
}

src/commands.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -885,6 +885,118 @@ pub fn disable_tls_peer_verification_for_all_destination_uris(
885885
Ok(())
886886
}
887887

888+
pub fn enable_tls_peer_verification_for_all_source_uris(
889+
client: APIClient,
890+
args: &ArgMatches,
891+
) -> Result<(), CommandRunError> {
892+
let ca_cert_path = args
893+
.get_one::<String>("node_local_ca_certificate_bundle_path")
894+
.ok_or_else(|| CommandRunError::MissingArgumentValue {
895+
property: "node_local_ca_certificate_bundle_path".to_string(),
896+
})?;
897+
let client_cert_path = args
898+
.get_one::<String>("node_local_client_certificate_file_path")
899+
.ok_or_else(|| CommandRunError::MissingArgumentValue {
900+
property: "node_local_client_certificate_file_path".to_string(),
901+
})?;
902+
let client_key_path = args
903+
.get_one::<String>("node_local_client_private_key_file_path")
904+
.ok_or_else(|| CommandRunError::MissingArgumentValue {
905+
property: "node_local_client_private_key_file_path".to_string(),
906+
})?;
907+
908+
let all_params = client.list_runtime_parameters()?;
909+
let shovel_params: Vec<_> = all_params
910+
.into_iter()
911+
.filter(|p| p.component == "shovel")
912+
.collect();
913+
914+
for param in shovel_params {
915+
let owned_params = match OwnedShovelParams::try_from(param.clone()) {
916+
Ok(params) => params,
917+
Err(_) => continue,
918+
};
919+
920+
let original_source_uri = &owned_params.source_uri;
921+
if original_source_uri.is_empty() {
922+
continue;
923+
}
924+
925+
let updated_source_uri = enable_tls_peer_verification(
926+
original_source_uri,
927+
ca_cert_path,
928+
client_cert_path,
929+
client_key_path,
930+
)?;
931+
932+
if original_source_uri != &updated_source_uri {
933+
let mut updated_params = owned_params;
934+
updated_params.source_uri = updated_source_uri;
935+
936+
let param = RuntimeParameterDefinition::from(&updated_params);
937+
client.upsert_runtime_parameter(&param)?;
938+
}
939+
}
940+
941+
Ok(())
942+
}
943+
944+
pub fn enable_tls_peer_verification_for_all_destination_uris(
945+
client: APIClient,
946+
args: &ArgMatches,
947+
) -> Result<(), CommandRunError> {
948+
let ca_cert_path = args
949+
.get_one::<String>("node_local_ca_certificate_bundle_path")
950+
.ok_or_else(|| CommandRunError::MissingArgumentValue {
951+
property: "node_local_ca_certificate_bundle_path".to_string(),
952+
})?;
953+
let client_cert_path = args
954+
.get_one::<String>("node_local_client_certificate_file_path")
955+
.ok_or_else(|| CommandRunError::MissingArgumentValue {
956+
property: "node_local_client_certificate_file_path".to_string(),
957+
})?;
958+
let client_key_path = args
959+
.get_one::<String>("node_local_client_private_key_file_path")
960+
.ok_or_else(|| CommandRunError::MissingArgumentValue {
961+
property: "node_local_client_private_key_file_path".to_string(),
962+
})?;
963+
964+
let all_params = client.list_runtime_parameters()?;
965+
let shovel_params: Vec<_> = all_params
966+
.into_iter()
967+
.filter(|p| p.component == "shovel")
968+
.collect();
969+
970+
for param in shovel_params {
971+
let owned_params = match OwnedShovelParams::try_from(param.clone()) {
972+
Ok(params) => params,
973+
Err(_) => continue,
974+
};
975+
976+
let original_destination_uri = &owned_params.destination_uri;
977+
if original_destination_uri.is_empty() {
978+
continue;
979+
}
980+
981+
let updated_destination_uri = enable_tls_peer_verification(
982+
original_destination_uri,
983+
ca_cert_path,
984+
client_cert_path,
985+
client_key_path,
986+
)?;
987+
988+
if original_destination_uri != &updated_destination_uri {
989+
let mut updated_params = owned_params;
990+
updated_params.destination_uri = updated_destination_uri;
991+
992+
let param = RuntimeParameterDefinition::from(&updated_params);
993+
client.upsert_runtime_parameter(&param)?;
994+
}
995+
}
996+
997+
Ok(())
998+
}
999+
8881000
//
8891001
// Feature flags
8901002
//

src/main.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,6 +1086,20 @@ fn dispatch_common_subcommand(
10861086
let result = commands::disable_tls_peer_verification_for_all_destination_uris(client);
10871087
res_handler.no_output_on_success(result);
10881088
}
1089+
("shovels", "enable_tls_peer_verification_for_all_source_uris") => {
1090+
let result = commands::enable_tls_peer_verification_for_all_source_uris(
1091+
client,
1092+
second_level_args,
1093+
);
1094+
res_handler.no_output_on_success(result);
1095+
}
1096+
("shovels", "enable_tls_peer_verification_for_all_destination_uris") => {
1097+
let result = commands::enable_tls_peer_verification_for_all_destination_uris(
1098+
client,
1099+
second_level_args,
1100+
);
1101+
res_handler.no_output_on_success(result);
1102+
}
10891103
("streams", "declare") => {
10901104
let result = commands::declare_stream(client, &vhost, second_level_args);
10911105
res_handler.no_output_on_success(result);

tests/bindings_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ fn test_bindings_list() -> Result<(), Box<dyn std::error::Error>> {
202202

203203
#[test]
204204
fn test_bindings_delete_idempotently() -> Result<(), Box<dyn std::error::Error>> {
205-
let vh = "bindings.delete.idempotently.1";
205+
let vh = "rabbitmqadmin.bindings.test1";
206206
let source_ex = "test_source_exchange";
207207
let dest_queue = "test_dest_queue";
208208
let routing_key = "test.routing.key";

tests/combined_integration_tests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use test_helpers::{run_fails, run_succeeds};
2020

2121
#[test]
2222
fn combined_integration_test1() -> Result<(), Box<dyn std::error::Error>> {
23-
let vh = "combined_integration_test1";
23+
let vh = "rabbitmqadmin.combined_integration.test1";
2424
let config_path = path::absolute("./tests/fixtures/config_files/config_file1.conf")
2525
.expect("failed to compute an absolute version for a ./test/fixtures path");
2626

@@ -45,7 +45,7 @@ fn combined_integration_test1() -> Result<(), Box<dyn std::error::Error>> {
4545

4646
#[test]
4747
fn combined_integration_test2() -> Result<(), Box<dyn std::error::Error>> {
48-
let vh = "combined_integration_test2";
48+
let vh = "rabbitmqadmin.combined_integration.test2";
4949

5050
// Uses a node alias that does not exist in the file
5151
let config_path = path::absolute("tests/fixtures/config_files/config_file1.conf")
@@ -77,7 +77,7 @@ fn combined_integration_test2() -> Result<(), Box<dyn std::error::Error>> {
7777

7878
#[test]
7979
fn combined_integration_test3() -> Result<(), Box<dyn std::error::Error>> {
80-
let vh = "combined_integration_test3";
80+
let vh = "rabbitmqadmin.combined_integration.test3";
8181

8282
// Uses a node alias that does not exist in the file
8383
let config_path = path::absolute("tests/fixtures/config_files/non_exis7ent_c0nfig_f1le.conf")
@@ -99,7 +99,7 @@ fn combined_integration_test3() -> Result<(), Box<dyn std::error::Error>> {
9999
fn combined_integration_test4() -> Result<(), Box<dyn std::error::Error>> {
100100
// This test uses administrative credentials to create a new user
101101
// and set up a topology using those new credentials
102-
let vh = "combined_integration_test4";
102+
let vh = "rabbitmqadmin.combined_integration.test4";
103103
let new_user = "user_from_combined_integration_test4";
104104
let new_pass = "p4$$w0rd_from_combined_integration_test4";
105105
let x = "fanout_combined_integration_test4";

tests/definitions_export_tests.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ fn test_export_cluster_wide_definitions() -> Result<(), Box<dyn std::error::Erro
2727

2828
#[test]
2929
fn test_export_vhost_definitions() -> Result<(), Box<dyn std::error::Error>> {
30-
let vh = "test_export_vhost_definitions.1";
30+
let vh = "rabbitmqadmin.definitions_export.test1";
3131
delete_vhost(vh).expect("failed to delete a virtual host");
3232
run_succeeds(["declare", "vhost", "--name", vh]);
3333

@@ -49,7 +49,7 @@ fn test_export_vhost_definitions() -> Result<(), Box<dyn std::error::Error>> {
4949
#[test]
5050
fn test_export_cluster_wide_definitions_with_transformations_case1()
5151
-> Result<(), Box<dyn std::error::Error>> {
52-
let vh = "test_export_cluster_definitions.transformations.1";
52+
let vh = "rabbitmqadmin.definitions_export.test2";
5353
delete_vhost(vh).expect("failed to delete a virtual host");
5454
run_succeeds(["declare", "vhost", "--name", vh]);
5555

@@ -98,7 +98,7 @@ fn test_export_cluster_wide_definitions_with_transformations_case1()
9898
#[test]
9999
fn test_export_vhost_definitions_with_transformations_case1()
100100
-> Result<(), Box<dyn std::error::Error>> {
101-
let vh = "test_export_vhost_definitions.transformations.1";
101+
let vh = "rabbitmqadmin.definitions_export.test3";
102102
delete_vhost(vh).expect("failed to delete a virtual host");
103103
run_succeeds(["declare", "vhost", "--name", vh]);
104104

tests/definitions_import_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ fn test_import_cluster_definitions() -> Result<(), Box<dyn std::error::Error>> {
3535

3636
#[test]
3737
fn test_import_vhost_definitions() -> Result<(), Box<dyn std::error::Error>> {
38-
let vh = "test_import_vhost_definitions.1";
38+
let vh = "rabbitmqadmin.definitions_import.test1";
3939

4040
delete_vhost(vh).expect("failed to delete a virtual host");
4141
run_succeeds(["declare", "vhost", "--name", vh]);

0 commit comments

Comments
 (0)