1515
1616use crate :: constants:: DEFAULT_BLANKET_POLICY_PRIORITY ;
1717use crate :: errors:: CommandRunError ;
18+ use crate :: output:: ProgressReporter ;
1819use clap:: ArgMatches ;
1920use rabbitmq_http_client:: blocking_api:: Client ;
2021use rabbitmq_http_client:: blocking_api:: Result as ClientResult ;
@@ -691,59 +692,67 @@ pub fn delete_federation_upstream(
691692
692693pub fn disable_tls_peer_verification_for_all_federation_upstreams (
693694 client : APIClient ,
695+ prog_rep : & mut dyn ProgressReporter ,
694696) -> Result < ( ) , CommandRunError > {
695697 let upstreams = client. list_federation_upstreams ( ) ?;
698+ let total = upstreams. len ( ) ;
699+ prog_rep. start_operation ( total, "Updating federation upstream URIs" ) ;
700+
701+ for ( index, upstream) in upstreams. into_iter ( ) . enumerate ( ) {
702+ let upstream_name = & upstream. name ;
703+ prog_rep. report_progress ( index + 1 , total, upstream_name) ;
696704
697- for upstream in upstreams {
698705 let original_uri = & upstream. uri ;
699706 let updated_uri = disable_tls_peer_verification ( original_uri) ?;
700707
701- if original_uri != & updated_uri {
702- let upstream_params = FederationUpstreamParams {
703- name : & upstream. name ,
704- vhost : & upstream. vhost ,
705- uri : & updated_uri,
706- prefetch_count : upstream
707- . prefetch_count
708- . unwrap_or ( DEFAULT_FEDERATION_PREFETCH ) ,
709- reconnect_delay : upstream. reconnect_delay . unwrap_or ( 5 ) ,
710- ack_mode : upstream. ack_mode ,
711- trust_user_id : upstream. trust_user_id . unwrap_or_default ( ) ,
712- bind_using_nowait : upstream. bind_using_nowait ,
713- channel_use_mode : upstream. channel_use_mode ,
714- queue_federation : if upstream. queue . is_some ( ) {
715- Some ( QueueFederationParams {
716- queue : upstream. queue . as_deref ( ) ,
717- consumer_tag : upstream. consumer_tag . as_deref ( ) ,
718- } )
719- } else {
720- None
721- } ,
722- exchange_federation : if upstream. exchange . is_some ( ) {
723- Some ( ExchangeFederationParams {
724- exchange : upstream. exchange . as_deref ( ) ,
725- max_hops : upstream. max_hops ,
726- queue_type : upstream. queue_type . unwrap_or ( QueueType :: Classic ) ,
727- ttl : upstream. expires ,
728- message_ttl : upstream. message_ttl ,
729- resource_cleanup_mode : upstream. resource_cleanup_mode ,
730- } )
731- } else {
732- None
733- } ,
734- } ;
708+ let upstream_params = FederationUpstreamParams {
709+ name : & upstream. name ,
710+ vhost : & upstream. vhost ,
711+ uri : & updated_uri,
712+ prefetch_count : upstream
713+ . prefetch_count
714+ . unwrap_or ( DEFAULT_FEDERATION_PREFETCH ) ,
715+ reconnect_delay : upstream. reconnect_delay . unwrap_or ( 5 ) ,
716+ ack_mode : upstream. ack_mode ,
717+ trust_user_id : upstream. trust_user_id . unwrap_or_default ( ) ,
718+ bind_using_nowait : upstream. bind_using_nowait ,
719+ channel_use_mode : upstream. channel_use_mode ,
720+ queue_federation : if upstream. queue . is_some ( ) {
721+ Some ( QueueFederationParams {
722+ queue : upstream. queue . as_deref ( ) ,
723+ consumer_tag : upstream. consumer_tag . as_deref ( ) ,
724+ } )
725+ } else {
726+ None
727+ } ,
728+ exchange_federation : if upstream. exchange . is_some ( ) {
729+ Some ( ExchangeFederationParams {
730+ exchange : upstream. exchange . as_deref ( ) ,
731+ max_hops : upstream. max_hops ,
732+ queue_type : upstream. queue_type . unwrap_or ( QueueType :: Classic ) ,
733+ ttl : upstream. expires ,
734+ message_ttl : upstream. message_ttl ,
735+ resource_cleanup_mode : upstream. resource_cleanup_mode ,
736+ } )
737+ } else {
738+ None
739+ } ,
740+ } ;
735741
736- let param = RuntimeParameterDefinition :: from ( upstream_params) ;
737- client. upsert_runtime_parameter ( & param) ?;
738- }
742+ let param = RuntimeParameterDefinition :: from ( upstream_params) ;
743+ client. upsert_runtime_parameter ( & param) ?;
744+ prog_rep . report_success ( upstream_name ) ;
739745 }
740746
747+ prog_rep. finish_operation ( total, total) ;
748+
741749 Ok ( ( ) )
742750}
743751
744752pub fn enable_tls_peer_verification_for_all_federation_upstreams (
745753 client : APIClient ,
746754 args : & ArgMatches ,
755+ prog_rep : & mut dyn ProgressReporter ,
747756) -> Result < ( ) , CommandRunError > {
748757 let ca_cert_path = args
749758 . get_one :: < String > ( "node_local_ca_certificate_bundle_path" )
@@ -762,8 +771,13 @@ pub fn enable_tls_peer_verification_for_all_federation_upstreams(
762771 } ) ?;
763772
764773 let upstreams = client. list_federation_upstreams ( ) ?;
774+ let total = upstreams. len ( ) ;
775+ prog_rep. start_operation ( total, "Updating federation upstream URIs" ) ;
776+
777+ for ( index, upstream) in upstreams. into_iter ( ) . enumerate ( ) {
778+ let upstream_name = & upstream. name ;
779+ prog_rep. report_progress ( index + 1 , total, upstream_name) ;
765780
766- for upstream in upstreams {
767781 let original_uri = & upstream. uri ;
768782 let updated_uri = enable_tls_peer_verification (
769783 original_uri,
@@ -772,86 +786,99 @@ pub fn enable_tls_peer_verification_for_all_federation_upstreams(
772786 client_key_path,
773787 ) ?;
774788
775- if original_uri != & updated_uri {
776- let upstream_params = FederationUpstreamParams {
777- name : & upstream. name ,
778- vhost : & upstream. vhost ,
779- uri : & updated_uri,
780- prefetch_count : upstream
781- . prefetch_count
782- . unwrap_or ( DEFAULT_FEDERATION_PREFETCH ) ,
783- reconnect_delay : upstream. reconnect_delay . unwrap_or ( 5 ) ,
784- ack_mode : upstream. ack_mode ,
785- trust_user_id : upstream. trust_user_id . unwrap_or_default ( ) ,
786- bind_using_nowait : upstream. bind_using_nowait ,
787- channel_use_mode : upstream. channel_use_mode ,
788- queue_federation : if upstream. queue . is_some ( ) {
789- Some ( QueueFederationParams {
790- queue : upstream. queue . as_deref ( ) ,
791- consumer_tag : upstream. consumer_tag . as_deref ( ) ,
792- } )
793- } else {
794- None
795- } ,
796- exchange_federation : if upstream. exchange . is_some ( ) {
797- Some ( ExchangeFederationParams {
798- exchange : upstream. exchange . as_deref ( ) ,
799- max_hops : upstream. max_hops ,
800- queue_type : upstream. queue_type . unwrap_or ( QueueType :: Classic ) ,
801- ttl : upstream. expires ,
802- message_ttl : upstream. message_ttl ,
803- resource_cleanup_mode : upstream. resource_cleanup_mode ,
804- } )
805- } else {
806- None
807- } ,
808- } ;
789+ let upstream_params = FederationUpstreamParams {
790+ name : & upstream. name ,
791+ vhost : & upstream. vhost ,
792+ uri : & updated_uri,
793+ prefetch_count : upstream
794+ . prefetch_count
795+ . unwrap_or ( DEFAULT_FEDERATION_PREFETCH ) ,
796+ reconnect_delay : upstream. reconnect_delay . unwrap_or ( 5 ) ,
797+ ack_mode : upstream. ack_mode ,
798+ trust_user_id : upstream. trust_user_id . unwrap_or_default ( ) ,
799+ bind_using_nowait : upstream. bind_using_nowait ,
800+ channel_use_mode : upstream. channel_use_mode ,
801+ queue_federation : if upstream. queue . is_some ( ) {
802+ Some ( QueueFederationParams {
803+ queue : upstream. queue . as_deref ( ) ,
804+ consumer_tag : upstream. consumer_tag . as_deref ( ) ,
805+ } )
806+ } else {
807+ None
808+ } ,
809+ exchange_federation : if upstream. exchange . is_some ( ) {
810+ Some ( ExchangeFederationParams {
811+ exchange : upstream. exchange . as_deref ( ) ,
812+ max_hops : upstream. max_hops ,
813+ queue_type : upstream. queue_type . unwrap_or ( QueueType :: Classic ) ,
814+ ttl : upstream. expires ,
815+ message_ttl : upstream. message_ttl ,
816+ resource_cleanup_mode : upstream. resource_cleanup_mode ,
817+ } )
818+ } else {
819+ None
820+ } ,
821+ } ;
809822
810- let param = RuntimeParameterDefinition :: from ( upstream_params) ;
811- client. upsert_runtime_parameter ( & param) ?;
812- }
823+ let param = RuntimeParameterDefinition :: from ( upstream_params) ;
824+ client. upsert_runtime_parameter ( & param) ?;
825+ prog_rep . report_success ( upstream_name ) ;
813826 }
814827
828+ prog_rep. finish_operation ( total, total) ;
815829 Ok ( ( ) )
816830}
817831
818832pub fn disable_tls_peer_verification_for_all_source_uris (
819833 client : APIClient ,
834+ prog_rep : & mut dyn ProgressReporter ,
820835) -> Result < ( ) , CommandRunError > {
821836 let all_params = client. list_runtime_parameters ( ) ?;
822837 let shovel_params: Vec < _ > = all_params
823838 . into_iter ( )
824839 . filter ( |p| p. component == "shovel" )
825840 . collect ( ) ;
826841
827- for param in shovel_params {
842+ let total = shovel_params. len ( ) ;
843+ prog_rep. start_operation ( total, "Updating shovel source URIs" ) ;
844+
845+ for ( index, param) in shovel_params. into_iter ( ) . enumerate ( ) {
846+ let param_name = & param. name ;
847+ prog_rep. report_progress ( index + 1 , total, param_name) ;
848+
828849 let owned_params = match OwnedShovelParams :: try_from ( param. clone ( ) ) {
829850 Ok ( params) => params,
830- Err ( _) => continue ,
851+ Err ( _) => {
852+ prog_rep. report_skip ( param_name, "invalid shovel parameters" ) ;
853+ continue ;
854+ }
831855 } ;
832856
833857 let original_source_uri = & owned_params. source_uri ;
834858
835859 if original_source_uri. is_empty ( ) {
860+ prog_rep. report_skip ( param_name, "empty source URI" ) ;
836861 continue ;
837862 }
838863
839864 let updated_source_uri = disable_tls_peer_verification ( original_source_uri) ?;
840865
841- if original_source_uri != & updated_source_uri {
842- let mut updated_params = owned_params;
843- updated_params. source_uri = updated_source_uri;
866+ let mut updated_params = owned_params;
867+ updated_params. source_uri = updated_source_uri;
844868
845- let param = RuntimeParameterDefinition :: from ( & updated_params) ;
846- client. upsert_runtime_parameter ( & param) ?;
847- }
869+ let param = RuntimeParameterDefinition :: from ( & updated_params) ;
870+ client. upsert_runtime_parameter ( & param) ?;
871+ prog_rep . report_success ( param_name ) ;
848872 }
849873
874+ prog_rep. finish_operation ( total, total) ;
875+
850876 Ok ( ( ) )
851877}
852878
853879pub fn disable_tls_peer_verification_for_all_destination_uris (
854880 client : APIClient ,
881+ _prog_rep : & mut dyn ProgressReporter ,
855882) -> Result < ( ) , CommandRunError > {
856883 let all_params = client. list_runtime_parameters ( ) ?;
857884 let shovel_params: Vec < _ > = all_params
@@ -888,6 +915,7 @@ pub fn disable_tls_peer_verification_for_all_destination_uris(
888915pub fn enable_tls_peer_verification_for_all_source_uris (
889916 client : APIClient ,
890917 args : & ArgMatches ,
918+ _prog_rep : & mut dyn ProgressReporter ,
891919) -> Result < ( ) , CommandRunError > {
892920 let ca_cert_path = args
893921 . get_one :: < String > ( "node_local_ca_certificate_bundle_path" )
@@ -944,6 +972,7 @@ pub fn enable_tls_peer_verification_for_all_source_uris(
944972pub fn enable_tls_peer_verification_for_all_destination_uris (
945973 client : APIClient ,
946974 args : & ArgMatches ,
975+ _prog_rep : & mut dyn ProgressReporter ,
947976) -> Result < ( ) , CommandRunError > {
948977 let ca_cert_path = args
949978 . get_one :: < String > ( "node_local_ca_certificate_bundle_path" )
@@ -1486,7 +1515,7 @@ pub fn update_policy_definition(
14861515 . get_one :: < String > ( "definition_value" )
14871516 . cloned ( )
14881517 . unwrap ( ) ;
1489- let parsed_value = parse_json_from_arg :: < serde_json :: Value > ( & value) ?;
1518+ let parsed_value = parse_json_from_arg :: < Value > ( & value) ?;
14901519
14911520 update_policy_definition_with ( & client, vhost, & name, & key, & parsed_value) . map_err ( Into :: into)
14921521}
@@ -1505,7 +1534,7 @@ pub fn update_operator_policy_definition(
15051534 . get_one :: < String > ( "definition_value" )
15061535 . cloned ( )
15071536 . unwrap ( ) ;
1508- let parsed_value = parse_json_from_arg :: < serde_json :: Value > ( & value) ?;
1537+ let parsed_value = parse_json_from_arg :: < Value > ( & value) ?;
15091538
15101539 update_operator_policy_definition_with ( & client, vhost, & name, & key, & parsed_value)
15111540 . map_err ( Into :: into)
@@ -1521,7 +1550,7 @@ pub fn patch_policy_definition(
15211550 . get_one :: < String > ( "definition" )
15221551 . cloned ( )
15231552 . unwrap ( ) ;
1524- let parsed_value = parse_json_from_arg :: < serde_json :: Value > ( & value) ?;
1553+ let parsed_value = parse_json_from_arg :: < Value > ( & value) ?;
15251554
15261555 let mut pol = client
15271556 . get_policy ( vhost, & name)
@@ -1551,7 +1580,7 @@ pub fn update_all_policy_definitions_in(
15511580 . get_one :: < String > ( "definition_value" )
15521581 . cloned ( )
15531582 . unwrap ( ) ;
1554- let parsed_value = parse_json_from_arg :: < serde_json :: Value > ( & value) ?;
1583+ let parsed_value = parse_json_from_arg :: < Value > ( & value) ?;
15551584
15561585 for pol in pols {
15571586 update_policy_definition_with ( & client, vhost, & pol. name , & key, & parsed_value) ?
@@ -1570,7 +1599,7 @@ pub fn patch_operator_policy_definition(
15701599 . get_one :: < String > ( "definition" )
15711600 . cloned ( )
15721601 . unwrap ( ) ;
1573- let parsed_value = parse_json_from_arg :: < serde_json :: Value > ( & value) ?;
1602+ let parsed_value = parse_json_from_arg :: < Value > ( & value) ?;
15741603
15751604 let mut pol = client
15761605 . get_operator_policy ( vhost, & name)
@@ -1600,7 +1629,7 @@ pub fn update_all_operator_policy_definitions_in(
16001629 . get_one :: < String > ( "definition_value" )
16011630 . cloned ( )
16021631 . unwrap ( ) ;
1603- let parsed_value = parse_json_from_arg :: < serde_json :: Value > ( & value) ?;
1632+ let parsed_value = parse_json_from_arg :: < Value > ( & value) ?;
16041633
16051634 for pol in pols {
16061635 update_operator_policy_definition_with ( & client, vhost, & pol. name , & key, & parsed_value) ?
@@ -2192,7 +2221,7 @@ fn parse_json_from_arg<T: DeserializeOwned>(input: &str) -> Result<T, CommandRun
21922221 } )
21932222}
21942223
2195- fn disable_tls_peer_verification ( uri : & str ) -> Result < String , CommandRunError > {
2224+ pub fn disable_tls_peer_verification ( uri : & str ) -> Result < String , CommandRunError > {
21962225 use rabbitmq_http_client:: uris:: UriBuilder ;
21972226
21982227 let ub = UriBuilder :: new ( uri)
@@ -2207,7 +2236,7 @@ fn disable_tls_peer_verification(uri: &str) -> Result<String, CommandRunError> {
22072236 } )
22082237}
22092238
2210- fn enable_tls_peer_verification (
2239+ pub fn enable_tls_peer_verification (
22112240 uri : & str ,
22122241 ca_cert_path : & str ,
22132242 client_cert_path : & str ,
0 commit comments