Skip to content

Commit f80ffab

Browse files
Merge branch 'operation-progress-report'
2 parents e9167f5 + f0671c2 commit f80ffab

File tree

8 files changed

+440
-104
lines changed

8 files changed

+440
-104
lines changed

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/commands.rs

Lines changed: 121 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
use crate::constants::DEFAULT_BLANKET_POLICY_PRIORITY;
1717
use crate::errors::CommandRunError;
18+
use crate::output::ProgressReporter;
1819
use clap::ArgMatches;
1920
use rabbitmq_http_client::blocking_api::Client;
2021
use rabbitmq_http_client::blocking_api::Result as ClientResult;
@@ -691,59 +692,67 @@ pub fn delete_federation_upstream(
691692

692693
pub fn disable_tls_peer_verification_for_all_federation_upstreams(
693694
client: APIClient,
695+
prog_rep: &mut dyn ProgressReporter,
694696
) -> Result<(), CommandRunError> {
695697
let upstreams = client.list_federation_upstreams()?;
698+
let total = upstreams.len();
699+
prog_rep.start_operation(total, "Updating federation upstream URIs");
700+
701+
for (index, upstream) in upstreams.into_iter().enumerate() {
702+
let upstream_name = &upstream.name;
703+
prog_rep.report_progress(index + 1, total, upstream_name);
696704

697-
for upstream in upstreams {
698705
let original_uri = &upstream.uri;
699706
let updated_uri = disable_tls_peer_verification(original_uri)?;
700707

701-
if original_uri != &updated_uri {
702-
let upstream_params = FederationUpstreamParams {
703-
name: &upstream.name,
704-
vhost: &upstream.vhost,
705-
uri: &updated_uri,
706-
prefetch_count: upstream
707-
.prefetch_count
708-
.unwrap_or(DEFAULT_FEDERATION_PREFETCH),
709-
reconnect_delay: upstream.reconnect_delay.unwrap_or(5),
710-
ack_mode: upstream.ack_mode,
711-
trust_user_id: upstream.trust_user_id.unwrap_or_default(),
712-
bind_using_nowait: upstream.bind_using_nowait,
713-
channel_use_mode: upstream.channel_use_mode,
714-
queue_federation: if upstream.queue.is_some() {
715-
Some(QueueFederationParams {
716-
queue: upstream.queue.as_deref(),
717-
consumer_tag: upstream.consumer_tag.as_deref(),
718-
})
719-
} else {
720-
None
721-
},
722-
exchange_federation: if upstream.exchange.is_some() {
723-
Some(ExchangeFederationParams {
724-
exchange: upstream.exchange.as_deref(),
725-
max_hops: upstream.max_hops,
726-
queue_type: upstream.queue_type.unwrap_or(QueueType::Classic),
727-
ttl: upstream.expires,
728-
message_ttl: upstream.message_ttl,
729-
resource_cleanup_mode: upstream.resource_cleanup_mode,
730-
})
731-
} else {
732-
None
733-
},
734-
};
708+
let upstream_params = FederationUpstreamParams {
709+
name: &upstream.name,
710+
vhost: &upstream.vhost,
711+
uri: &updated_uri,
712+
prefetch_count: upstream
713+
.prefetch_count
714+
.unwrap_or(DEFAULT_FEDERATION_PREFETCH),
715+
reconnect_delay: upstream.reconnect_delay.unwrap_or(5),
716+
ack_mode: upstream.ack_mode,
717+
trust_user_id: upstream.trust_user_id.unwrap_or_default(),
718+
bind_using_nowait: upstream.bind_using_nowait,
719+
channel_use_mode: upstream.channel_use_mode,
720+
queue_federation: if upstream.queue.is_some() {
721+
Some(QueueFederationParams {
722+
queue: upstream.queue.as_deref(),
723+
consumer_tag: upstream.consumer_tag.as_deref(),
724+
})
725+
} else {
726+
None
727+
},
728+
exchange_federation: if upstream.exchange.is_some() {
729+
Some(ExchangeFederationParams {
730+
exchange: upstream.exchange.as_deref(),
731+
max_hops: upstream.max_hops,
732+
queue_type: upstream.queue_type.unwrap_or(QueueType::Classic),
733+
ttl: upstream.expires,
734+
message_ttl: upstream.message_ttl,
735+
resource_cleanup_mode: upstream.resource_cleanup_mode,
736+
})
737+
} else {
738+
None
739+
},
740+
};
735741

736-
let param = RuntimeParameterDefinition::from(upstream_params);
737-
client.upsert_runtime_parameter(&param)?;
738-
}
742+
let param = RuntimeParameterDefinition::from(upstream_params);
743+
client.upsert_runtime_parameter(&param)?;
744+
prog_rep.report_success(upstream_name);
739745
}
740746

747+
prog_rep.finish_operation(total, total);
748+
741749
Ok(())
742750
}
743751

744752
pub fn enable_tls_peer_verification_for_all_federation_upstreams(
745753
client: APIClient,
746754
args: &ArgMatches,
755+
prog_rep: &mut dyn ProgressReporter,
747756
) -> Result<(), CommandRunError> {
748757
let ca_cert_path = args
749758
.get_one::<String>("node_local_ca_certificate_bundle_path")
@@ -762,8 +771,13 @@ pub fn enable_tls_peer_verification_for_all_federation_upstreams(
762771
})?;
763772

764773
let upstreams = client.list_federation_upstreams()?;
774+
let total = upstreams.len();
775+
prog_rep.start_operation(total, "Updating federation upstream URIs");
776+
777+
for (index, upstream) in upstreams.into_iter().enumerate() {
778+
let upstream_name = &upstream.name;
779+
prog_rep.report_progress(index + 1, total, upstream_name);
765780

766-
for upstream in upstreams {
767781
let original_uri = &upstream.uri;
768782
let updated_uri = enable_tls_peer_verification(
769783
original_uri,
@@ -772,86 +786,99 @@ pub fn enable_tls_peer_verification_for_all_federation_upstreams(
772786
client_key_path,
773787
)?;
774788

775-
if original_uri != &updated_uri {
776-
let upstream_params = FederationUpstreamParams {
777-
name: &upstream.name,
778-
vhost: &upstream.vhost,
779-
uri: &updated_uri,
780-
prefetch_count: upstream
781-
.prefetch_count
782-
.unwrap_or(DEFAULT_FEDERATION_PREFETCH),
783-
reconnect_delay: upstream.reconnect_delay.unwrap_or(5),
784-
ack_mode: upstream.ack_mode,
785-
trust_user_id: upstream.trust_user_id.unwrap_or_default(),
786-
bind_using_nowait: upstream.bind_using_nowait,
787-
channel_use_mode: upstream.channel_use_mode,
788-
queue_federation: if upstream.queue.is_some() {
789-
Some(QueueFederationParams {
790-
queue: upstream.queue.as_deref(),
791-
consumer_tag: upstream.consumer_tag.as_deref(),
792-
})
793-
} else {
794-
None
795-
},
796-
exchange_federation: if upstream.exchange.is_some() {
797-
Some(ExchangeFederationParams {
798-
exchange: upstream.exchange.as_deref(),
799-
max_hops: upstream.max_hops,
800-
queue_type: upstream.queue_type.unwrap_or(QueueType::Classic),
801-
ttl: upstream.expires,
802-
message_ttl: upstream.message_ttl,
803-
resource_cleanup_mode: upstream.resource_cleanup_mode,
804-
})
805-
} else {
806-
None
807-
},
808-
};
789+
let upstream_params = FederationUpstreamParams {
790+
name: &upstream.name,
791+
vhost: &upstream.vhost,
792+
uri: &updated_uri,
793+
prefetch_count: upstream
794+
.prefetch_count
795+
.unwrap_or(DEFAULT_FEDERATION_PREFETCH),
796+
reconnect_delay: upstream.reconnect_delay.unwrap_or(5),
797+
ack_mode: upstream.ack_mode,
798+
trust_user_id: upstream.trust_user_id.unwrap_or_default(),
799+
bind_using_nowait: upstream.bind_using_nowait,
800+
channel_use_mode: upstream.channel_use_mode,
801+
queue_federation: if upstream.queue.is_some() {
802+
Some(QueueFederationParams {
803+
queue: upstream.queue.as_deref(),
804+
consumer_tag: upstream.consumer_tag.as_deref(),
805+
})
806+
} else {
807+
None
808+
},
809+
exchange_federation: if upstream.exchange.is_some() {
810+
Some(ExchangeFederationParams {
811+
exchange: upstream.exchange.as_deref(),
812+
max_hops: upstream.max_hops,
813+
queue_type: upstream.queue_type.unwrap_or(QueueType::Classic),
814+
ttl: upstream.expires,
815+
message_ttl: upstream.message_ttl,
816+
resource_cleanup_mode: upstream.resource_cleanup_mode,
817+
})
818+
} else {
819+
None
820+
},
821+
};
809822

810-
let param = RuntimeParameterDefinition::from(upstream_params);
811-
client.upsert_runtime_parameter(&param)?;
812-
}
823+
let param = RuntimeParameterDefinition::from(upstream_params);
824+
client.upsert_runtime_parameter(&param)?;
825+
prog_rep.report_success(upstream_name);
813826
}
814827

828+
prog_rep.finish_operation(total, total);
815829
Ok(())
816830
}
817831

818832
pub fn disable_tls_peer_verification_for_all_source_uris(
819833
client: APIClient,
834+
prog_rep: &mut dyn ProgressReporter,
820835
) -> Result<(), CommandRunError> {
821836
let all_params = client.list_runtime_parameters()?;
822837
let shovel_params: Vec<_> = all_params
823838
.into_iter()
824839
.filter(|p| p.component == "shovel")
825840
.collect();
826841

827-
for param in shovel_params {
842+
let total = shovel_params.len();
843+
prog_rep.start_operation(total, "Updating shovel source URIs");
844+
845+
for (index, param) in shovel_params.into_iter().enumerate() {
846+
let param_name = &param.name;
847+
prog_rep.report_progress(index + 1, total, param_name);
848+
828849
let owned_params = match OwnedShovelParams::try_from(param.clone()) {
829850
Ok(params) => params,
830-
Err(_) => continue,
851+
Err(_) => {
852+
prog_rep.report_skip(param_name, "invalid shovel parameters");
853+
continue;
854+
}
831855
};
832856

833857
let original_source_uri = &owned_params.source_uri;
834858

835859
if original_source_uri.is_empty() {
860+
prog_rep.report_skip(param_name, "empty source URI");
836861
continue;
837862
}
838863

839864
let updated_source_uri = disable_tls_peer_verification(original_source_uri)?;
840865

841-
if original_source_uri != &updated_source_uri {
842-
let mut updated_params = owned_params;
843-
updated_params.source_uri = updated_source_uri;
866+
let mut updated_params = owned_params;
867+
updated_params.source_uri = updated_source_uri;
844868

845-
let param = RuntimeParameterDefinition::from(&updated_params);
846-
client.upsert_runtime_parameter(&param)?;
847-
}
869+
let param = RuntimeParameterDefinition::from(&updated_params);
870+
client.upsert_runtime_parameter(&param)?;
871+
prog_rep.report_success(param_name);
848872
}
849873

874+
prog_rep.finish_operation(total, total);
875+
850876
Ok(())
851877
}
852878

853879
pub fn disable_tls_peer_verification_for_all_destination_uris(
854880
client: APIClient,
881+
_prog_rep: &mut dyn ProgressReporter,
855882
) -> Result<(), CommandRunError> {
856883
let all_params = client.list_runtime_parameters()?;
857884
let shovel_params: Vec<_> = all_params
@@ -888,6 +915,7 @@ pub fn disable_tls_peer_verification_for_all_destination_uris(
888915
pub fn enable_tls_peer_verification_for_all_source_uris(
889916
client: APIClient,
890917
args: &ArgMatches,
918+
_prog_rep: &mut dyn ProgressReporter,
891919
) -> Result<(), CommandRunError> {
892920
let ca_cert_path = args
893921
.get_one::<String>("node_local_ca_certificate_bundle_path")
@@ -944,6 +972,7 @@ pub fn enable_tls_peer_verification_for_all_source_uris(
944972
pub fn enable_tls_peer_verification_for_all_destination_uris(
945973
client: APIClient,
946974
args: &ArgMatches,
975+
_prog_rep: &mut dyn ProgressReporter,
947976
) -> Result<(), CommandRunError> {
948977
let ca_cert_path = args
949978
.get_one::<String>("node_local_ca_certificate_bundle_path")
@@ -1486,7 +1515,7 @@ pub fn update_policy_definition(
14861515
.get_one::<String>("definition_value")
14871516
.cloned()
14881517
.unwrap();
1489-
let parsed_value = parse_json_from_arg::<serde_json::Value>(&value)?;
1518+
let parsed_value = parse_json_from_arg::<Value>(&value)?;
14901519

14911520
update_policy_definition_with(&client, vhost, &name, &key, &parsed_value).map_err(Into::into)
14921521
}
@@ -1505,7 +1534,7 @@ pub fn update_operator_policy_definition(
15051534
.get_one::<String>("definition_value")
15061535
.cloned()
15071536
.unwrap();
1508-
let parsed_value = parse_json_from_arg::<serde_json::Value>(&value)?;
1537+
let parsed_value = parse_json_from_arg::<Value>(&value)?;
15091538

15101539
update_operator_policy_definition_with(&client, vhost, &name, &key, &parsed_value)
15111540
.map_err(Into::into)
@@ -1521,7 +1550,7 @@ pub fn patch_policy_definition(
15211550
.get_one::<String>("definition")
15221551
.cloned()
15231552
.unwrap();
1524-
let parsed_value = parse_json_from_arg::<serde_json::Value>(&value)?;
1553+
let parsed_value = parse_json_from_arg::<Value>(&value)?;
15251554

15261555
let mut pol = client
15271556
.get_policy(vhost, &name)
@@ -1551,7 +1580,7 @@ pub fn update_all_policy_definitions_in(
15511580
.get_one::<String>("definition_value")
15521581
.cloned()
15531582
.unwrap();
1554-
let parsed_value = parse_json_from_arg::<serde_json::Value>(&value)?;
1583+
let parsed_value = parse_json_from_arg::<Value>(&value)?;
15551584

15561585
for pol in pols {
15571586
update_policy_definition_with(&client, vhost, &pol.name, &key, &parsed_value)?
@@ -1570,7 +1599,7 @@ pub fn patch_operator_policy_definition(
15701599
.get_one::<String>("definition")
15711600
.cloned()
15721601
.unwrap();
1573-
let parsed_value = parse_json_from_arg::<serde_json::Value>(&value)?;
1602+
let parsed_value = parse_json_from_arg::<Value>(&value)?;
15741603

15751604
let mut pol = client
15761605
.get_operator_policy(vhost, &name)
@@ -1600,7 +1629,7 @@ pub fn update_all_operator_policy_definitions_in(
16001629
.get_one::<String>("definition_value")
16011630
.cloned()
16021631
.unwrap();
1603-
let parsed_value = parse_json_from_arg::<serde_json::Value>(&value)?;
1632+
let parsed_value = parse_json_from_arg::<Value>(&value)?;
16041633

16051634
for pol in pols {
16061635
update_operator_policy_definition_with(&client, vhost, &pol.name, &key, &parsed_value)?
@@ -2192,7 +2221,7 @@ fn parse_json_from_arg<T: DeserializeOwned>(input: &str) -> Result<T, CommandRun
21922221
})
21932222
}
21942223

2195-
fn disable_tls_peer_verification(uri: &str) -> Result<String, CommandRunError> {
2224+
pub fn disable_tls_peer_verification(uri: &str) -> Result<String, CommandRunError> {
21962225
use rabbitmq_http_client::uris::UriBuilder;
21972226

21982227
let ub = UriBuilder::new(uri)
@@ -2207,7 +2236,7 @@ fn disable_tls_peer_verification(uri: &str) -> Result<String, CommandRunError> {
22072236
})
22082237
}
22092238

2210-
fn enable_tls_peer_verification(
2239+
pub fn enable_tls_peer_verification(
22112240
uri: &str,
22122241
ca_cert_path: &str,
22132242
client_cert_path: &str,

0 commit comments

Comments
 (0)