From 1b0322028b93ba423218fbdf05b7155df14dc52e Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Tue, 10 Feb 2026 19:43:23 +0200 Subject: [PATCH 1/6] `clean-data` option cleanup: put the name of the option into a constant; avoid showing mode selection dialog in node-gui if `--clean-data` was specified; do the cleanup after logging was initialized, so that the corresponding log lines are visible. --- node-daemon/src/main.rs | 3 +- node-gui/backend/src/lib.rs | 5 +-- node-gui/src/main.rs | 72 ++++++++++++++++++++++--------------- node-lib/src/lib.rs | 5 ++- node-lib/src/options.rs | 13 +++++-- node-lib/src/runner.rs | 22 ++++++------ node-lib/tests/cli.rs | 3 +- test/src/bin/test_node.rs | 4 +-- 8 files changed, 77 insertions(+), 50 deletions(-) diff --git a/node-daemon/src/main.rs b/node-daemon/src/main.rs index 8b7f9acb67..5866323c3f 100644 --- a/node-daemon/src/main.rs +++ b/node-daemon/src/main.rs @@ -22,7 +22,8 @@ pub async fn run() -> anyhow::Result<()> { } node_lib::NodeSetupResult::DataDirCleanedUp => { logging::log::info!( - "Data directory is now clean. Please restart the node without `--clean-data` flag" + "Data directory is now clean. Please restart the node without the `--{}` flag", + node_lib::CLEAN_DATA_OPTION_LONG_NAME ); } }; diff --git a/node-gui/backend/src/lib.rs b/node-gui/backend/src/lib.rs index 5409a1b977..214b892def 100644 --- a/node-gui/backend/src/lib.rs +++ b/node-gui/backend/src/lib.rs @@ -22,8 +22,7 @@ mod chainstate_event_handler; mod p2p_event_handler; mod wallet_events; -use std::fmt::Debug; -use std::sync::Arc; +use std::{fmt::Debug, sync::Arc}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; @@ -257,6 +256,8 @@ fn handle_options_in_cold_wallet_mode( options: OptionsWithResolvedCommand, ) -> anyhow::Result { if options.clean_data_option_set() { + // Note: actually at this moment we can't get here, because the "clean data" option + // is checked before the cold mode can be chosen. log::warn!("Ignoring clean-data option in cold wallet mode"); } diff --git a/node-gui/src/main.rs b/node-gui/src/main.rs index fffddee4ac..f871590acb 100644 --- a/node-gui/src/main.rs +++ b/node-gui/src/main.rs @@ -18,8 +18,7 @@ mod main_window; mod widgets; -use std::convert::identity; -use std::env; +use std::{convert::identity, env}; use heck::ToUpperCamelCase as _; use iced::{ @@ -38,6 +37,7 @@ use node_gui_backend::{ node_initialize, BackendControls, BackendSender, InitNetwork, NodeInitializationOutcome, WalletMode, }; +use node_lib::CLEAN_DATA_OPTION_LONG_NAME; const COLD_WALLET_TOOLTIP_TEXT: &str = "Start the wallet in Cold mode without connecting to the network or any nodes. The Cold mode is made to run the wallet on an air-gapped machine without internet connection for storage of keys of high-value. For example, pool decommission keys."; @@ -172,27 +172,46 @@ fn title(state: &GuiState) -> String { } } +// Update the state once the wallet mode has been selected/decided upon. +fn update_on_mode_selected( + state: &mut GuiState, + wallet_mode: WalletMode, + opts: node_lib::OptionsWithResolvedCommand, +) -> Task { + *state = GuiState::Loading { + wallet_mode, + chain_type: opts.command.chain_type(), + }; + + Task::perform(node_initialize(opts, wallet_mode), Message::Loaded) +} + fn update(state: &mut GuiState, message: Message) -> Task { match state { GuiState::Initial { initial_options } => match message { - Message::FontLoaded(Ok(())) => { - match &initial_options.command { - Some(command) => { - *state = GuiState::SelectWalletMode { - resolved_options: node_lib::OptionsWithResolvedCommand { - top_level: initial_options.top_level.clone(), - command: command.clone(), - }, - }; - } - None => { - *state = GuiState::SelectNetwork { - top_level_options: initial_options.top_level.clone(), - }; + Message::FontLoaded(Ok(())) => match &initial_options.command { + Some(command) => { + let resolved_options = node_lib::OptionsWithResolvedCommand { + top_level: initial_options.top_level.clone(), + command: command.clone(), + }; + + if resolved_options.clean_data_option_set() { + // If "clean data" option was set, selecting the wallet mode makes no sense; + // since the option is ignored in the cold mode, we use the hot one. + update_on_mode_selected(state, WalletMode::Hot, resolved_options) + } else { + *state = GuiState::SelectWalletMode { resolved_options }; + Task::none() } } - Task::none() - } + None => { + *state = GuiState::SelectNetwork { + top_level_options: initial_options.top_level.clone(), + }; + Task::none() + } + }, Message::FontLoaded(Err(_)) => { *state = InitializationFailure { message: "Failed to load font".into(), @@ -252,14 +271,8 @@ fn update(state: &mut GuiState, message: Message) -> Task { GuiState::SelectWalletMode { resolved_options } => { match message { Message::InitWalletMode(mode) => { - let opts = resolved_options.clone(); - - *state = GuiState::Loading { - wallet_mode: mode, - chain_type: opts.command.chain_type(), - }; - - Task::perform(node_initialize(opts, mode), Message::Loaded) + let resolved_options = resolved_options.clone(); + update_on_mode_selected(state, mode, resolved_options) } Message::ShuttingDownFinished => { iced::window::get_latest().and_then(iced::window::close) @@ -503,8 +516,11 @@ fn view(state: &GuiState) -> Element<'_, Message> { InitializationInterruptionReason::DataDirCleanedUp => { column![ iced::widget::text("Data directory is now clean").size(header_font_size), - iced::widget::text("Please restart the node without `--clean-data` flag") - .size(text_font_size) + iced::widget::text(format!( + "Please restart the node without the `--{}` flag", + CLEAN_DATA_OPTION_LONG_NAME + )) + .size(text_font_size) ] } }; diff --git a/node-lib/src/lib.rs b/node-lib/src/lib.rs index a76c3f9217..eb74795d80 100644 --- a/node-lib/src/lib.rs +++ b/node-lib/src/lib.rs @@ -30,7 +30,10 @@ use chainstate_launcher::ChainConfig; pub use config_files::{ NodeConfigFile, NodeTypeConfigFile, RpcConfigFile, StorageBackendConfigFile, }; -pub use options::{Command, Options, OptionsWithResolvedCommand, RunOptions, TopLevelOptions}; +pub use options::{ + Command, Options, OptionsWithResolvedCommand, RunOptions, TopLevelOptions, + CLEAN_DATA_OPTION_LONG_NAME, +}; pub use runner::{setup, NodeSetupResult}; pub fn default_rpc_config(chain_config: &ChainConfig) -> RpcConfigFile { diff --git a/node-lib/src/options.rs b/node-lib/src/options.rs index 6d23d2fda8..55e3d5146a 100644 --- a/node-lib/src/options.rs +++ b/node-lib/src/options.rs @@ -82,7 +82,7 @@ pub struct OptionsWithResolvedCommand { impl OptionsWithResolvedCommand { pub fn clean_data_option_set(&self) -> bool { - self.command.run_options().clean_data.unwrap_or(false) + self.command.run_options().clean_data } pub fn log_to_file_option_set(&self) -> bool { @@ -191,11 +191,18 @@ pub struct RegtestOptions { pub chain_config: ChainConfigOptions, } +pub const CLEAN_DATA_OPTION_LONG_NAME: &str = "clean-data"; + #[derive(Args, Clone, Debug, Default)] pub struct RunOptions { /// If specified, the application will clean the data directory and exit immediately. - #[clap(long, short, action = clap::ArgAction::SetTrue)] - pub clean_data: Option, + #[clap( + long = CLEAN_DATA_OPTION_LONG_NAME, + short, + action = clap::ArgAction::SetTrue, + default_value_t = false + )] + pub clean_data: bool, /// Minimum number of connected peers to enable block production. #[clap(long, value_name = "COUNT")] diff --git a/node-lib/src/runner.rs b/node-lib/src/runner.rs index 0b5589733e..6383a24f15 100644 --- a/node-lib/src/runner.rs +++ b/node-lib/src/runner.rs @@ -246,18 +246,8 @@ pub async fn setup(options: OptionsWithResolvedCommand) -> Result Result Result<(), node_lib::Error> { let node = match setup_result { node_lib::NodeSetupResult::Node(node) => node, node_lib::NodeSetupResult::DataDirCleanedUp => { - panic!( - "Data directory is now clean. Please restart the node without `--clean-data` flag" - ); + panic!("Data dir cleanup option was passed to the test node"); } }; node.main().await; From 113a0933feeb7ec326d8d5dd64a7549871ff35bc Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Wed, 11 Feb 2026 12:18:42 +0200 Subject: [PATCH 2/6] Fix and improve spinner in GuiState::Loading --- node-gui/src/main.rs | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/node-gui/src/main.rs b/node-gui/src/main.rs index f871590acb..fc3650439a 100644 --- a/node-gui/src/main.rs +++ b/node-gui/src/main.rs @@ -416,6 +416,7 @@ fn update(state: &mut GuiState, message: Message) -> Task { } } +#[allow(clippy::float_arithmetic)] fn view(state: &GuiState) -> Element<'_, Message> { match state { GuiState::Initial { .. } => { @@ -493,7 +494,35 @@ fn view(state: &GuiState) -> Element<'_, Message> { } GuiState::Loading { .. } => { - iced::widget::container(Spinner::new().width(Length::Fill).height(Length::Fill)).into() + // Create a spinner. By default, the spinner is simply a dot of the radius "circle_radius" + // that goes in a circle whose size is the spinner's widget size. To make it look a bit + // nicer we also add a light gray circle border around the whole thing. + let spinner_size = 40.0; + let spinner_dot_size = 3.0; + let border_width = 2.0; + let border_color = iced::Color::from_rgb8(192, 192, 192); + + let spinner = Spinner::new() + .width(spinner_size - 2.0 * border_width) + .height(spinner_size - 2.0 * border_width) + .circle_radius(spinner_dot_size); + + iced::widget::container( + iced::widget::container(spinner) + .width(spinner_size) + .height(spinner_size) + .padding(border_width) + .style(move |_| iced::widget::container::Style { + border: iced::Border { + width: border_width, + radius: (spinner_size / 2.0).into(), + color: border_color, + }, + ..Default::default() + }), + ) + .center(Length::Fill) + .into() } GuiState::Loaded { From 3306e6d7683cc277bac94014488fa8fbaac11e7e Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Tue, 10 Feb 2026 11:57:26 +0200 Subject: [PATCH 3/6] Pass general shutdown flag to subsystem init func --- Cargo.lock | 1 - blockprod/src/lib.rs | 2 +- node-lib/src/runner.rs | 2 +- p2p/src/lib.rs | 6 +- p2p/src/sync/tests/helpers/mod.rs | 2 +- p2p/test-utils/src/lib.rs | 2 +- p2p/tests/shutdown.rs | 2 +- serialization/core/Cargo.toml | 2 +- subsystem/Cargo.toml | 1 - subsystem/examples/stopwatch.rs | 11 +- subsystem/src/lib.rs | 2 +- subsystem/src/manager/manager_impl.rs | 108 +++++++++++------ subsystem/src/task.rs | 22 ++-- subsystem/tests/shutdown.rs | 111 ++++++++++++++++++ subsystem/tests/shutdown_timeout.rs | 55 --------- wallet/wallet-node-client/tests/call_tests.rs | 3 +- wallet/wallet-test-node/src/lib.rs | 2 +- 17 files changed, 215 insertions(+), 119 deletions(-) create mode 100644 subsystem/tests/shutdown.rs delete mode 100644 subsystem/tests/shutdown_timeout.rs diff --git a/Cargo.lock b/Cargo.lock index 2ff0329d74..93bff2e9dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8162,7 +8162,6 @@ dependencies = [ "cfg-if", "futures", "logging", - "oneshot", "static_assertions", "testing_logger", "thiserror 1.0.69", diff --git a/blockprod/src/lib.rs b/blockprod/src/lib.rs index f8185d2d21..073c855fc7 100644 --- a/blockprod/src/lib.rs +++ b/blockprod/src/lib.rs @@ -297,7 +297,7 @@ mod tests { subsystem::Handle::clone(&chainstate), time_getter.clone(), ); - let mempool = manager.add_custom_subsystem("mempool", |hdl| mempool_init.init(hdl)); + let mempool = manager.add_custom_subsystem("mempool", |hdl, _| mempool_init.init(hdl)); let mut p2p_config = test_p2p_config(); p2p_config.bind_addresses = vec![SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0).into()]; diff --git a/node-lib/src/runner.rs b/node-lib/src/runner.rs index 6383a24f15..8896d597e7 100644 --- a/node-lib/src/runner.rs +++ b/node-lib/src/runner.rs @@ -99,7 +99,7 @@ async fn initialize( subsystem::Handle::clone(&chainstate), Default::default(), ); - let mempool = manager.add_custom_subsystem("mempool", |handle| mempool_init.init(handle)); + let mempool = manager.add_custom_subsystem("mempool", |handle, _| mempool_init.init(handle)); // P2P subsystem let peerdb_storage = { diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 6538d4c853..47c16941d4 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -298,15 +298,15 @@ impl P2pInit { assert_eq!(*self.chain_config.chain_type(), ChainType::Regtest); assert!(self.p2p_config.socks5_proxy.is_none()); let transport = make_p2p_transport_unencrypted(); - manager.add_custom_subsystem(name, move |_| self.init::(transport)) + manager.add_custom_subsystem(name, move |_, _| self.init::(transport)) } else if let Some(socks5_proxy) = &self.p2p_config.socks5_proxy { type NetService = P2pNetworkingServiceSocks5Proxy; let transport = make_p2p_transport_socks5_proxy(socks5_proxy); - manager.add_custom_subsystem(name, move |_| self.init::(transport)) + manager.add_custom_subsystem(name, move |_, _| self.init::(transport)) } else { type NetService = P2pNetworkingService; let transport = make_p2p_transport(); - manager.add_custom_subsystem(name, move |_| self.init::(transport)) + manager.add_custom_subsystem(name, move |_, _| self.init::(transport)) } } } diff --git a/p2p/src/sync/tests/helpers/mod.rs b/p2p/src/sync/tests/helpers/mod.rs index a6bf61eb5d..40cb335e59 100644 --- a/p2p/src/sync/tests/helpers/mod.rs +++ b/p2p/src/sync/tests/helpers/mod.rs @@ -615,7 +615,7 @@ impl TestNodeBuilder { time_getter.clone(), ); let mempool = - manager.add_custom_subsystem("p2p-sync-test-mempool", |h| mempool_init.init(h)); + manager.add_custom_subsystem("p2p-sync-test-mempool", |h, _| mempool_init.init(h)); let manager_handle = manager.main_in_task(); diff --git a/p2p/test-utils/src/lib.rs b/p2p/test-utils/src/lib.rs index 4144f7d910..7b998cba1d 100644 --- a/p2p/test-utils/src/lib.rs +++ b/p2p/test-utils/src/lib.rs @@ -94,7 +94,7 @@ pub fn start_subsystems_generic( time_getter, ); let mempool = - manager.add_custom_subsystem("p2p-test-mempool", |handle| mempool_init.init(handle)); + manager.add_custom_subsystem("p2p-test-mempool", |handle, _| mempool_init.init(handle)); let manager_handle = manager.main_in_task_in_tracing_span(tracing_span); diff --git a/p2p/tests/shutdown.rs b/p2p/tests/shutdown.rs index 175f7ec0fe..905ef005e1 100644 --- a/p2p/tests/shutdown.rs +++ b/p2p/tests/shutdown.rs @@ -61,7 +61,7 @@ async fn shutdown_timeout() { Default::default(), ); let mempool = - manager.add_custom_subsystem("shutdown-test-mempool", |hdl| mempool_init.init(hdl)); + manager.add_custom_subsystem("shutdown-test-mempool", |hdl, _| mempool_init.init(hdl)); let peerdb_storage = PeerDbStorageImpl::new(InMemory::new()).unwrap(); let _p2p = make_p2p( diff --git a/serialization/core/Cargo.toml b/serialization/core/Cargo.toml index 19092851bf..7c1d476b31 100644 --- a/serialization/core/Cargo.toml +++ b/serialization/core/Cargo.toml @@ -8,7 +8,7 @@ rust-version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -parity-scale-codec = { workspace = true, features = ["derive", "chain-error"] } +parity-scale-codec = { workspace = true, features = ["derive", "chain-error", "std"] } [dev-dependencies] arraytools.workspace = true diff --git a/subsystem/Cargo.toml b/subsystem/Cargo.toml index a75c404bc9..460210bbf6 100644 --- a/subsystem/Cargo.toml +++ b/subsystem/Cargo.toml @@ -16,7 +16,6 @@ utils = { path = "../utils" } async-trait.workspace = true cfg-if.workspace = true futures = { workspace = true, default-features = false, features = ["alloc"]} -oneshot.workspace = true thiserror.workspace = true tokio = { workspace = true, default-features = false, features = ["macros", "rt", "rt-multi-thread", "signal", "sync"]} tracing.workspace = true diff --git a/subsystem/examples/stopwatch.rs b/subsystem/examples/stopwatch.rs index 8e64684716..b6007b3a7b 100644 --- a/subsystem/examples/stopwatch.rs +++ b/subsystem/examples/stopwatch.rs @@ -13,12 +13,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use logging::log; use std::{ convert::Infallible, time::{Duration, Instant}, }; +use tokio::sync::watch; + +use logging::log; +use utils::set_flag::SetFlag; + struct Stopwatch { start: Instant, tick_task: tokio::task::JoinHandle<()>, @@ -26,7 +30,10 @@ struct Stopwatch { impl Stopwatch { #[allow(clippy::unused_async)] - async fn init(handle: subsystem::SubmitOnlyHandle) -> Result { + async fn init( + handle: subsystem::SubmitOnlyHandle, + _shutdown_initiated_rx: watch::Receiver, + ) -> Result { let start = Instant::now(); let tick_task = tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(500)); diff --git a/subsystem/src/lib.rs b/subsystem/src/lib.rs index 93291b8931..8def8b2f17 100644 --- a/subsystem/src/lib.rs +++ b/subsystem/src/lib.rs @@ -37,9 +37,9 @@ mod calls; mod manager; mod subsystem; mod task; -mod wrappers; pub mod error; +pub mod wrappers; pub use crate::{ calls::{blocking, CallResponse, CallResult, Handle, SubmitOnlyHandle}, diff --git a/subsystem/src/manager/manager_impl.rs b/subsystem/src/manager/manager_impl.rs index 7e748539fc..9281351c45 100644 --- a/subsystem/src/manager/manager_impl.rs +++ b/subsystem/src/manager/manager_impl.rs @@ -17,17 +17,17 @@ use std::{panic, time::Duration}; use futures::future::BoxFuture; use tokio::{ - sync::{mpsc, oneshot}, + sync::{mpsc, oneshot, watch}, task::JoinHandle, }; use logging::log; use utils::{ - const_value::ConstValue, shallow_clone::ShallowClone, tokio_spawn_in_current_tracing_span, - tokio_spawn_in_tracing_span, + const_value::ConstValue, set_flag::SetFlag, shallow_clone::ShallowClone, + tokio_spawn_in_current_tracing_span, tokio_spawn_in_tracing_span, }; -use crate::{task, Handle, ManagerConfig, SubmitOnlyHandle, Subsystem}; +use crate::{task, wrappers, Handle, ManagerConfig, SubmitOnlyHandle, Subsystem}; use super::shutdown_signal::shutdown_signal; @@ -41,11 +41,25 @@ pub struct Manager { // Manager configuration config: ConstValue, - // Used by a subsystem to notify the manager it is shutting down. This is taken as a command - // for all subsystems to shut down. Shutdown completion is detected by all senders having closed - // this channel. - shutting_down_tx: mpsc::UnboundedSender<()>, - shutting_down_rx: mpsc::UnboundedReceiver<()>, + // The channel through which the shutdown may be initiated. + // Its sender is exposed to external callers via `ShutdownTrigger` and also passed to each + // subsystem task as the "task_shut_down_tx" parameter, so that when the subsystem is shut down + // for any reason (including a panic), the general shutdown is initiated as well. + shutdown_trigger_tx: mpsc::UnboundedSender<()>, + shutdown_trigger_rx: mpsc::UnboundedReceiver<()>, + + // A watch channel (a shared flag) through which the manager can notify the actual subsystems + // that the general shutdown has been initiated (so that they can abort long-running blocking + // calls, for example). + // Note: + // 1) We can't re-use subsystem's own "task shutdown channel" (whose receiver is held + // in `SubsystemData`) for the purpose of blocking calls cancellation, because the blocking call + // may need to be cancelled before this particular subsystem's shutdown has been initiated + // (e.g. if another subsystem that is shut down earlier needs this one to unblock first). + // 2) We could technically "combine" `shutdown_trigger` and `shutdown_initiated` into one + // channel, but this would probably complicate things instead of simplifying them. + shutdown_initiated_tx: watch::Sender, + shutdown_initiated_rx: watch::Receiver, // List of subsystem tasks subsystems: Vec>>, @@ -60,13 +74,17 @@ impl Manager { /// Initialize a new subsystem manager. pub fn new_with_config(config: ManagerConfig) -> Self { log::info!("Initializing subsystem manager {}", config.name); - let (shutting_down_tx, shutting_down_rx) = mpsc::unbounded_channel(); + + let (shutdown_trigger_tx, shutdown_trigger_rx) = mpsc::unbounded_channel(); + let (shutdown_initiated_tx, shutdown_initiated_rx) = watch::channel(SetFlag::new()); let subsystems = Vec::new(); Self { config: config.into(), - shutting_down_tx, - shutting_down_rx, + shutdown_trigger_tx, + shutdown_trigger_rx, + shutdown_initiated_tx, + shutdown_initiated_rx, subsystems, } } @@ -76,15 +94,21 @@ impl Manager { /// This method allows you to set up the subsystem in a custom way using an asynchronous /// initialization routine. The routine should return the subsystem state object, which has to /// implement the [Subsystem] trait. The initialization routine is also given access to a - /// send-only handle to the subsystem itself. It can be used to register the subsystem into - /// various event handlers. + /// send-only handle to the subsystem itself, which can be used to register the subsystem into + /// various event handlers, and a shutdown flag, which can be used to cancel long-running + /// synchronous tasks if a shutdown has been initiated. pub fn add_custom_subsystem( &mut self, subsys_name: &'static str, subsys_init: IF, ) -> Handle where - IF: FnOnce(SubmitOnlyHandle) -> SF + Send + 'static, + IF: FnOnce( + SubmitOnlyHandle, + /*shutdown initiated*/ watch::Receiver, + ) -> SF + + Send + + 'static, SF: std::future::IntoFuture> + Send + 'static, SF::IntoFuture: Send, S: Subsystem, @@ -92,28 +116,32 @@ impl Manager { { let full_name = self.config.full_name_of(subsys_name); - // Shutdown-related channels - let (shutdown_tx, shutdown_rx) = oneshot::channel(); + // The channel through which the manager will initiate the shutdown of this particular + // subsystem's task. + let (task_shutdown_trigger_tx, task_shutdown_trigger_rx) = oneshot::channel(); - // Call related channels + // Action channel let (action_tx, action_rx) = mpsc::unbounded_channel(); let submit_handle = SubmitOnlyHandle::new(action_tx); log::info!("Registering subsystem {full_name}"); - let task = Box::pin(task::subsystem( - full_name.clone(), - subsys_init, + let subsys_future = subsys_init( submit_handle.shallow_clone(), + self.shutdown_initiated_rx.clone(), + ); + let task_future = Box::pin(task::subsystem( + full_name.clone(), + subsys_future, action_rx, - shutdown_rx, - self.shutting_down_tx.clone(), + task_shutdown_trigger_rx, + self.shutdown_trigger_tx.clone(), )); self.subsystems.push(SubsystemData { full_name, - task, - shutdown_tx, + task: task_future, + task_shutdown_tx: task_shutdown_trigger_tx, }); Handle::new(submit_handle) @@ -124,7 +152,7 @@ impl Manager { where S: Send + Sync + Subsystem + 'static, { - self.add_custom_subsystem(name, move |_| async { + self.add_custom_subsystem(name, move |_, _| async { Result::::Ok(subsys) }) } @@ -134,12 +162,12 @@ impl Manager { where S: Send + Sync + 'static, { - self.add_subsystem(name, crate::wrappers::Direct::new(subsys)) + self.add_subsystem(name, wrappers::Direct::new(subsys)) } /// Create a trigger object that can be used to shut down the system pub fn make_shutdown_trigger(&self) -> ShutdownTrigger { - ShutdownTrigger::new(&self.shutting_down_tx) + ShutdownTrigger::new(&self.shutdown_trigger_tx) } /// Run the application main task. @@ -150,7 +178,7 @@ impl Manager { log::info!("Manager {manager_name} starting subsystems"); // Run all the subsystem tasks. - let subsystems: Vec<_> = self + let subsystems = self .subsystems .into_iter() .map(|subsys_data| { @@ -158,17 +186,23 @@ impl Manager { tokio_spawn_in_current_tracing_span(fut, subsys_full_name) }) }) - .collect(); + .collect::>(); - // Signal the manager is shut down so it does not wait for itself - drop(self.shutting_down_tx); + // Drop the shutdown trigger sender to ensure that the manager won't wait for itself + // (e.g. if no subsystems were registered or if they somehow exited without sending + // a shutdown trigger, though the latter should not be possible at this moment). + drop(self.shutdown_trigger_tx); // Wait for the shutdown trigger. - match shutdown_signal(self.shutting_down_rx, self.config.enable_signal_handlers).await { + match shutdown_signal(self.shutdown_trigger_rx, self.config.enable_signal_handlers).await { Ok(reason) => log::info!("Manager {manager_name} shutting down: {reason}"), Err(err) => log::error!("Manager {manager_name} shutting down: {err}"), } + // Set the "shutdown initiated" flag so that subsystems that perform long-running blocking + // calls could cancel whatever they're doing. + self.shutdown_initiated_tx.send_modify(|flag| flag.set()); + // Shut down the subsystems in the reverse order of creation. for subsys in subsystems.into_iter().rev() { subsys.shutdown(self.config.shutdown_timeout_per_subsystem).await; @@ -207,7 +241,7 @@ impl Manager { /// Information about each subsystem stored by the manager struct SubsystemData { full_name: String, - shutdown_tx: oneshot::Sender<()>, + task_shutdown_tx: oneshot::Sender<()>, task: T, } @@ -215,13 +249,13 @@ impl SubsystemData { fn map_task(self, f: impl FnOnce(T, /*full_name*/ &str) -> U) -> SubsystemData { let Self { full_name, - shutdown_tx, + task_shutdown_tx, task, } = self; let task = f(task, &full_name); SubsystemData { full_name, - shutdown_tx, + task_shutdown_tx, task, } } @@ -231,7 +265,7 @@ impl SubsystemData> { async fn shutdown(self, timeout: Option) { let full_name = self.full_name; - if let Err(()) = self.shutdown_tx.send(()) { + if let Err(()) = self.task_shutdown_tx.send(()) { log::warn!("Subsystem {full_name} is already down"); } diff --git a/subsystem/src/task.rs b/subsystem/src/task.rs index 4fdcc067cd..d86b810000 100644 --- a/subsystem/src/task.rs +++ b/subsystem/src/task.rs @@ -15,6 +15,8 @@ //! Implementation of tasks that constitute the subsystem mechanism. +use std::future::IntoFuture; + use tokio::{ sync::{mpsc, oneshot, RwLock}, task::JoinSet, @@ -24,7 +26,7 @@ use tracing::Instrument; use logging::log; use utils::{once_destructor::OnceDestructor, sync::Arc, tokio_spawn_in_join_set}; -use crate::{calls::Action, SubmitOnlyHandle, Subsystem}; +use crate::{calls::Action, Subsystem}; /// Handle a task completion result pub fn handle_result(full_name: &str, task_type: &str, res: Result<(), tokio::task::JoinError>) { @@ -38,16 +40,14 @@ pub fn handle_result(full_name: &str, task_type: &str, res: Result<(), tokio::ta } /// The subsystem worker task implementation -pub async fn subsystem( +pub async fn subsystem( full_name: String, - subsys_init: IF, - submit_handle: SubmitOnlyHandle, + subsys_fut: SF, mut action_rx: mpsc::UnboundedReceiver>, - mut shutdown_rx: oneshot::Receiver<()>, - shutting_down_tx: mpsc::UnboundedSender<()>, + mut task_shutdown_trigger_rx: oneshot::Receiver<()>, + task_shut_down_tx: mpsc::UnboundedSender<()>, ) where - IF: FnOnce(SubmitOnlyHandle) -> SF + Send + 'static, - SF: std::future::IntoFuture> + Send, + SF: IntoFuture> + Send, SF::IntoFuture: Send, S: Subsystem, E: std::error::Error, @@ -58,7 +58,7 @@ pub async fn subsystem( let _shutdown_sender = OnceDestructor::new({ let full_name = &full_name; move || { - let _ = shutting_down_tx.send(()); + let _ = task_shut_down_tx.send(()); log::info!("Subsystem {full_name} terminated"); } }); @@ -67,7 +67,7 @@ pub async fn subsystem( let mut worker_tasks = JoinSet::new(); // Initialize the subsystem. - let subsys = match subsys_init(submit_handle).await { + let subsys = match subsys_fut.await { Ok(subsys) => Arc::new(RwLock::new(subsys)), Err(err) => { log::error!("Subsystem {full_name} failed to initialize: {err}"); @@ -91,7 +91,7 @@ pub async fn subsystem( biased; // We're shutting down, no point in doing anything else. - result = (&mut shutdown_rx) => { + result = (&mut task_shutdown_trigger_rx) => { if let Err(err) = result { log::error!("Shutdown channel for {full_name} closed prematurely: {err}"); } diff --git a/subsystem/tests/shutdown.rs b/subsystem/tests/shutdown.rs new file mode 100644 index 0000000000..681b55d5b4 --- /dev/null +++ b/subsystem/tests/shutdown.rs @@ -0,0 +1,111 @@ +// Copyright (c) 2023 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![cfg(all(feature = "time", not(loom)))] + +use std::{sync::Arc, time::Duration}; + +use subsystem::wrappers; +use tokio::sync::{watch, Mutex}; +use utils::{set_flag::SetFlag, tokio_spawn}; + +// A subsystem that blocks the shutdown process. +struct NoExit; + +#[async_trait::async_trait] +impl subsystem::Subsystem for NoExit { + type Interface = Self; + + fn interface_ref(&self) -> &Self { + self + } + + fn interface_mut(&mut self) -> &mut Self { + self + } + + async fn shutdown(self) { + std::future::pending().await + } +} + +#[tokio::test] +async fn shutdown_timeout() { + testing_logger::setup(); + + let config = subsystem::ManagerConfig::new("timeout_test") + .with_shutdown_timeout_per_subsystem(std::time::Duration::from_secs(1)); + let mut mgr = subsystem::Manager::new_with_config(config); + + mgr.add_custom_subsystem("does_not_want_to_exit", |_, _| { + std::future::ready(Result::<_, std::convert::Infallible>::Ok(NoExit)) + }); + mgr.make_shutdown_trigger().initiate(); + mgr.main().await; + + testing_logger::validate(|logs| { + assert!(logs.iter().any(|entry| entry.body.contains("shutdown timed out"))); + }); +} + +// Check that the "shutdown initiated" flag that is passed to the subsystem's init closure +// is indeed set when shutdown is initiated. +#[tokio::test] +async fn shutdown_flag_set() { + let mut mgr = subsystem::Manager::new("test"); + + let shutdown_initiated_rx_shared: Arc>>> = + Arc::new(Mutex::new(None)); + + mgr.add_custom_subsystem("test", { + let shutdown_initiated_rx_shared = Arc::clone(&shutdown_initiated_rx_shared); + move |_, shutdown_initiated_rx| async move { + *shutdown_initiated_rx_shared.lock().await = Some(shutdown_initiated_rx); + Result::<_, std::convert::Infallible>::Ok(wrappers::Direct::new(())) + } + }); + + let shutdown_trigger = mgr.make_shutdown_trigger(); + let mgr_join_handle = tokio_spawn(mgr.main(), "mgr main"); + + tokio::time::timeout(Duration::from_secs(10), async { + while shutdown_initiated_rx_shared.lock().await.is_none() { + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .unwrap(); + + let mut shutdown_initiated_rx = shutdown_initiated_rx_shared.lock().await.take().unwrap(); + + tokio::time::timeout(Duration::from_millis(500), async { + shutdown_initiated_rx.changed().await.unwrap() + }) + .await + .unwrap_err(); + + shutdown_trigger.initiate(); + + tokio::time::timeout(Duration::from_secs(10), async { + shutdown_initiated_rx.changed().await.unwrap() + }) + .await + .unwrap(); + + let flag = shutdown_initiated_rx.borrow(); + assert!(flag.test()); + + mgr_join_handle.await.unwrap(); +} diff --git a/subsystem/tests/shutdown_timeout.rs b/subsystem/tests/shutdown_timeout.rs deleted file mode 100644 index 53d1cf676f..0000000000 --- a/subsystem/tests/shutdown_timeout.rs +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (c) 2023 RBB S.r.l -// opensource@mintlayer.org -// SPDX-License-Identifier: MIT -// Licensed under the MIT License; -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![cfg(all(feature = "time", not(loom)))] - -// A subsystem that blocks the shutdown process. -struct NoExit; - -#[async_trait::async_trait] -impl subsystem::Subsystem for NoExit { - type Interface = Self; - - fn interface_ref(&self) -> &Self { - self - } - - fn interface_mut(&mut self) -> &mut Self { - self - } - - async fn shutdown(self) { - std::future::pending().await - } -} - -#[tokio::test] -async fn shutdown_timeout() { - testing_logger::setup(); - - let config = subsystem::ManagerConfig::new("timeout_test") - .with_shutdown_timeout_per_subsystem(std::time::Duration::from_secs(1)); - let mut man = subsystem::Manager::new_with_config(config); - - man.add_custom_subsystem("does_not_want_to_exit", |_| { - std::future::ready(Result::<_, std::convert::Infallible>::Ok(NoExit)) - }); - man.make_shutdown_trigger().initiate(); - man.main().await; - - testing_logger::validate(|logs| { - assert!(logs.iter().any(|entry| entry.body.contains("shutdown timed out"))); - }); -} diff --git a/wallet/wallet-node-client/tests/call_tests.rs b/wallet/wallet-node-client/tests/call_tests.rs index 52adb1a371..f4ce936d58 100644 --- a/wallet/wallet-node-client/tests/call_tests.rs +++ b/wallet/wallet-node-client/tests/call_tests.rs @@ -92,7 +92,8 @@ pub async fn start_subsystems( chainstate_handle.clone(), Default::default(), ); - let mempool_handle = manager.add_custom_subsystem("test-mempool", |hdl| mempool_init.init(hdl)); + let mempool_handle = + manager.add_custom_subsystem("test-mempool", |hdl, _| mempool_init.init(hdl)); let peerdb_storage = p2p::test_helpers::peerdb_inmemory_store(); let p2p_handle = p2p::make_p2p( diff --git a/wallet/wallet-test-node/src/lib.rs b/wallet/wallet-test-node/src/lib.rs index 984e4115af..c39df02a97 100644 --- a/wallet/wallet-test-node/src/lib.rs +++ b/wallet/wallet-test-node/src/lib.rs @@ -215,7 +215,7 @@ pub async fn start_node(chain_config: Arc) -> (subsystem::Manager, Default::default(), ); let mempool = - manager.add_custom_subsystem("wallet-cli-test-mempool", |hdl| mempool_init.init(hdl)); + manager.add_custom_subsystem("wallet-cli-test-mempool", |hdl, _| mempool_init.init(hdl)); let peerdb_storage = p2p::test_helpers::peerdb_inmemory_store(); let p2p = p2p::make_p2p( From 382c9a6d7de33b4c44f976a788b0cd1504b7af8e Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Fri, 13 Feb 2026 13:03:25 +0200 Subject: [PATCH 4/6] Minor cleanup in functional tests --- test/functional/test-shell.md | 1 - .../test_framework/test_framework.py | 23 +++--- test/functional/test_framework/test_node.py | 20 +----- test/functional/test_framework/util.py | 70 +++---------------- 4 files changed, 22 insertions(+), 92 deletions(-) diff --git a/test/functional/test-shell.md b/test/functional/test-shell.md index 78737509cb..41cc859674 100644 --- a/test/functional/test-shell.md +++ b/test/functional/test-shell.md @@ -167,7 +167,6 @@ can be called after the TestShell is shut down. | Test parameter key | Default Value | Description | |---|---|---| -| `bind_to_localhost_only` | `True` | Binds bitcoind RPC services to `127.0.0.1` if set to `True`.| | `cachedir` | `"/path/to/bitcoin/test/cache"` | Sets the bitcoind datadir directory. | | `chain` | `"regtest"` | Sets the chain-type for the underlying test bitcoind processes. | | `configfile` | `"/path/to/bitcoin/test/config.ini"` | Sets the location of the test framework config file. | diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index e67e74ab72..ef5e47a311 100755 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -91,7 +91,6 @@ def __init__(self): self.network_thread = None self.rpc_timeout = 60 # Wait for up to 60 seconds for the RPC server to respond self.supports_cli = True - self.bind_to_localhost_only = True self.parse_args() self.disable_syscall_sandbox = self.options.nosandbox or self.options.valgrind self.default_wallet_name = "default_wallet" if self.options.descriptors else "" @@ -439,6 +438,12 @@ def run_test(self): # Public helper methods. These can be accessed by the subclass test scripts. + def get_node_datadir(self, node_idx: int) -> str: + return get_datadir_path(self.options.tmpdir, node_idx) + + def init_node_datadir(self, node_idx: int): + initialize_datadir(self.options.tmpdir, node_idx) + def add_nodes(self, num_nodes: int, extra_args=None, *, rpchost=None, binary=None, binary_cli=None, versions=None): """Instantiate TestNode objects. @@ -463,10 +468,6 @@ def get_bin_from_version(version, bin_name, bin_default): bin_name, ) - if self.bind_to_localhost_only: - extra_confs = [["bind=127.0.0.1"]] * num_nodes - else: - extra_confs = [[]] * num_nodes if extra_args is None: extra_args = [[]] * num_nodes if versions is None: @@ -479,7 +480,6 @@ def get_bin_from_version(version, bin_name, bin_default): binary = [get_bin_from_version(v, 'bitcoind', self.options.bitcoind) for v in versions] if binary_cli is None: binary_cli = [get_bin_from_version(v, 'bitcoin-cli', self.options.bitcoincli) for v in versions] - assert_equal(len(extra_confs), num_nodes) assert_equal(len(extra_args), num_nodes) assert_equal(len(versions), num_nodes) assert_equal(len(binary), num_nodes) @@ -487,7 +487,7 @@ def get_bin_from_version(version, bin_name, bin_default): for i in range(num_nodes): test_node_i = TestNode( i, - get_datadir_path(self.options.tmpdir, i), + self.get_node_datadir(i), chain=self.chain, rpchost=rpchost, timewait=self.rpc_timeout, @@ -497,7 +497,6 @@ def get_bin_from_version(version, bin_name, bin_default): version=versions[i], coverage_dir=self.options.coveragedir, cwd=self.options.tmpdir, - extra_conf=extra_confs[i], extra_args=extra_args[i], use_cli=self.options.usecli, start_perf=self.options.perf, @@ -718,13 +717,12 @@ def _initialize_chain(self): if not os.path.isdir(cache_node_dir): self.log.debug("Creating cache directory {}".format(cache_node_dir)) - initialize_datadir(self.options.cachedir, CACHE_NODE_ID, self.chain, self.disable_autoconnect) + initialize_datadir(self.options.cachedir, CACHE_NODE_ID) self.nodes.append( TestNode( CACHE_NODE_ID, cache_node_dir, chain=self.chain, - extra_conf=["bind=127.0.0.1"], extra_args=[], rpchost=None, timewait=self.rpc_timeout, @@ -776,9 +774,8 @@ def cache_path(*paths): for i in range(self.num_nodes): self.log.debug("Copy cache directory {} to node {}".format(cache_node_dir, i)) - to_dir = get_datadir_path(self.options.tmpdir, i) + to_dir = self.get_node_datadir(i) shutil.copytree(cache_node_dir, to_dir) - initialize_datadir(self.options.tmpdir, i, self.chain, self.disable_autoconnect) # Overwrite port/rpcport in bitcoin.conf def _initialize_chain_clean(self): """Initialize empty blockchain for use by the test. @@ -786,7 +783,7 @@ def _initialize_chain_clean(self): Create an empty blockchain and num_nodes wallets. Useful if a test case wants complete control over initialization.""" for i in range(self.num_nodes): - initialize_datadir(self.options.tmpdir, i, self.chain, self.disable_autoconnect) + self.init_node_datadir(i) def skip_if_no_py3_zmq(self): """Attempt to import the zmq package and skip the test if the import fails.""" diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index 3838c73b72..b1d12e386b 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -28,7 +28,6 @@ from .util import ( MAX_NODES, assert_equal, - append_config, delete_cookie_file, get_auth_cookie, get_rpc_proxy, @@ -67,7 +66,7 @@ class TestNode(): To make things easier for the test writer, any unrecognized messages will be dispatched to the RPC connection.""" - def __init__(self, i, datadir, *, chain, rpchost, timewait, timeout_factor, bitcoind, bitcoin_cli, coverage_dir, cwd, extra_conf=None, extra_args=None, use_cli=False, start_perf=False, use_valgrind=False, version=None, descriptors=False): + def __init__(self, i, datadir, *, chain, rpchost, timewait, timeout_factor, bitcoind, bitcoin_cli, coverage_dir, cwd, extra_args=None, use_cli=False, start_perf=False, use_valgrind=False, version=None, descriptors=False): """ Kwargs: start_perf (bool): If True, begin profiling the node with `perf` as soon as @@ -77,7 +76,6 @@ def __init__(self, i, datadir, *, chain, rpchost, timewait, timeout_factor, bitc self.index = i self.p2p_conn_index = 1 self.datadir = datadir - self.bitcoinconf = os.path.join(self.datadir, "bitcoin.conf") self.stdout_dir = os.path.join(self.datadir, "stdout") self.stderr_dir = os.path.join(self.datadir, "stderr") self.chain = chain @@ -87,11 +85,8 @@ def __init__(self, i, datadir, *, chain, rpchost, timewait, timeout_factor, bitc self.coverage_dir = coverage_dir self.cwd = cwd self.descriptors = descriptors - if extra_conf is not None: - append_config(datadir, extra_conf) # Most callers will just need to add extra args to the standard list below. # For those callers that need more flexibility, they can just set the args property directly. - # Note that common args are set in the config file (see initialize_datadir) self.extra_args = extra_args self.version = version @@ -106,9 +101,6 @@ def __init__(self, i, datadir, *, chain, rpchost, timewait, timeout_factor, bitc # current default value is used. min_tx_relay_fee_rate = 1000 - # Configuration for logging is set as command-line args rather than in the bitcoin.conf file. - # This means that starting a bitcoind using the temp dir to debug a failed test won't - # spam debug.log. self.args = [ self.binary, f"--datadir={datadir}", @@ -117,12 +109,6 @@ def __init__(self, i, datadir, *, chain, rpchost, timewait, timeout_factor, bitc f"--p2p-bind-addresses={p2p_bind_address}", f"--max-tip-age={max_tip_age}", f"--min-tx-relay-fee-rate={min_tx_relay_fee_rate}", - #"-X", - #"-logtimemicros", - #"-debug", - #"-debugexclude=libevent", - #"-debugexclude=leveldb", - #"-uacomment=testnode%d" % i, ] if use_valgrind: default_suppressions_file = os.path.join( @@ -220,7 +206,7 @@ def start(self, extra_args=None, *, cwd=None, stdout=None, stderr=None, **kwargs # Delete any existing cookie file -- if such a file exists (eg due to # unclean shutdown), it will get overwritten anyway by bitcoind, and # potentially interfere with our attempt to authenticate - delete_cookie_file(self.datadir, self.chain) + delete_cookie_file(self.datadir) # add environment variable LIBC_FATAL_STDERR_=1 so that libc errors are written to stderr and not the terminal subp_env = dict(os.environ, LIBC_FATAL_STDERR_="1") @@ -328,7 +314,7 @@ def wait_for_cookie_credentials(self): poll_per_s = 4 for _ in range(poll_per_s * self.rpc_timeout): try: - get_auth_cookie(self.datadir, self.chain) + get_auth_cookie(self.datadir) self.log.debug("Cookie credentials successfully retrieved") return except ValueError: # cookie file not found and no rpcuser or rpcpassword; bitcoind is still starting diff --git a/test/functional/test_framework/util.py b/test/functional/test_framework/util.py index 5ea3a2e241..057401e2fd 100644 --- a/test/functional/test_framework/util.py +++ b/test/functional/test_framework/util.py @@ -341,7 +341,7 @@ def rpc_addr(n): return "127.0.0.1:" + str(rpc_port(n)) def rpc_url(datadir, i, chain, rpchost): - rpc_u, rpc_p = get_auth_cookie(datadir, chain) + rpc_u, rpc_p = get_auth_cookie(datadir) print(rpc_u) print(rpc_p) host = '127.0.0.1' @@ -359,76 +359,22 @@ def rpc_url(datadir, i, chain, rpchost): ################ -def initialize_datadir(dirname, n, chain, disable_autoconnect=True): +def initialize_datadir(dirname, n): datadir = get_datadir_path(dirname, n) if not os.path.isdir(datadir): os.makedirs(datadir) - write_config(os.path.join(datadir, "bitcoin.conf"), n=n, chain=chain, disable_autoconnect=disable_autoconnect) os.makedirs(os.path.join(datadir, 'stderr'), exist_ok=True) os.makedirs(os.path.join(datadir, 'stdout'), exist_ok=True) return datadir -def write_config(config_path, *, n, chain, extra_config="", disable_autoconnect=True): - # Translate chain subdirectory name to config name - if chain == 'testnet3': - chain_name_conf_arg = 'testnet' - chain_name_conf_section = 'test' - else: - chain_name_conf_arg = chain - chain_name_conf_section = chain - with open(config_path, 'w', encoding='utf8') as f: - if chain_name_conf_arg: - f.write("{}=1\n".format(chain_name_conf_arg)) - if chain_name_conf_section: - f.write("[{}]\n".format(chain_name_conf_section)) - f.write("port=" + str(p2p_port(n)) + "\n") - f.write("rpcport=" + str(rpc_port(n)) + "\n") - f.write("fallbackfee=0.0002\n") - f.write("server=1\n") - f.write("keypool=1\n") - f.write("discover=0\n") - f.write("dnsseed=0\n") - f.write("fixedseeds=0\n") - f.write("listenonion=0\n") - # Increase peertimeout to avoid disconnects while using mocktime. - # peertimeout is measured in mock time, so setting it large enough to - # cover any duration in mock time is sufficient. It can be overridden - # in tests. - f.write("peertimeout=999999999\n") - f.write("printtoconsole=0\n") - f.write("upnp=0\n") - f.write("natpmp=0\n") - f.write("shrinkdebugfile=0\n") - # To improve SQLite wallet performance so that the tests don't timeout, use -unsafesqlitesync - f.write("unsafesqlitesync=1\n") - if disable_autoconnect: - f.write("connect=0\n") - f.write(extra_config) - - def get_datadir_path(dirname, n): return os.path.join(dirname, "node" + str(n)) -def append_config(datadir, options): - with open(os.path.join(datadir, "bitcoin.conf"), 'a', encoding='utf8') as f: - for option in options: - f.write(option + "\n") - - -def get_auth_cookie(datadir, chain): +def get_auth_cookie(datadir): user = None password = None - if os.path.isfile(os.path.join(datadir, "bitcoin.conf")): - with open(os.path.join(datadir, "bitcoin.conf"), 'r', encoding='utf8') as f: - for line in f: - if line.startswith("rpcuser="): - assert user is None # Ensure that there is only one rpcuser line - user = line.split("=")[1].strip("\n") - if line.startswith("rpcpassword="): - assert password is None # Ensure that there is only one rpcpassword line - password = line.split("=")[1].strip("\n") try: with open(os.path.join(datadir, ".cookie"), 'r', encoding="ascii") as f: userpass = f.read() @@ -443,10 +389,12 @@ def get_auth_cookie(datadir, chain): # If a cookie file exists in the given datadir, delete it. -def delete_cookie_file(datadir, chain): - if os.path.isfile(os.path.join(datadir, chain, ".cookie")): - logger.debug("Deleting leftover cookie file") - os.remove(os.path.join(datadir, chain, ".cookie")) +def delete_cookie_file(datadir): + cookie_file = os.path.join(datadir, ".cookie") + + if os.path.isfile(cookie_file): + logger.debug(f"Deleting leftover cookie file {cookie_file}") + os.remove(cookie_file) def softfork_active(node, key): From ef4cfa0bd89d7af89b99619aad0e09579b9264f6 Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Fri, 13 Feb 2026 16:33:55 +0200 Subject: [PATCH 5/6] Send NewTip event on block invalidation and resetting failure flags --- blockprod/src/detail/job_manager/mod.rs | 2 +- .../src/detail/block_invalidation/mod.rs | 14 ++ chainstate/src/detail/mod.rs | 52 ++++- .../interface/chainstate_interface_impl.rs | 3 +- chainstate/src/lib.rs | 2 +- chainstate/src/rpc/types/event.rs | 7 +- .../src/tests/block_invalidation.rs | 192 +++++++++++++++++- .../test-suite/src/tests/events_tests.rs | 9 +- .../test-suite/src/tests/helpers/mod.rs | 35 ++++ chainstate/test-suite/src/tests/mod.rs | 7 +- .../test-suite/src/tests/reorgs_tests.rs | 37 ++-- mempool/src/event.rs | 8 +- mempool/src/pool/mod.rs | 6 +- mempool/src/pool/tests/basic.rs | 10 +- mempool/src/pool/tx_pool/mod.rs | 6 +- mempool/src/pool/tx_pool/reorg.rs | 6 +- mempool/src/pool/tx_pool/tests/utils.rs | 2 +- mempool/src/rpc_event.rs | 4 +- p2p/src/peer_manager_event.rs | 8 +- p2p/src/sync/mod.rs | 8 +- p2p/src/sync/peer/block_manager.rs | 2 +- p2p/src/sync/tests/block_response.rs | 2 +- p2p/src/sync/tests/helpers/mod.rs | 6 +- 23 files changed, 354 insertions(+), 74 deletions(-) diff --git a/blockprod/src/detail/job_manager/mod.rs b/blockprod/src/detail/job_manager/mod.rs index 7d70df1869..df71be0c47 100644 --- a/blockprod/src/detail/job_manager/mod.rs +++ b/blockprod/src/detail/job_manager/mod.rs @@ -289,7 +289,7 @@ impl JobManager { height: _, is_initial_block_download: _, } => { - _ = chainstate_sender.send(block_id.into()).log_err_pfx( + _ = chainstate_sender.send(block_id).log_err_pfx( "Chainstate subscriber failed to send new tip", ); } diff --git a/chainstate/src/detail/block_invalidation/mod.rs b/chainstate/src/detail/block_invalidation/mod.rs index b759daf509..177eb46e2d 100644 --- a/chainstate/src/detail/block_invalidation/mod.rs +++ b/chainstate/src/detail/block_invalidation/mod.rs @@ -328,32 +328,46 @@ impl<'a, S: BlockchainStorage, V: TransactionVerificationStrategy> BlockInvalida pub enum BlockInvalidatorError { #[error("Block storage error: {0}")] StorageError(#[from] chainstate_storage::Error), + #[error("The block {0} is too deep to invalidate")] BlockTooDeepToInvalidate(Id), + #[error("Error manipulating best chain candidates: {0}")] BestChainCandidatesError(#[from] BestChainCandidatesError), + #[error("Error disconnecting blocks until block {disconnect_until}: {error}")] BlocksDisconnectionError { disconnect_until: Id, error: Box, }, + #[error("Error updating block status for block {0}: {1}")] BlockStatusUpdateError(Id, Box), + #[error("Generic error during reorg: {0}")] GenericReorgError(Box), + #[error("Failed to commit to the DB after {0} attempts: {1}, context: {2}")] DbCommitError(usize, chainstate_storage::Error, DbCommittingContext), #[error("Failed to obtain best block index: {0}")] BlockIndicesForBranchQueryError(PropertyQueryError), + #[error("Failed to determine if the block {0} is in mainchain: {1}")] IsBlockInMainChainQueryError(Id, PropertyQueryError), + #[error("Failed to obtain the minimum height with allowed reorgs: {0}")] MinHeightForReorgQueryError(PropertyQueryError), + #[error("Failed to obtain best block index: {0}")] BestBlockIndexQueryError(PropertyQueryError), + + #[error("Failed to obtain best block id: {0}")] + BestBlockIdQueryError(PropertyQueryError), + #[error("Failed to obtain block index for block {0}: {1}")] BlockIndexQueryError(Id, PropertyQueryError), + #[error("Error deleting index for block {0}: {1}")] DelBlockIndexError(Id, BlockError), } diff --git a/chainstate/src/detail/mod.rs b/chainstate/src/detail/mod.rs index 06466761d3..391929d986 100644 --- a/chainstate/src/detail/mod.rs +++ b/chainstate/src/detail/mod.rs @@ -270,7 +270,7 @@ impl Chainstate fn broadcast_new_tip_event( &mut self, - best_block_index: &BlockIndex, + best_block_index: GenBlockIndexRef<'_>, is_initial_block_download: bool, ) { let event = ChainstateEvent::NewTip { @@ -623,7 +623,10 @@ impl Chainstate if let Some(best_block_index) = &best_block_index_opt { self.update_initial_block_download_flag(GenBlockIndexRef::Block(best_block_index)); - self.broadcast_new_tip_event(best_block_index, self.is_initial_block_download()); + self.broadcast_new_tip_event( + GenBlockIndexRef::Block(best_block_index), + self.is_initial_block_download(), + ); let compact_target = match best_block_index.block_header().consensus_data() { ConsensusData::None => Compact::from(Uint256::ZERO), @@ -707,15 +710,60 @@ impl Chainstate #[log_error] pub fn invalidate_block(&mut self, block_id: &Id) -> Result<(), BlockInvalidatorError> { + let prev_best_block_id = self + .make_db_tx_ro()? + .get_best_block_id() + .map_err(BlockInvalidatorError::BestBlockIdQueryError)?; + let result = BlockInvalidator::new(self) .invalidate_block(block_id, block_invalidation::IsExplicit::Yes); // Note: we don't ignore the result of check_consistency even though we may already have // an error to return (if the checks are enabled but couldn't be done for some reason, // we don't want to miss this). self.check_consistency()?; + + let new_best_block_index = self + .make_db_tx_ro()? + .get_best_block_index() + .map_err(BlockInvalidatorError::BestBlockIndexQueryError)?; + + if new_best_block_index.block_id() != prev_best_block_id { + self.broadcast_new_tip_event( + new_best_block_index.as_ref(), + self.is_initial_block_download(), + ); + } + result } + #[log_error] + pub fn reset_block_failure_flags( + &mut self, + block_id: &Id, + ) -> Result<(), BlockInvalidatorError> { + let prev_best_block_id = self + .make_db_tx_ro()? + .get_best_block_id() + .map_err(BlockInvalidatorError::BestBlockIdQueryError)?; + + BlockInvalidator::new(self).reset_block_failure_flags(block_id)?; + + let new_best_block_index = self + .make_db_tx_ro()? + .get_best_block_index() + .map_err(BlockInvalidatorError::BestBlockIndexQueryError)?; + + if new_best_block_index.block_id() != prev_best_block_id { + self.broadcast_new_tip_event( + new_best_block_index.as_ref(), + self.is_initial_block_download(), + ); + } + + Ok(()) + } + #[log_error] fn create_pool_in_storage( &self, diff --git a/chainstate/src/interface/chainstate_interface_impl.rs b/chainstate/src/interface/chainstate_interface_impl.rs index aeeb944ea5..76f2e8ceed 100644 --- a/chainstate/src/interface/chainstate_interface_impl.rs +++ b/chainstate/src/interface/chainstate_interface_impl.rs @@ -23,7 +23,6 @@ use crate::{ detail::{ self, block_checking::BlockChecker, - block_invalidation::BlockInvalidator, bootstrap::{export_bootstrap_stream, import_bootstrap_stream}, calculate_median_time_past, tx_verification_strategy::TransactionVerificationStrategy, @@ -108,7 +107,7 @@ where #[tracing::instrument(skip_all, fields(id = %block_id))] fn reset_block_failure_flags(&mut self, block_id: &Id) -> Result<(), ChainstateError> { - BlockInvalidator::new(&mut self.chainstate) + self.chainstate .reset_block_failure_flags(block_id) .map_err(ChainstateError::BlockInvalidatorError) } diff --git a/chainstate/src/lib.rs b/chainstate/src/lib.rs index e65c5c3660..4c98b0610f 100644 --- a/chainstate/src/lib.rs +++ b/chainstate/src/lib.rs @@ -52,7 +52,7 @@ pub use tx_verifier; #[derive(Debug, Clone, Eq, PartialEq)] pub enum ChainstateEvent { NewTip { - id: Id, + id: Id, height: BlockHeight, is_initial_block_download: bool, }, diff --git a/chainstate/src/rpc/types/event.rs b/chainstate/src/rpc/types/event.rs index 0e860d903a..6cd9ce40c1 100644 --- a/chainstate/src/rpc/types/event.rs +++ b/chainstate/src/rpc/types/event.rs @@ -14,7 +14,7 @@ // limitations under the License. use common::{ - chain::Block, + chain::GenBlock, primitives::{BlockHeight, Id}, }; @@ -23,7 +23,10 @@ use crate::ChainstateEvent; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, rpc::description::HasValueHint)] #[serde(tag = "type", content = "content")] pub enum RpcEvent { - NewTip { id: Id, height: BlockHeight }, + NewTip { + id: Id, + height: BlockHeight, + }, } impl RpcEvent { diff --git a/chainstate/test-suite/src/tests/block_invalidation.rs b/chainstate/test-suite/src/tests/block_invalidation.rs index 9e01bf89e3..7d2f02b60c 100644 --- a/chainstate/test-suite/src/tests/block_invalidation.rs +++ b/chainstate/test-suite/src/tests/block_invalidation.rs @@ -17,9 +17,9 @@ use std::sync::Arc; use rstest::rstest; -use super::helpers::{block_creation_helpers::*, block_status_helpers::*}; use chainstate::{ - BlockError, BlockInvalidatorError, BlockSource, ChainstateError, CheckBlockError, + BlockError, BlockInvalidatorError, BlockSource, ChainstateError, ChainstateEvent, + CheckBlockError, }; use chainstate_test_framework::{storage::Builder as StorageBuilder, TestFramework}; use chainstate_types::{BlockStatus, BlockValidationStage}; @@ -28,7 +28,7 @@ use common::{ self, block::{consensus_data::PoWData, Block, ConsensusData}, }, - primitives::{BlockDistance, Id, Idable}, + primitives::{BlockDistance, BlockHeight, Id, Idable}, Uint256, }; use randomness::{CryptoRng, Rng}; @@ -39,6 +39,8 @@ use test_utils::{ }; use utils::atomics::SeqCstAtomicU64; +use super::helpers::{block_creation_helpers::*, block_status_helpers::*, EventList}; + mod storage_configs { use super::StorageBuilder; use chainstate_storage::schema; @@ -97,6 +99,8 @@ fn test_stale_chain_invalidation(#[case] seed: Seed, #[case] sb: StorageBuilder) BlockValidationStage::CheckBlockOk, ); + let events = EventList::subscribe_new(&mut tf); + tf.chainstate.invalidate_block(&a0_id).unwrap(); assert_eq!(tf.best_block_id(), m_tip_id); @@ -113,6 +117,9 @@ fn test_stale_chain_invalidation(#[case] seed: Seed, #[case] sb: StorageBuilder) &[a0_id, a1_id, a2_id], BlockValidationStage::CheckBlockOk, ); + + tf.chainstate.wait_for_all_events(); + assert_eq!(&events.pop_all(), &[]); }); } @@ -138,15 +145,37 @@ fn test_basic_tip_invalidation(#[case] seed: Seed, #[case] sb: StorageBuilder) { assert_eq!(tf.best_block_id(), m1_id); assert_fully_valid_blocks(&tf, &[m0_id, m1_id]); + let events = EventList::subscribe_new(&mut tf); + tf.chainstate.invalidate_block(&m1_id).unwrap(); assert_eq!(tf.best_block_id(), m0_id); assert_fully_valid_blocks(&tf, &[m0_id]); assert_invalidated_blocks_at_stage(&tf, &[m1_id], BlockValidationStage::FullyChecked); + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: m0_id.into(), + height: BlockHeight::new(1), + is_initial_block_download: false + }] + ); + tf.chainstate.reset_block_failure_flags(&m1_id).unwrap(); assert_eq!(tf.best_block_id(), m1_id); assert_fully_valid_blocks(&tf, &[m0_id, m1_id]); + + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: m1_id.into(), + height: BlockHeight::new(2), + is_initial_block_download: false + }] + ); }); } @@ -172,15 +201,37 @@ fn test_basic_parent_invalidation(#[case] seed: Seed, #[case] sb: StorageBuilder assert_eq!(tf.best_block_id(), m1_id); assert_fully_valid_blocks(&tf, &[m0_id, m1_id]); + let events = EventList::subscribe_new(&mut tf); + tf.chainstate.invalidate_block(&m0_id).unwrap(); assert_eq!(tf.best_block_id(), genesis_id); assert_invalidated_blocks_at_stage(&tf, &[m0_id], BlockValidationStage::FullyChecked); assert_blocks_with_bad_parent_at_stage(&tf, &[m1_id], BlockValidationStage::FullyChecked); + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: genesis_id.into(), + height: BlockHeight::new(0), + is_initial_block_download: false + }] + ); + tf.chainstate.reset_block_failure_flags(&m0_id).unwrap(); assert_eq!(tf.best_block_id(), m1_id); assert_fully_valid_blocks(&tf, &[m0_id, m1_id]); + + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: m1_id.into(), + height: BlockHeight::new(2), + is_initial_block_download: false + }] + ); }); } @@ -313,6 +364,8 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { let TestChainBlockIds { m, a, b, c, d, e } = block_ids; + let events = EventList::subscribe_new(&mut tf); + { // Step 1 - invalidate m3. // This should first try to switch to the "e" chain, whose activation should fail, @@ -349,6 +402,16 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { // Check the min height for reorg and the best chain candidates. assert_eq!(tf.get_min_height_with_allowed_reorg(), 2.into()); + + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: d[2].into(), + height: BlockHeight::new(5), + is_initial_block_download: false + }] + ); } { @@ -385,6 +448,9 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { // Check the min height for reorg and the best chain candidates. assert_eq!(tf.get_min_height_with_allowed_reorg(), 2.into()); + + tf.chainstate.wait_for_all_events(); + assert_eq!(&events.pop_all(), &[]); } { @@ -396,6 +462,7 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { assert_in_stale_chain(&tf, &c[2..]); assert_fully_valid_blocks(&tf, &c[..2]); assert_bad_blocks_at_stage(&tf, &c[2..], BlockValidationStage::Unchecked); + assert_eq!(tf.best_block_id(), c[1]); // "d" is now invalid assert_in_stale_chain(&tf, d); @@ -422,6 +489,16 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { // Check the min height for reorg and the best chain candidates. assert_eq!(tf.get_min_height_with_allowed_reorg(), 2.into()); + + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: c[1].into(), + height: BlockHeight::new(4), + is_initial_block_download: false + }] + ); } { @@ -433,6 +510,7 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { assert_in_stale_chain(&tf, &b[1..]); assert_fully_valid_blocks(&tf, &b[..1]); assert_bad_blocks_at_stage(&tf, &b[1..], BlockValidationStage::Unchecked); + assert_eq!(tf.best_block_id(), b[0]); // The entire "c" is now invalid assert_in_stale_chain(&tf, c); @@ -460,6 +538,16 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { // Check the min height for reorg and the best chain candidates. assert_eq!(tf.get_min_height_with_allowed_reorg(), 2.into()); + + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: b[0].into(), + height: BlockHeight::new(3), + is_initial_block_download: false + }] + ); } { @@ -482,6 +570,7 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { assert_in_stale_chain(&tf, &b[1..]); assert_fully_valid_blocks(&tf, &b[..1]); assert_bad_blocks_at_stage(&tf, &b[1..], BlockValidationStage::Unchecked); + assert_eq!(tf.best_block_id(), b[0]); assert_in_stale_chain(&tf, c); assert_invalidated_blocks_at_stage(&tf, &c[..1], FullyChecked); @@ -504,6 +593,9 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { // Check the min height for reorg and the best chain candidates. assert_eq!(tf.get_min_height_with_allowed_reorg(), 2.into()); + + tf.chainstate.wait_for_all_events(); + assert_eq!(&events.pop_all(), &[]); } { @@ -523,6 +615,7 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { assert_in_stale_chain(&tf, &b[1..]); assert_fully_valid_blocks(&tf, &b[..1]); assert_bad_blocks_at_stage(&tf, &b[1..], BlockValidationStage::Unchecked); + assert_eq!(tf.best_block_id(), b[0]); assert_in_stale_chain(&tf, c); assert_invalidated_blocks_at_stage(&tf, &c[..1], FullyChecked); @@ -545,6 +638,9 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { // Check the min height for reorg and the best chain candidates. assert_eq!(tf.get_min_height_with_allowed_reorg(), 2.into()); + + tf.chainstate.wait_for_all_events(); + assert_eq!(&events.pop_all(), &[]); } { @@ -553,6 +649,9 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { assert_eq!(tf.get_min_height_with_allowed_reorg(), 2.into()); } + // Resubscribe after reload. + let events = EventList::subscribe_new(&mut tf); + { // Step 8 - reset the fail flags of m1 and its descendants. // Note that m1 itself hasn't been invalidated, but this should not be a problem. @@ -589,6 +688,16 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { // Check the min height for reorg and the best chain candidates. assert_eq!(tf.get_min_height_with_allowed_reorg(), 2.into()); // Note that now b2 and c2 are among the candidates instead of b0 and c1. + + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: m[6].into(), + height: BlockHeight::new(7), + is_initial_block_download: false + }] + ); } } @@ -653,6 +762,8 @@ fn test_tip_invalidation_with_no_better_candidates(#[case] seed: Seed, #[case] s tf.block_index(&m0_id).chain_trust() ); + let events = EventList::subscribe_new(&mut tf); + tf.chainstate.invalidate_block(&m1_id).unwrap(); assert_eq!(tf.best_block_id(), m0_id); @@ -660,11 +771,31 @@ fn test_tip_invalidation_with_no_better_candidates(#[case] seed: Seed, #[case] s assert_invalidated_blocks_at_stage(&tf, &[m1_id], BlockValidationStage::FullyChecked); assert_ok_blocks_at_stage(&tf, &[a0_id], BlockValidationStage::CheckBlockOk); + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: m0_id.into(), + height: BlockHeight::new(1), + is_initial_block_download: false + }] + ); + tf.chainstate.reset_block_failure_flags(&m1_id).unwrap(); assert_eq!(tf.best_block_id(), m1_id); assert_fully_valid_blocks(&tf, &[m0_id, m1_id]); assert_ok_blocks_at_stage(&tf, &[a0_id], BlockValidationStage::CheckBlockOk); + + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: m1_id.into(), + height: BlockHeight::new(2), + is_initial_block_download: false + }] + ); }); } @@ -694,6 +825,8 @@ fn test_invalidation_with_reorg_to_chain_with_bad_tip1(#[case] seed: Seed) { let (a1_id, result) = process_block_with_empty_tx(&mut rng, &mut tf, &a0_id.into()); assert!(result.is_err()); + let events = EventList::subscribe_new(&mut tf); + // Reset the fail flags of a1. tf.chainstate.reset_block_failure_flags(&a1_id).unwrap(); @@ -703,6 +836,9 @@ fn test_invalidation_with_reorg_to_chain_with_bad_tip1(#[case] seed: Seed) { // Resetting block status has removed the block index, because the block data itself was missing. assert_no_block_indices(&tf, &[a1_id]); + tf.chainstate.wait_for_all_events(); + assert_eq!(&events.pop_all(), &[]); + // For completeness, invalidate m0 and check that the chain reorgs to a0. tf.chainstate.invalidate_block(&m0_id).unwrap(); @@ -713,6 +849,16 @@ fn test_invalidation_with_reorg_to_chain_with_bad_tip1(#[case] seed: Seed) { assert_no_block_indices(&tf, &[a1_id]); assert_invalidated_blocks_at_stage(&tf, &[m0_id], BlockValidationStage::FullyChecked); assert_blocks_with_bad_parent_at_stage(&tf, &[m1_id], BlockValidationStage::FullyChecked); + + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: a0_id.into(), + height: BlockHeight::new(1), + is_initial_block_download: false + }] + ); }); } @@ -749,6 +895,8 @@ fn test_invalidation_with_reorg_to_chain_with_bad_tip2(#[case] seed: Seed) { assert_fully_valid_blocks(&tf, &[m0_id, m1_id]); assert_ok_blocks_at_stage(&tf, &[a0_id, a1_id], BlockValidationStage::CheckBlockOk); + let events = EventList::subscribe_new(&mut tf); + tf.chainstate.invalidate_block(&m0_id).unwrap(); // a0 is now the best block, a1 is marked as bad. @@ -757,6 +905,16 @@ fn test_invalidation_with_reorg_to_chain_with_bad_tip2(#[case] seed: Seed) { assert_bad_blocks_at_stage(&tf, &[a1_id], BlockValidationStage::CheckBlockOk); assert_invalidated_blocks_at_stage(&tf, &[m0_id], BlockValidationStage::FullyChecked); assert_blocks_with_bad_parent_at_stage(&tf, &[m1_id], BlockValidationStage::FullyChecked); + + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: a0_id.into(), + height: BlockHeight::new(1), + is_initial_block_download: false + }] + ); }); } @@ -821,12 +979,17 @@ fn test_invalidation_with_reorg_attempt_to_chain_with_lower_chain_trust(#[case] assert!(a1_ct > a0_ct); assert!(a1_ct > m1_ct); + let events = EventList::subscribe_new(&mut tf); + tf.chainstate.reset_block_failure_flags(&a1_id).unwrap(); assert_eq!(tf.best_block_id(), m1_id); assert_fully_valid_blocks(&tf, &[m0_id, m1_id]); assert_ok_blocks_at_stage(&tf, &[a0_id], BlockValidationStage::CheckBlockOk); assert_no_block_indices(&tf, &[a1_id]); + + tf.chainstate.wait_for_all_events(); + assert_eq!(&events.pop_all(), &[]); }); } @@ -878,6 +1041,8 @@ fn test_invalidation_with_reorg_to_chain_with_tip_far_in_the_future(#[case] seed assert_ok_blocks_at_stage(&tf, &[a0_id], BlockValidationStage::CheckBlockOk); assert_ok_blocks_at_stage(&tf, &[a1_id], BlockValidationStage::Unchecked); + let events = EventList::subscribe_new(&mut tf); + tf.chainstate.invalidate_block(&m0_id).unwrap(); // a0 is now the best block, a1 is still ok and unchecked. @@ -886,6 +1051,16 @@ fn test_invalidation_with_reorg_to_chain_with_tip_far_in_the_future(#[case] seed assert_ok_blocks_at_stage(&tf, &[a1_id], BlockValidationStage::Unchecked); assert_invalidated_blocks_at_stage(&tf, &[m0_id], BlockValidationStage::FullyChecked); assert_blocks_with_bad_parent_at_stage(&tf, &[m1_id], BlockValidationStage::FullyChecked); + + tf.chainstate.wait_for_all_events(); + assert_eq!( + &events.pop_all(), + &[ChainstateEvent::NewTip { + id: a0_id.into(), + height: BlockHeight::new(1), + is_initial_block_download: false + }] + ); }); } @@ -928,6 +1103,8 @@ fn test_reset_bad_stale_tip_status_and_add_blocks(#[case] seed: Seed, #[case] sb assert_bad_blocks_at_stage(&tf, &[a1_id], BlockValidationStage::CheckBlockOk); + let events = EventList::subscribe_new(&mut tf); + // Reset the fail flags of a1. tf.chainstate.reset_block_failure_flags(&a1_id).unwrap(); @@ -935,6 +1112,9 @@ fn test_reset_bad_stale_tip_status_and_add_blocks(#[case] seed: Seed, #[case] sb assert_fully_valid_blocks(&tf, &[m0_id, m1_id, m2_id]); assert_ok_blocks_at_stage(&tf, &[a0_id, a1_id], BlockValidationStage::CheckBlockOk); + tf.chainstate.wait_for_all_events(); + assert_eq!(&events.pop_all(), &[]); + let (a2_id, result) = process_block_spend_parent_reward(&mut tf, &a1_id.into(), &mut rng); assert!(result.is_ok()); @@ -947,6 +1127,9 @@ fn test_reset_bad_stale_tip_status_and_add_blocks(#[case] seed: Seed, #[case] sb BlockValidationStage::CheckBlockOk, ); + tf.chainstate.wait_for_all_events(); + assert_eq!(&events.pop_all(), &[]); + let (a3_id, result) = process_block(&mut tf, &a2_id.into(), &mut rng); assert!(result.is_err()); @@ -962,5 +1145,8 @@ fn test_reset_bad_stale_tip_status_and_add_blocks(#[case] seed: Seed, #[case] sb &[a2_id, a3_id], BlockValidationStage::CheckBlockOk, ); + + tf.chainstate.wait_for_all_events(); + assert_eq!(&events.pop_all(), &[]); }); } diff --git a/chainstate/test-suite/src/tests/events_tests.rs b/chainstate/test-suite/src/tests/events_tests.rs index 89666bd6db..191b60f793 100644 --- a/chainstate/test-suite/src/tests/events_tests.rs +++ b/chainstate/test-suite/src/tests/events_tests.rs @@ -25,8 +25,8 @@ use chainstate::{ }; use chainstate_test_framework::{OrphanErrorHandler, TestChainstate, TestFramework}; use common::{ - chain::block::timestamp::BlockTimestamp, - primitives::{id::Idable, BlockHeight}, + chain::{block::timestamp::BlockTimestamp, GenBlock}, + primitives::{id::Idable, BlockHeight, Id}, }; use randomness::Rng; use test_utils::{ @@ -34,7 +34,8 @@ use test_utils::{ random::{make_seedable_rng, Seed}, }; -use crate::tests::EventList; +// TODO use EventList from helpers instead. +type EventList = Arc, BlockHeight)>>>; type ErrorList = Arc>>; @@ -329,7 +330,7 @@ async fn several_subscribers_several_events_broadcaster(#[case] seed: Seed) { .build(&mut rng); let index = tf.process_block(block.clone(), BlockSource::Local).ok().flatten().unwrap(); expected_events.push(ChainstateEvent::NewTip { - id: *index.block_id(), + id: (*index.block_id()).into(), height: index.block_height(), is_initial_block_download: is_ibd, }); diff --git a/chainstate/test-suite/src/tests/helpers/mod.rs b/chainstate/test-suite/src/tests/helpers/mod.rs index 47be1efb7d..a76cc61c60 100644 --- a/chainstate/test-suite/src/tests/helpers/mod.rs +++ b/chainstate/test-suite/src/tests/helpers/mod.rs @@ -13,6 +13,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::{ + ops::DerefMut as _, + sync::{Arc, Mutex}, +}; + +use chainstate::ChainstateEvent; use chainstate_test_framework::{anyonecanspend_address, TestFramework, TransactionBuilder}; use common::{ chain::{ @@ -77,3 +83,32 @@ pub fn new_pub_key_destination(rng: &mut (impl Rng + CryptoRng)) -> Destination let (_, pub_key) = PrivateKey::new_from_rng(rng, KeyKind::Secp256k1Schnorr); Destination::PublicKey(pub_key) } + +#[derive(Clone)] +pub struct EventList(Arc>>); + +impl EventList { + fn new() -> Self { + Self(Arc::new(Mutex::new(Vec::new()))) + } + + pub fn subscribe_new(tf: &mut TestFramework) -> Self { + let this = Self::new(); + this.subscribe(tf); + this + } + + pub fn subscribe(&self, tf: &mut TestFramework) { + let this = self.clone(); + let subscribe_func = Arc::new(move |event| this.push(event)); + tf.chainstate.subscribe_to_subsystem_events(subscribe_func); + } + + pub fn push(&self, event: ChainstateEvent) { + self.0.lock().unwrap().push(event) + } + + pub fn pop_all(&self) -> Vec { + std::mem::take(self.0.lock().unwrap().deref_mut()) + } +} diff --git a/chainstate/test-suite/src/tests/mod.rs b/chainstate/test-suite/src/tests/mod.rs index 5866e85c1f..5ac367beaa 100644 --- a/chainstate/test-suite/src/tests/mod.rs +++ b/chainstate/test-suite/src/tests/mod.rs @@ -13,13 +13,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; -use std::sync::Mutex; - use chainstate::BlockSource; use chainstate_test_framework::TestFramework; use common::{ - chain::{signature::inputsig::InputWitness, Block, GenBlock, Genesis}, + chain::{signature::inputsig::InputWitness, GenBlock, Genesis}, primitives::{BlockHeight, Id}, }; use randomness::Rng; @@ -69,8 +66,6 @@ mod tx_verifier_disconnect; mod helpers; -type EventList = Arc, BlockHeight)>>>; - #[ctor::ctor] fn init() { logging::init_logging(); diff --git a/chainstate/test-suite/src/tests/reorgs_tests.rs b/chainstate/test-suite/src/tests/reorgs_tests.rs index 0dd5b9bfdf..334933e6ba 100644 --- a/chainstate/test-suite/src/tests/reorgs_tests.rs +++ b/chainstate/test-suite/src/tests/reorgs_tests.rs @@ -13,28 +13,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; -use std::sync::Mutex; - -use crate::tests::EventList; -use chainstate::BlockError; -use chainstate::BlockSource; -use chainstate::ChainstateError; -use chainstate::ChainstateEvent; -use chainstate::ConnectTransactionError; -use chainstate_test_framework::TestFramework; -use common::chain::Block; -use common::chain::GenBlock; -use common::chain::Transaction; -use common::chain::UtxoOutPoint; -use common::primitives::BlockHeight; -use common::primitives::Id; -use common::primitives::Idable; -use randomness::CryptoRng; -use randomness::Rng; +use std::sync::{Arc, Mutex}; + use rstest::rstest; -use test_utils::random::make_seedable_rng; -use test_utils::random::Seed; + +use chainstate::{ + BlockError, BlockSource, ChainstateError, ChainstateEvent, ConnectTransactionError, +}; +use chainstate_test_framework::TestFramework; +use common::{ + chain::{Block, GenBlock, Transaction, UtxoOutPoint}, + primitives::{id::Idable, BlockHeight, Id}, +}; +use randomness::{CryptoRng, Rng}; +use test_utils::random::{make_seedable_rng, Seed}; + +// TODO use EventList from helpers instead. +type EventList = Arc, BlockHeight)>>>; // Produce `genesis -> a` chain, then a parallel `genesis -> b -> c` that should trigger a reorg. #[rstest] diff --git a/mempool/src/event.rs b/mempool/src/event.rs index 45c849fd2f..399b35ef6c 100644 --- a/mempool/src/event.rs +++ b/mempool/src/event.rs @@ -14,7 +14,7 @@ // limitations under the License. use common::{ - chain::{Block, Transaction}, + chain::{GenBlock, Transaction}, primitives::{BlockHeight, Id}, }; @@ -84,16 +84,16 @@ impl TransactionProcessed { /// Event triggered when mempool has synced up to given tip #[derive(Debug, Clone, Eq, PartialEq)] pub struct NewTip { - block_id: Id, + block_id: Id, height: BlockHeight, } impl NewTip { - pub fn new(block_id: Id, height: BlockHeight) -> Self { + pub fn new(block_id: Id, height: BlockHeight) -> Self { Self { block_id, height } } - pub fn block_id(&self) -> &Id { + pub fn block_id(&self) -> &Id { &self.block_id } diff --git a/mempool/src/pool/mod.rs b/mempool/src/pool/mod.rs index 9a2a3faa3d..5d759bb253 100644 --- a/mempool/src/pool/mod.rs +++ b/mempool/src/pool/mod.rs @@ -17,7 +17,7 @@ use std::{num::NonZeroUsize, sync::Arc}; use chainstate::{ChainstateError, ChainstateEvent}; use common::{ - chain::{Block, ChainConfig, GenBlock, SignedTransaction, Transaction}, + chain::{ChainConfig, GenBlock, SignedTransaction, Transaction}, primitives::{time::Time, BlockHeight, Id}, time_getter::TimeGetter, }; @@ -460,7 +460,7 @@ impl Mempool { fn on_new_tip( &mut self, - block_id: Id, + block_id: Id, height: BlockHeight, is_initial_block_download: bool, ) -> Result<(), Error> { @@ -470,7 +470,7 @@ impl Mempool { MempoolState::InIbd(state) => { log::debug!("New tip {block_id:x} at height {height} (InIbd)"); - state.best_block_id = block_id.into(); + state.best_block_id = block_id; } MempoolState::AfterIbd(state) => { log::debug!("New tip {block_id:x} at height {height} (AfterIbd)"); diff --git a/mempool/src/pool/tests/basic.rs b/mempool/src/pool/tests/basic.rs index f2a78bc27c..3deac9f3f7 100644 --- a/mempool/src/pool/tests/basic.rs +++ b/mempool/src/pool/tests/basic.rs @@ -234,7 +234,7 @@ async fn reject_txs_during_ibd(#[case] seed: Seed) { mempool .process_chainstate_event(ChainstateEvent::NewTip { - id: block1_id, + id: block1_id.into(), height: BlockHeight::new(1), is_initial_block_download: true, }) @@ -258,7 +258,7 @@ async fn reject_txs_during_ibd(#[case] seed: Seed) { .unwrap(); mempool .process_chainstate_event(ChainstateEvent::NewTip { - id: block2_id, + id: block2_id.into(), height: BlockHeight::new(2), is_initial_block_download: true, }) @@ -283,7 +283,7 @@ async fn reject_txs_during_ibd(#[case] seed: Seed) { .unwrap(); mempool .process_chainstate_event(ChainstateEvent::NewTip { - id: block3_id, + id: block3_id.into(), height: BlockHeight::new(3), is_initial_block_download: false, }) @@ -346,7 +346,7 @@ async fn ibd_transition(#[case] seed: Seed) { .unwrap(); mempool .process_chainstate_event(ChainstateEvent::NewTip { - id: block_id, + id: block_id.into(), height: block_height, is_initial_block_download: false, }) @@ -366,7 +366,7 @@ async fn ibd_transition(#[case] seed: Seed) { assert!(mempool.contains_transaction(&tx_id)); // Check that the new tip event was sent. - let expected_event = MempoolEvent::NewTip(NewTip::new(block_id, block_height)); + let expected_event = MempoolEvent::NewTip(NewTip::new(block_id.into(), block_height)); let event = events_rx.recv().await; assert_eq!(event.as_ref(), Some(&expected_event)); diff --git a/mempool/src/pool/tx_pool/mod.rs b/mempool/src/pool/tx_pool/mod.rs index 8b24620d0b..6e3c830640 100644 --- a/mempool/src/pool/tx_pool/mod.rs +++ b/mempool/src/pool/tx_pool/mod.rs @@ -40,8 +40,8 @@ use chainstate::{ }; use common::{ chain::{ - block::timestamp::BlockTimestamp, Block, ChainConfig, GenBlock, SignedTransaction, - Transaction, TxInput, + block::timestamp::BlockTimestamp, ChainConfig, GenBlock, SignedTransaction, Transaction, + TxInput, }, primitives::{amount::DisplayAmount, time::Time, Amount, BlockHeight, Id}, time_getter::TimeGetter, @@ -878,7 +878,7 @@ impl TxPool { pub fn reorg( &mut self, - block_id: Id, + block_id: Id, _block_height: BlockHeight, finalizer: impl for<'b> FnMut(TxAdditionOutcome, &'b Self), ) -> Result<(), ReorgError> { diff --git a/mempool/src/pool/tx_pool/reorg.rs b/mempool/src/pool/tx_pool/reorg.rs index 31d3a1da3e..d026b695c8 100644 --- a/mempool/src/pool/tx_pool/reorg.rs +++ b/mempool/src/pool/tx_pool/reorg.rs @@ -110,7 +110,7 @@ impl ReorgData { fn fetch_disconnected_txs( tx_pool: &TxPool, - new_tip: Id, + new_tip: Id, ) -> Result, ReorgError> { let old_tip = tx_pool .tx_verifier @@ -123,13 +123,13 @@ fn fetch_disconnected_txs( tx_pool .blocking_chainstate_handle() - .call(move |c| ReorgData::from_chainstate(c, old_tip, new_tip.into()))? + .call(move |c| ReorgData::from_chainstate(c, old_tip, new_tip))? .map(|data| data.into_disconnected_transactions(now)) } pub fn handle_new_tip( tx_pool: &mut TxPool, - new_tip: Id, + new_tip: Id, finalizer: impl FnMut(TxAdditionOutcome, &TxPool), ) -> Result<(), ReorgError> { tx_pool.rolling_fee_rate.get_mut().set_block_since_last_rolling_fee_bump(true); diff --git a/mempool/src/pool/tx_pool/tests/utils.rs b/mempool/src/pool/tx_pool/tests/utils.rs index 1c38e96aed..abbe6d85fc 100644 --- a/mempool/src/pool/tx_pool/tests/utils.rs +++ b/mempool/src/pool/tx_pool/tests/utils.rs @@ -82,7 +82,7 @@ impl TxPool { block_id: Id, block_height: BlockHeight, ) -> Result<(), ReorgError> { - self.reorg(block_id, block_height, |_, _| ()) + self.reorg(block_id.into(), block_height, |_, _| ()) } } diff --git a/mempool/src/rpc_event.rs b/mempool/src/rpc_event.rs index c75e65d51d..e22489329c 100644 --- a/mempool/src/rpc_event.rs +++ b/mempool/src/rpc_event.rs @@ -14,7 +14,7 @@ // limitations under the License. use common::{ - chain::{Block, Transaction}, + chain::{GenBlock, Transaction}, primitives::{BlockHeight, Id}, }; use mempool_types::{tx_options::TxRelayPolicy, tx_origin::LocalTxOrigin}; @@ -26,7 +26,7 @@ use crate::event::MempoolEvent; #[serde(tag = "type", content = "content")] pub enum RpcEvent { NewTip { - id: Id, + id: Id, height: BlockHeight, }, TransactionProcessed { diff --git a/p2p/src/peer_manager_event.rs b/p2p/src/peer_manager_event.rs index 84796bdf20..fc81a94149 100644 --- a/p2p/src/peer_manager_event.rs +++ b/p2p/src/peer_manager_event.rs @@ -16,7 +16,7 @@ use std::time::Duration; use common::{ - chain::{Block, Transaction}, + chain::{Block, GenBlock, Transaction}, primitives::{time::Time, Id}, }; use p2p_types::{bannable_address::BannableAddress, socket_address::SocketAddress}; @@ -86,7 +86,11 @@ pub enum PeerManagerEvent { /// that if NewTipReceived is produced, it will be accompanied by NewChainstateTip. /// However, peer manager should not use this fact and treat them as independent /// events instead. - NewChainstateTip(Id), + /// E.g. it's possible to receive `NewChainstateTip` after a block has been invalidated + /// by the user via the corresponding chainstate RPC call, in which case it will not be + /// accompanied by `NewTipReceived` (and this is also why `NewChainstateTip` contains + /// `Id` - after the invalidation, the genesis may become the new best block). + NewChainstateTip(Id), /// New valid unseen transaction received. /// It is used as an eviction criterion. diff --git a/p2p/src/sync/mod.rs b/p2p/src/sync/mod.rs index c8b71e4b98..a43acb4652 100644 --- a/p2p/src/sync/mod.rs +++ b/p2p/src/sync/mod.rs @@ -33,7 +33,7 @@ use tokio::{ use tracing::Instrument; use common::{ - chain::{config::ChainConfig, Block, Transaction}, + chain::{config::ChainConfig, GenBlock, Transaction}, primitives::Id, time_getter::TimeGetter, }; @@ -58,7 +58,7 @@ use self::chainstate_handle::ChainstateHandle; #[derive(Debug, Clone)] pub enum LocalEvent { - ChainstateNewTip(Id), + ChainstateNewTip(Id), MempoolNewTx(Id), } @@ -273,7 +273,7 @@ where } /// Announces the header of a new block to peers. - async fn handle_new_tip(&mut self, block_id: Id) -> Result<()> { + async fn handle_new_tip(&mut self, block_id: Id) -> Result<()> { self.peer_mgr_event_sender .send(PeerManagerEvent::NewChainstateTip(block_id)) .map_err(|_| P2pError::ChannelClosed)?; @@ -383,7 +383,7 @@ where /// Returns a receiver for the chainstate `NewTip` events. pub async fn subscribe_to_new_tip( chainstate_handle: &ChainstateHandle, -) -> Result>> { +) -> Result>> { let (sender, receiver) = mpsc::unbounded_channel(); let subscribe_func = diff --git a/p2p/src/sync/peer/block_manager.rs b/p2p/src/sync/peer/block_manager.rs index 27c3d77cce..6f2786c260 100644 --- a/p2p/src/sync/peer/block_manager.rs +++ b/p2p/src/sync/peer/block_manager.rs @@ -235,7 +235,7 @@ where self.send_message(BlockSyncMessage::HeaderList(headers)) } - async fn handle_new_tip(&mut self, new_tip_id: &Id) -> Result<()> { + async fn handle_new_tip(&mut self, new_tip_id: &Id) -> Result<()> { // This function is not supposed to be called when in IBD. debug_assert!(!self.chainstate_handle.is_initial_block_download().await?); diff --git a/p2p/src/sync/tests/block_response.rs b/p2p/src/sync/tests/block_response.rs index 931407f83f..a7d5a5b391 100644 --- a/p2p/src/sync/tests/block_response.rs +++ b/p2p/src/sync/tests/block_response.rs @@ -239,7 +239,7 @@ async fn block_responses_in_wrong_order(#[case] seed: Seed) { peer_id: peer.get_id(), block_id: expected_block_id, }, - PeerManagerEventDesc::NewChainstateTip(expected_block_id), + PeerManagerEventDesc::NewChainstateTip(expected_block_id.into()), ] .into_iter(), ), diff --git a/p2p/src/sync/tests/helpers/mod.rs b/p2p/src/sync/tests/helpers/mod.rs index 40cb335e59..142ed310ba 100644 --- a/p2p/src/sync/tests/helpers/mod.rs +++ b/p2p/src/sync/tests/helpers/mod.rs @@ -85,7 +85,7 @@ pub struct TestNode { subsystem_manager_handle: ManagerJoinHandle, chainstate_handle: ChainstateHandle, mempool_handle: MempoolHandle, - new_tip_receiver: UnboundedReceiver>, + new_tip_receiver: UnboundedReceiver>, tx_processed_receiver: UnboundedReceiver, sync_mgr_notification_receiver: UnboundedReceiver, protocol_version: ProtocolVersion, @@ -282,7 +282,7 @@ impl TestNode { expect_future_val!(future) } - pub async fn receive_new_tip_event(&mut self) -> Id { + pub async fn receive_new_tip_event(&mut self) -> Id { expect_recv!(self.new_tip_receiver) } @@ -650,7 +650,7 @@ pub enum PeerManagerEventDesc { peer_id: PeerId, block_id: Id, }, - NewChainstateTip(Id), + NewChainstateTip(Id), NewValidTransactionReceived { peer_id: PeerId, txid: Id, From 6a32ca8ca18093dd79b42a3b6c22284972de5e4e Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Mon, 23 Feb 2026 12:46:40 +0200 Subject: [PATCH 6/6] Apply review comment --- .../src/tests/block_invalidation.rs | 44 +++++++++---------- .../test-suite/src/tests/helpers/mod.rs | 2 +- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/chainstate/test-suite/src/tests/block_invalidation.rs b/chainstate/test-suite/src/tests/block_invalidation.rs index 7d2f02b60c..c887855b21 100644 --- a/chainstate/test-suite/src/tests/block_invalidation.rs +++ b/chainstate/test-suite/src/tests/block_invalidation.rs @@ -119,7 +119,7 @@ fn test_stale_chain_invalidation(#[case] seed: Seed, #[case] sb: StorageBuilder) ); tf.chainstate.wait_for_all_events(); - assert_eq!(&events.pop_all(), &[]); + assert_eq!(&events.take(), &[]); }); } @@ -155,7 +155,7 @@ fn test_basic_tip_invalidation(#[case] seed: Seed, #[case] sb: StorageBuilder) { tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: m0_id.into(), height: BlockHeight::new(1), @@ -169,7 +169,7 @@ fn test_basic_tip_invalidation(#[case] seed: Seed, #[case] sb: StorageBuilder) { tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: m1_id.into(), height: BlockHeight::new(2), @@ -211,7 +211,7 @@ fn test_basic_parent_invalidation(#[case] seed: Seed, #[case] sb: StorageBuilder tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: genesis_id.into(), height: BlockHeight::new(0), @@ -225,7 +225,7 @@ fn test_basic_parent_invalidation(#[case] seed: Seed, #[case] sb: StorageBuilder tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: m1_id.into(), height: BlockHeight::new(2), @@ -405,7 +405,7 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: d[2].into(), height: BlockHeight::new(5), @@ -450,7 +450,7 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { assert_eq!(tf.get_min_height_with_allowed_reorg(), 2.into()); tf.chainstate.wait_for_all_events(); - assert_eq!(&events.pop_all(), &[]); + assert_eq!(&events.take(), &[]); } { @@ -492,7 +492,7 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: c[1].into(), height: BlockHeight::new(4), @@ -541,7 +541,7 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: b[0].into(), height: BlockHeight::new(3), @@ -595,7 +595,7 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { assert_eq!(tf.get_min_height_with_allowed_reorg(), 2.into()); tf.chainstate.wait_for_all_events(); - assert_eq!(&events.pop_all(), &[]); + assert_eq!(&events.take(), &[]); } { @@ -640,7 +640,7 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { assert_eq!(tf.get_min_height_with_allowed_reorg(), 2.into()); tf.chainstate.wait_for_all_events(); - assert_eq!(&events.pop_all(), &[]); + assert_eq!(&events.take(), &[]); } { @@ -691,7 +691,7 @@ fn complex_test_impl(mut tf: TestFramework, block_ids: &TestChainBlockIds) { tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: m[6].into(), height: BlockHeight::new(7), @@ -773,7 +773,7 @@ fn test_tip_invalidation_with_no_better_candidates(#[case] seed: Seed, #[case] s tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: m0_id.into(), height: BlockHeight::new(1), @@ -789,7 +789,7 @@ fn test_tip_invalidation_with_no_better_candidates(#[case] seed: Seed, #[case] s tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: m1_id.into(), height: BlockHeight::new(2), @@ -837,7 +837,7 @@ fn test_invalidation_with_reorg_to_chain_with_bad_tip1(#[case] seed: Seed) { assert_no_block_indices(&tf, &[a1_id]); tf.chainstate.wait_for_all_events(); - assert_eq!(&events.pop_all(), &[]); + assert_eq!(&events.take(), &[]); // For completeness, invalidate m0 and check that the chain reorgs to a0. @@ -852,7 +852,7 @@ fn test_invalidation_with_reorg_to_chain_with_bad_tip1(#[case] seed: Seed) { tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: a0_id.into(), height: BlockHeight::new(1), @@ -908,7 +908,7 @@ fn test_invalidation_with_reorg_to_chain_with_bad_tip2(#[case] seed: Seed) { tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: a0_id.into(), height: BlockHeight::new(1), @@ -989,7 +989,7 @@ fn test_invalidation_with_reorg_attempt_to_chain_with_lower_chain_trust(#[case] assert_no_block_indices(&tf, &[a1_id]); tf.chainstate.wait_for_all_events(); - assert_eq!(&events.pop_all(), &[]); + assert_eq!(&events.take(), &[]); }); } @@ -1054,7 +1054,7 @@ fn test_invalidation_with_reorg_to_chain_with_tip_far_in_the_future(#[case] seed tf.chainstate.wait_for_all_events(); assert_eq!( - &events.pop_all(), + &events.take(), &[ChainstateEvent::NewTip { id: a0_id.into(), height: BlockHeight::new(1), @@ -1113,7 +1113,7 @@ fn test_reset_bad_stale_tip_status_and_add_blocks(#[case] seed: Seed, #[case] sb assert_ok_blocks_at_stage(&tf, &[a0_id, a1_id], BlockValidationStage::CheckBlockOk); tf.chainstate.wait_for_all_events(); - assert_eq!(&events.pop_all(), &[]); + assert_eq!(&events.take(), &[]); let (a2_id, result) = process_block_spend_parent_reward(&mut tf, &a1_id.into(), &mut rng); assert!(result.is_ok()); @@ -1128,7 +1128,7 @@ fn test_reset_bad_stale_tip_status_and_add_blocks(#[case] seed: Seed, #[case] sb ); tf.chainstate.wait_for_all_events(); - assert_eq!(&events.pop_all(), &[]); + assert_eq!(&events.take(), &[]); let (a3_id, result) = process_block(&mut tf, &a2_id.into(), &mut rng); assert!(result.is_err()); @@ -1147,6 +1147,6 @@ fn test_reset_bad_stale_tip_status_and_add_blocks(#[case] seed: Seed, #[case] sb ); tf.chainstate.wait_for_all_events(); - assert_eq!(&events.pop_all(), &[]); + assert_eq!(&events.take(), &[]); }); } diff --git a/chainstate/test-suite/src/tests/helpers/mod.rs b/chainstate/test-suite/src/tests/helpers/mod.rs index a76cc61c60..b8a980925e 100644 --- a/chainstate/test-suite/src/tests/helpers/mod.rs +++ b/chainstate/test-suite/src/tests/helpers/mod.rs @@ -108,7 +108,7 @@ impl EventList { self.0.lock().unwrap().push(event) } - pub fn pop_all(&self) -> Vec { + pub fn take(&self) -> Vec { std::mem::take(self.0.lock().unwrap().deref_mut()) } }