From 57682889d91a1b5b4254fb9dfa629d48d7446c7a Mon Sep 17 00:00:00 2001 From: Jorge Antonio Date: Wed, 11 Feb 2026 17:57:23 +0000 Subject: [PATCH 1/6] changes --- .../src/consensus_manager/state_machine.rs | 158 +++++++--- consensus/src/consensus_manager/view_chain.rs | 296 ++++++++++++++---- .../src/consensus_manager/view_context.rs | 22 ++ .../src/consensus_manager/view_manager.rs | 172 +++++----- p2p/src/service.rs | 11 +- tests/src/e2e_consensus/scenarios.rs | 12 +- 6 files changed, 472 insertions(+), 199 deletions(-) diff --git a/consensus/src/consensus_manager/state_machine.rs b/consensus/src/consensus_manager/state_machine.rs index 7503d90..eb75ef3 100644 --- a/consensus/src/consensus_manager/state_machine.rs +++ b/consensus/src/consensus_manager/state_machine.rs @@ -321,7 +321,7 @@ impl ConsensusStateMachine< while let Ok(message) = self.message_consumer.pop() { did_work = true; if let Err(e) = self.handle_consensus_message(message) { - slog::error!(self.logger, "Error handling consensus message: {}", e); + slog::warn!(self.logger, "Error handling consensus message: {}", e); } } @@ -331,7 +331,7 @@ impl ConsensusStateMachine< match self.handle_tick() { Ok(_) => {} Err(e) => { - slog::error!(self.logger, "Error handling tick: {}", e); + slog::warn!(self.logger, "Error handling tick: {}", e); } } last_tick = Instant::now(); @@ -437,7 +437,7 @@ impl ConsensusStateMachine< ViewProgressEvent::ShouldFinalize { view, block_hash } => { self.finalize_view(view, block_hash) } - ViewProgressEvent::ShouldNullify { view } => self.nullify_view(view), + ViewProgressEvent::ShouldNullify { view } => self.nullify_view(view, false), ViewProgressEvent::ShouldBroadcastNullification { view } => { self.broadcast_nullification(view) } @@ -531,7 +531,7 @@ impl ConsensusStateMachine< slog::info!(self.logger, "TODO: Request missing state here"); Ok(()) } - ViewProgressEvent::ShouldNullifyView { view } => self.nullify_view(view), + ViewProgressEvent::ShouldNullifyView { view } => self.nullify_view(view, false), ViewProgressEvent::BroadcastConsensusMessage { message } => { self.broadcast_consensus_message(*message) } @@ -548,8 +548,9 @@ impl ConsensusStateMachine< current_view ); - // 1. Broadcast the nullification for the view that triggered the cascade (if - // needed) + // 1. Broadcast the aggregated nullification for the view that triggered + // the cascade (if needed). This lets other nodes learn about the + // nullification and independently handle their own cascades. if should_broadcast_nullification && let Err(e) = self.broadcast_nullification(start_view) { @@ -561,24 +562,31 @@ impl ConsensusStateMachine< ); } - // 2. Nullify all views from start_view to current_view - for view in (start_view + 1)..=current_view { - if let Err(e) = self.nullify_view(view) { - slog::debug!( - self.logger, - "View {} already nullified or error during cascade: {}", - view, - e - ); - } + // 2. Per paper Algorithm 1 Step 8: do NOT nullify intermediate views. + // Intermediate views between start_view and current_view were already + // processed through normal consensus flow (M-notarized or nullified). + // Marking them as nullified would corrupt their state and cause + // SelectParent to return inconsistent results across nodes, breaking + // chain integrity. Only the current view should be nullified. + + // 3. Send a single Nullify for the current view (per paper Algorithm 1, + // Step 8). This broadcasts our vote AND marks it locally as nullified. + // nullify_view also calls mark_nullified which removes the pending diff. + if let Err(e) = self.nullify_view(current_view, true) { + slog::debug!( + self.logger, + "Failed to nullify current view {} during cascade: {}", + current_view, + e + ); } - // 2. Progress to a new fresh view + // 4. Create a new view context and progress to it. + // Unlike normal nullification flow (which requires aggregated proof), + // cascade progression only requires local nullification of current view. let new_view = current_view + 1; - - // 3. Find the most recent valid parent (skips all nullified views) - let parent_hash = self.view_manager.select_parent(new_view); - let leader = self.view_manager.leader_for_view(new_view)?; + let (leader, parent_hash) = + self.view_manager.progress_after_cascade(new_view)?; slog::info!( self.logger, @@ -587,7 +595,7 @@ impl ConsensusStateMachine< parent_hash ); - // 4. Progress to the new view + // 5. Replay buffered messages and propose if leader self.progress_to_next_view(new_view, leader, parent_hash)?; Ok(()) @@ -602,19 +610,19 @@ impl ConsensusStateMachine< current_view ); - // Send nullify messages for all views from start_view to current_view (INCLUSIVE) - // This marks them as has_nullified locally and broadcasts nullify votes, - // but does NOT progress to a new view - we must wait for a Nullification - // (aggregated proof with 2F+1 signatures) before we can safely progress. - for view in start_view..=current_view { - if let Err(e) = self.nullify_view(view) { - slog::debug!( - self.logger, - "View {} already nullified or error: {}", - view, - e - ); - } + // Per paper: do NOT nullify intermediate views between start_view and + // current_view. They were already processed through normal consensus flow. + // Marking them as nullified would corrupt their M-notarization state and + // cause SelectParent inconsistencies across nodes. + + // Broadcast nullify for current view only + if let Err(e) = self.nullify_view(current_view, true) { + slog::debug!( + self.logger, + "Failed to nullify current view {}: {}", + current_view, + e + ); } Ok(()) @@ -806,6 +814,14 @@ impl ConsensusStateMachine< "Finalizing view {view} with block hash {block_hash:?}" ); + // Defer finalization of the current view — removing it from non_finalized_views + // would break invariants (current() would panic on the next tick). + // The deferred finalization loop in progress_to_next_view will pick this up + // once we advance to the next view. + if view == self.view_manager.current_view_number() { + return Ok(()); + } + // Notify mempool to remove those transactions associated with the finalized block let finalized_txs = self.view_manager.get_block_tx_hashes(view)?; let notification = FinalizedNotification { @@ -821,9 +837,15 @@ impl ConsensusStateMachine< Ok(()) } - /// Nullify a view - fn nullify_view(&mut self, view: u64) -> Result<()> { - slog::debug!(self.logger, "Nullifying view {view}"); + /// Nullify a view. + /// + /// # Arguments + /// * `view` - The view number to nullify + /// * `force` - If true, use cascade nullification (bypasses has_voted/evidence checks). + /// Used for ShouldCascadeNullification and ShouldNullifyRange events. + /// If false, use normal nullification logic (timeout or Byzantine based on evidence). + fn nullify_view(&mut self, view: u64, force: bool) -> Result<()> { + slog::debug!(self.logger, "Nullifying view {view} (force: {force})"); // Get view context to create nullify message let view_ctx = self.view_manager.view_context_mut(view)?; @@ -836,13 +858,14 @@ impl ConsensusStateMachine< return Ok(()); } - let nullify = if view_ctx.has_voted { - // NOTE: After voting, the current replica must have conflicting evidence, - // so in this case, the view is under Byzantine behavior. - view_ctx.create_nullify_for_byzantine(&self.secret_key)? - } else { - // NOTE: If the current replica attempts to nullify a message before voting, it could be - // timeout OR Byzantine. We distinguish based on the type of evidence: + let nullify = if force { + // Cascade nullification: bypass has_voted check. + // This is safe because cascade is for pending state consistency — + // Lemma 5.3 guarantees that a nullified parent can never be L-notarized, + // so descendant views' pending state must be cleaned up. + view_ctx.create_nullify_for_cascade(&self.secret_key)? + } else if view_ctx.has_voted { + // After voting, we need conflicting evidence to nullify (Byzantine). let num_conflicting_votes = view_ctx.num_invalid_votes; let num_nullify_messages = view_ctx.nullify_messages.len(); let combined_count = num_conflicting_votes + num_nullify_messages; @@ -852,6 +875,18 @@ impl ConsensusStateMachine< // - Conflicting votes > F means equivocation (can't finalize) // - Combined evidence > 2F indicates Byzantine quorum view_ctx.create_nullify_for_byzantine(&self.secret_key)? + } else { + // Voted but no sufficient evidence — cannot nullify + return Ok(()); + } + } else { + // Before voting: could be timeout OR Byzantine + let num_conflicting_votes = view_ctx.num_invalid_votes; + let num_nullify_messages = view_ctx.nullify_messages.len(); + let combined_count = num_conflicting_votes + num_nullify_messages; + + if num_conflicting_votes > F || combined_count > 2 * F { + view_ctx.create_nullify_for_byzantine(&self.secret_key)? } else { // Timeout (no strong evidence of Byzantine behavior) view_ctx.create_nullify_for_timeout(&self.secret_key)? @@ -890,6 +925,35 @@ impl ConsensusStateMachine< "Progressing to next view {view} with leader {leader} and notarized block hash {notarized_block_hash:?}" ); + // Try to finalize any pending views that now have enough votes. + // This handles the case where votes accumulated during previous views + // but finalization was deferred (e.g., missing block, missing parent M-notarization). + while let Some((finalizable_view, block_hash)) = + self.view_manager.oldest_finalizable_view() + { + slog::info!( + self.logger, + "Attempting deferred finalization for view {} (block: {:?})", + finalizable_view, + block_hash + ); + if let Err(e) = self.finalize_view(finalizable_view, block_hash) { + slog::warn!( + self.logger, + "Deferred finalization failed for view {}: {}", + finalizable_view, + e + ); + break; + } + // If the view is still present after finalize_view returned Ok(()), + // it means finalization was deferred (e.g., ancestor missing block). + // Break to avoid an infinite loop — we'll retry on the next view progression. + if self.view_manager.find_view_context(finalizable_view).is_some() { + break; + } + } + // Replay any buffered messages for this view (or previous ones) let pending_views: Vec = self .pending_messages @@ -900,7 +964,7 @@ impl ConsensusStateMachine< for pending_view in pending_views { if let Some(messages) = self.pending_messages.remove(&pending_view) { - slog::debug!( + slog::warn!( self.logger, "Replaying {} buffered messages for view {}", messages.len(), @@ -908,7 +972,7 @@ impl ConsensusStateMachine< ); for msg in messages { if let Err(e) = self.handle_consensus_message(msg) { - slog::error!(self.logger, "Failed to process buffered message: {}", e); + slog::warn!(self.logger, "Failed to process buffered message: {}", e); } } } diff --git a/consensus/src/consensus_manager/view_chain.rs b/consensus/src/consensus_manager/view_chain.rs index 0ad7ef7..78f9d32 100644 --- a/consensus/src/consensus_manager/view_chain.rs +++ b/consensus/src/consensus_manager/view_chain.rs @@ -211,6 +211,68 @@ impl ViewChain Option<(u64, [u8; blake3::OUT_LEN])> { + let mut candidates: Vec = self + .non_finalized_views + .iter() + .filter(|(_, ctx)| { + ctx.votes.len() >= N - F + && ctx.block.is_some() + && ctx.nullification.is_none() + && !ctx.has_nullified + }) + .map(|(v, _)| *v) + .collect(); + + candidates.sort(); + + for view_num in candidates { + // Skip the current view — cannot finalize during active processing + if view_num >= self.current_view { + continue; + } + + let ctx = self.non_finalized_views.get(&view_num).unwrap(); + let block = ctx.block.as_ref().unwrap(); + let parent_hash = block.parent_block_hash(); + let parent_view = self.find_parent_view(&parent_hash); + + let parent_valid = match parent_view { + Some(parent_view_number) => { + let parent_ctx = self.non_finalized_views.get(&parent_view_number).unwrap(); + // Parent must have M-notarization and not be nullified + parent_ctx.m_notarization.is_some() + && parent_ctx.nullification.is_none() + && !parent_ctx.has_nullified + // All intermediate views must be nullified + && (parent_view_number + 1..view_num).all(|inter| { + self.non_finalized_views + .get(&inter) + .map(|c| c.nullification.is_some() || c.has_nullified) + .unwrap_or(true) // Already garbage collected + }) + } + None => { + // Parent already finalized — check it matches previously committed hash + parent_hash == self.previously_committed_block_hash + } + }; + + if parent_valid { + return Some((view_num, ctx.block_hash.unwrap())); + } + } + + None + } + /// Stores a pre-computed [`StateDiff`] instance for a view. /// /// This method is called when block validation completes and produces a [`StateDiff`] @@ -319,10 +381,11 @@ impl ViewChain ViewChain ViewChain, + ) -> Result<()> { + // 1. Check that the next view context is the next view. + if next_view_ctx.view_number != self.current_view + 1 { + return Err(anyhow::anyhow!( + "View number {} is not the next view number {}", + next_view_ctx.view_number, + self.current_view + 1 + )); + } + + // 2. Check that the current view has been locally nullified. + // Unlike progress_with_nullification, we accept has_nullified without + // requiring an aggregated nullification proof. + let current = self.current(); + if !current.has_nullified && current.nullification.is_none() { + return Err(anyhow::anyhow!( + "The current view {} has not been nullified (locally or via proof), but progress_after_cascade was called", + self.current_view + )); + } + + // 3. Update the current view to the next view. + self.current_view = next_view_ctx.view_number; + self.non_finalized_views + .insert(next_view_ctx.view_number, next_view_ctx); + + Ok(()) + } + /// Finalizes a view with L-notarization and performs garbage collection. /// /// L-notarization (n-f votes) is the ONLY mechanism that finalizes blocks and commits them @@ -714,7 +830,7 @@ impl ViewChain Result<()> { - // 1. Check that the view number is not the current view. + // 1. Cannot finalize views from the future. if finalized_view > self.current_view { return Err(anyhow::anyhow!( "View number {} is not greater than the current view {}, cannot finalize views in the future", @@ -740,6 +856,15 @@ impl ViewChain ViewChain ViewChain= N - F { - // Has L-notarization too - self.previously_committed_block_hash = ctx.block_hash.unwrap(); + // The parent view — child's L-notarization implies parent is committed. + if let Some(bh) = ctx.block_hash { + self.previously_committed_block_hash = bh; + } + if ctx.votes.len() >= N - F && ctx.block.is_some() { self.persist_l_notarized_view(&ctx, peers)?; } else { - // Only M-notarized + // M-notarized (or L-notarized without block) — persist available data self.persist_m_notarized_view(&ctx, peers)?; } } else if view_number > parent_view_number { // Intermediate view - must be nullified (already validated above) self.persist_nullified_view(&ctx, peers)?; } else { - // View before parent - could be part of an earlier chain - // These should have been finalized in a previous call or be nullified - if ctx.nullification.is_some() { + // View before parent - could be part of an earlier chain. + // M-notarized views are persisted as finalized even if also nullified, + // because their blocks may be referenced as parents in the chain. + // Per Lemma 5.3: M-notarization + nullification can coexist. + if ctx.m_notarization.is_some() && ctx.block.is_some() { + if ctx.votes.len() >= N - F { + if let Some(bh) = ctx.block_hash { + self.previously_committed_block_hash = bh; + } + self.persist_l_notarized_view(&ctx, peers)?; + } else { + self.persist_m_notarized_view(&ctx, peers)?; + } + } else if ctx.nullification.is_some() || ctx.has_nullified { self.persist_nullified_view(&ctx, peers)?; - } else if ctx.votes.len() >= N - F { - self.previously_committed_block_hash = ctx.block_hash.unwrap(); + } else if ctx.votes.len() >= N - F && ctx.block.is_some() { + if let Some(bh) = ctx.block_hash { + self.previously_committed_block_hash = bh; + } self.persist_l_notarized_view(&ctx, peers)?; } else { - // M-notarized view from an earlier chain - self.persist_m_notarized_view(&ctx, peers)?; + // M-notarized without block, or no consensus artifacts + if ctx.m_notarization.is_some() { + self.persist_m_notarized_view(&ctx, peers)?; + } else { + self.persist_nullified_view(&ctx, peers)?; + } } } } else { - // Parent not in non-finalized views (already finalized) - // These are intermediate nullified views - if ctx.nullification.is_none() { - return Err(anyhow::anyhow!( - "View {} has no nullification but parent is already finalized", - view_number - )); + // Parent not in non-finalized views (already finalized). + // Same priority: M-notarized views with blocks are persisted as finalized. + if ctx.m_notarization.is_some() && ctx.block.is_some() { + if ctx.votes.len() >= N - F { + if let Some(bh) = ctx.block_hash { + self.previously_committed_block_hash = bh; + } + self.persist_l_notarized_view(&ctx, peers)?; + } else { + self.persist_m_notarized_view(&ctx, peers)?; + } + } else if ctx.nullification.is_some() || ctx.has_nullified { + self.persist_nullified_view(&ctx, peers)?; + } else if ctx.votes.len() >= N - F && ctx.block.is_some() { + if let Some(bh) = ctx.block_hash { + self.previously_committed_block_hash = bh; + } + self.persist_l_notarized_view(&ctx, peers)?; + } else if ctx.m_notarization.is_some() { + self.persist_m_notarized_view(&ctx, peers)?; + } else { + // View with no consensus artifacts (e.g., node never received any messages + // for this view). Persist as nullified to clear it from the chain. + self.persist_nullified_view(&ctx, peers)?; } - self.persist_nullified_view(&ctx, peers)?; } } @@ -1013,16 +1165,19 @@ impl ViewChain ViewChain ViewChain Result<()> { let view_number = ctx.view_number; - if ctx.nullification.is_none() { + if ctx.nullification.is_none() && !ctx.has_nullified { return Err(anyhow::anyhow!( - "View number {view_number} has no nullification, but the view has been nullified", + "View number {view_number} has no nullification or local nullify, but persist_nullified_view was called", )); } @@ -1096,9 +1251,12 @@ impl ViewChain ViewChain = None; for (view_num, ctx) in &self.non_finalized_views { - // Skip views that: - // 1. Have a full nullification quorum, OR - // 2. Have been locally marked for nullification (has_nullified = true) - if ctx.nullification.is_some() || ctx.has_nullified { + // Per paper: SelectParent returns the greatest v' < new_view with M-notarization. + // M-notarization is a permanent fact that cannot be invalidated by nullification + // (Lemma 5.3 only forbids L-notarization + nullification coexistence). + // Skip views that are nullified WITHOUT M-notarization (pure nullification). + // Views with both M-notarization and nullification/has_nullified remain valid + // parents to ensure SelectParent consistency across all nodes. + if (ctx.nullification.is_some() || ctx.has_nullified) + && ctx.m_notarization.is_none() + { continue; } @@ -1272,6 +1435,15 @@ impl ViewChain ViewContext= 5f+1), but L-notarization (n-f) and nullification (2f+1) cannot. + pub fn create_nullify_for_cascade(&mut self, secret_key: &BlsSecretKey) -> Result { + if self.has_nullified { + return Err(anyhow::anyhow!("Already nullified in this view")); + } + + self.create_nullify_message(secret_key) + } + /// Internal helper to create the actual nullify message fn create_nullify_message(&mut self, secret_key: &BlsSecretKey) -> Result { let message = diff --git a/consensus/src/consensus_manager/view_manager.rs b/consensus/src/consensus_manager/view_manager.rs index bf1c451..d888aec 100644 --- a/consensus/src/consensus_manager/view_manager.rs +++ b/consensus/src/consensus_manager/view_manager.rs @@ -383,7 +383,7 @@ use crate::{ ShouldMNotarize, ViewContext, }, }, - crypto::aggregated::{BlsPublicKey, BlsSecretKey, BlsSignature, PeerId}, + crypto::aggregated::{BlsPublicKey, BlsSignature, PeerId}, state::{ block::Block, notarizations::{MNotarization, Vote}, @@ -452,7 +452,7 @@ impl ViewProgressManager ViewProgressManager ViewProgressManager config.genesis_accounts.len(), ); - let mut genesis_context: ViewContext = - ViewContext::new(0, genesis_leader, replica_id, [0; blake3::OUT_LEN]); - genesis_context.block = Some(genesis_block.clone()); - genesis_context.block_hash = Some(genesis_block_hash); - genesis_context.has_voted = true; - genesis_context.m_notarization = Some(genesis_m_notarization); - - // 3. Initialize ViewChain starting at View 1 + // Initialize ViewChain starting at View 1. // The parent of View 1 is the Genesis Hash. let view1_leader = leader_manager.leader_for_view(1)?.peer_id(); @@ -548,8 +532,8 @@ impl ViewProgressManager ViewProgressManager Result { - let current_view = self.view_chain.current_view_mut(); - - if current_view.view_number != 0 { - return Err(anyhow::anyhow!("Can only create genesis vote at view 0")); - } - - if current_view.has_voted { - return Err(anyhow::anyhow!("Already voted for genesis")); - } - - let genesis_hash = current_view - .block_hash - .ok_or_else(|| anyhow::anyhow!("Genesis block not set"))?; - - // Create the vote - let signature = secret_key.sign(&genesis_hash); - - let vote = Vote::new( - 0, // view 0 - genesis_hash, - signature, - self.replica_id, - current_view.leader_id, - ); - - // Mark as voted and add our own vote - current_view.has_voted = true; - current_view.votes.insert(vote.clone()); - - Ok(vote) - } - /// Selects the parent block for a new view according to the Minimmit SelectParent function. /// /// This implements the SelectParent(S, v) function from the Minimmit paper (Section 4): @@ -620,6 +570,17 @@ impl ViewProgressManager Option<(u64, [u8; blake3::OUT_LEN])> { + self.view_chain.oldest_finalizable_view() + } + + /// Finds a view context by view number (read-only). + pub fn find_view_context(&self, view_number: u64) -> Option<&ViewContext> { + self.view_chain.find_view_context(view_number) + } + /// Main driver of the state machine replication algorithm. /// /// Processes received `ConsensusMessage` and emits appropriate `ViewProgressEvent`s @@ -655,7 +616,9 @@ impl ViewProgressManager ViewProgressManager Result<()> { if let Some(ctx) = self.view_chain.find_view_context_mut(view) { ctx.has_nullified = true; - Ok(()) } else { - Err(anyhow::anyhow!( + return Err(anyhow::anyhow!( "Cannot mark nullified for view {} (view not found)", view - )) + )); } + // Remove pending state diff for this view to prevent stale state from + // causing InvalidNonce errors in future transaction validation. + self.view_chain.remove_pending_diff(view); + Ok(()) + } + + /// Computes the leader for a given view using the leader manager directly. + /// + /// Unlike [`leader_for_view`], this does NOT require a view context to exist. + /// Used during cascade nullification when the new view hasn't been created yet. + pub fn compute_leader_for_view(&self, view: u64) -> Result { + Ok(self.leader_manager.leader_for_view(view)?.peer_id()) + } + + /// Progresses to the next view after a cascade nullification. + /// + /// Creates a new view context for `new_view` and advances the view chain. + /// This is used when cascade nullification has locally nullified the current view + /// but no aggregated nullification proof exists yet. + /// + /// # Returns + /// `(leader, parent_hash)` for the new view. + pub fn progress_after_cascade(&mut self, new_view: u64) -> Result<(PeerId, [u8; blake3::OUT_LEN])> { + let new_leader = self.leader_manager.leader_for_view(new_view)?.peer_id(); + let parent_hash = self.view_chain.select_parent(new_view); + let new_view_context = + ViewContext::new(new_view, new_leader, self.replica_id, parent_hash); + self.view_chain.progress_after_cascade(new_view_context)?; + Ok((new_leader, parent_hash)) } /// Processes a block that has already been validated by the validation service. @@ -1185,12 +1176,22 @@ impl ViewProgressManager ViewProgressManager { - // Continue loop - } - + // Prioritize message reception over sleep to avoid delaying peer discovery. + // When both a message and the sleep timer are ready simultaneously, + // biased select picks the first matching branch. res = receivers.sync.recv() => { if let Ok((sender, msg)) = res && let Ok(p2p_msg) = deserialize_message::(&msg) { match p2p_msg { @@ -535,6 +534,10 @@ where } } + _ = context.sleep(recv_timeout) => { + // No messages received within timeout, continue loop + } + // NOTE: We intentionally do NOT listen on consensus or tx channels during bootstrap. // Consuming messages from those channels would drop them, as they can't be re-sent. // The sync channel ping/pong is sufficient for readiness detection. diff --git a/tests/src/e2e_consensus/scenarios.rs b/tests/src/e2e_consensus/scenarios.rs index c795fa8..a16828c 100644 --- a/tests/src/e2e_consensus/scenarios.rs +++ b/tests/src/e2e_consensus/scenarios.rs @@ -476,7 +476,7 @@ fn test_multi_node_happy_path() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 20_000, // 20 seconds for tests - more time for peers to connect + bootstrap_timeout_ms: 30_000, // 30 seconds - match production default for reliable bootstrap ping_interval_ms: 200, // Faster ping for quicker discovery ..Default::default() }; @@ -867,7 +867,7 @@ fn test_multi_node_continuous_load() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 20_000, + bootstrap_timeout_ms: 30_000, ping_interval_ms: 200, ..Default::default() }; @@ -1131,7 +1131,7 @@ fn test_multi_node_crashed_replica() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 20_000, + bootstrap_timeout_ms: 30_000, ping_interval_ms: 200, ..Default::default() }; @@ -1420,7 +1420,7 @@ fn test_multi_node_equivocating_leader() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 20_000, + bootstrap_timeout_ms: 30_000, ping_interval_ms: 200, ..Default::default() }); @@ -1770,7 +1770,7 @@ fn test_multi_node_invalid_tx_rejection() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 20_000, + bootstrap_timeout_ms: 30_000, ..Default::default() }); } @@ -2008,7 +2008,7 @@ fn test_multi_node_invalid_block_from_leader() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 20_000, + bootstrap_timeout_ms: 30_000, ..Default::default() }); } From b53292cddec881221d7bfb7ed10a54ea78b8360e Mon Sep 17 00:00:00 2001 From: Jorge Antonio Date: Thu, 12 Feb 2026 12:57:18 +0000 Subject: [PATCH 2/6] consensus fixes and block synchronization --- consensus/src/consensus.rs | 7 + consensus/src/consensus_manager/events.rs | 10 + .../src/consensus_manager/state_machine.rs | 15 +- consensus/src/consensus_manager/view_chain.rs | 194 ++++++++++++++---- .../src/consensus_manager/view_manager.rs | 153 +++++++++++++- tests/src/e2e_consensus/scenarios.rs | 2 +- 6 files changed, 337 insertions(+), 44 deletions(-) diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 0e1d7b9..891b981 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -15,6 +15,7 @@ use crate::state::{ /// - A M-notarization for a block, for the current view (it can be proposed by any replica) /// - A L-notarization for a block, for the current view (it can be proposed by any replica) /// - A nullification for a view, for the current view (it can be proposed by any replica) +/// - A block recovery request/response for fetching missing blocks from peers #[derive(Clone, Debug, Archive, Deserialize, Serialize)] pub enum ConsensusMessage { BlockProposal(Block), @@ -22,4 +23,10 @@ pub enum ConsensusMessage { Nullify(Nullify), MNotarization(MNotarization), Nullification(Nullification), + /// Request a missing block by view and expected hash. Sent when a replica has M-notarization + /// for a view but never received the actual block proposal from the leader. + BlockRecoveryRequest { view: u64, block_hash: [u8; 32] }, + /// Response containing the requested block. Sent by a peer that has the block in its + /// non-finalized view chain or finalized storage. + BlockRecoveryResponse { view: u64, block: Block }, } diff --git a/consensus/src/consensus_manager/events.rs b/consensus/src/consensus_manager/events.rs index b1078e6..7163379 100644 --- a/consensus/src/consensus_manager/events.rs +++ b/consensus/src/consensus_manager/events.rs @@ -196,4 +196,14 @@ pub enum ViewProgressEvent /// The starting view that triggered the nullification (where conflict was detected) start_view: u64, }, + + /// If the current replica should request a missing block from peers. + /// This happens when a view has received M-notarization (block_hash is known) + /// but the actual block proposal was never received from the leader. + ShouldRequestBlock { + /// The view number for which the block is missing. + view: u64, + /// The expected block hash (from the M-notarization). + block_hash: [u8; blake3::OUT_LEN], + }, } diff --git a/consensus/src/consensus_manager/state_machine.rs b/consensus/src/consensus_manager/state_machine.rs index eb75ef3..de4c28b 100644 --- a/consensus/src/consensus_manager/state_machine.rs +++ b/consensus/src/consensus_manager/state_machine.rs @@ -568,10 +568,13 @@ impl ConsensusStateMachine< // Marking them as nullified would corrupt their state and cause // SelectParent to return inconsistent results across nodes, breaking // chain integrity. Only the current view should be nullified. + // NOTE: We also do NOT remove pending diffs for intermediate views here. + // Those diffs are consistently present across all nodes and removing them + // would create state divergence (this node removes diffs that other nodes + // still have, causing InvalidNonce when this node proposes). // 3. Send a single Nullify for the current view (per paper Algorithm 1, // Step 8). This broadcasts our vote AND marks it locally as nullified. - // nullify_view also calls mark_nullified which removes the pending diff. if let Err(e) = self.nullify_view(current_view, true) { slog::debug!( self.logger, @@ -627,6 +630,16 @@ impl ConsensusStateMachine< Ok(()) } + ViewProgressEvent::ShouldRequestBlock { view, block_hash } => { + slog::info!( + self.logger, + "Requesting missing block from peers"; + "view" => view, + ); + self.broadcast_consensus_message( + ConsensusMessage::BlockRecoveryRequest { view, block_hash }, + ) + } } } diff --git a/consensus/src/consensus_manager/view_chain.rs b/consensus/src/consensus_manager/view_chain.rs index 78f9d32..881b574 100644 --- a/consensus/src/consensus_manager/view_chain.rs +++ b/consensus/src/consensus_manager/view_chain.rs @@ -178,14 +178,16 @@ impl ViewChain std::ops::RangeInclusive { - let current_view = self.current_view; let least_non_finalized_view = self - .current_view - .saturating_sub(self.non_finalized_count() as u64) - + 1; - least_non_finalized_view..=current_view + .non_finalized_views + .keys() + .min() + .copied() + .unwrap_or(self.current_view); + least_non_finalized_view..=self.current_view } /// Returns the range of view numbers for the non-finalized views up to the given view number. @@ -201,10 +203,8 @@ impl ViewChain Option> { - let current_view = self.current_view; - let upper_bound = view_number.min(current_view); - let least_non_finalized_view = - (self.current_view + 1).saturating_sub(self.non_finalized_count() as u64); + let upper_bound = view_number.min(self.current_view); + let least_non_finalized_view = self.non_finalized_views.keys().min().copied()?; if least_non_finalized_view > upper_bound { return None; } @@ -273,6 +273,99 @@ impl ViewChain Result { + let ctx = self + .non_finalized_views + .get_mut(&view) + .ok_or_else(|| anyhow::anyhow!("View {} not in non-finalized views", view))?; + + // Already have a block — no-op + if ctx.block.is_some() { + return Ok(false); + } + + // Skip nullified views — Lemma 5.3 guarantees no L-notarization is possible + if ctx.nullification.is_some() || ctx.has_nullified { + return Ok(false); + } + + // Verify the block hash matches what M-notarization committed to + let expected_hash = ctx.block_hash.ok_or_else(|| { + anyhow::anyhow!("View {} has no block_hash from M-notarization", view) + })?; + + let actual_hash = block.get_hash(); + if actual_hash != expected_hash { + return Err(anyhow::anyhow!( + "Recovered block hash mismatch for view {}: expected {:?}, got {:?}", + view, + expected_hash, + actual_hash + )); + } + + // Store the block + ctx.block = Some(block); + + // Check if L-notarization exists (ready for finalization) + let has_l_notarization = ctx.votes.len() >= N - F; + + Ok(has_l_notarization) + } + + /// Looks up a block by view number, checking both the non-finalized view chain + /// and finalized storage. Returns `None` if the block is not found in either location. + pub fn get_block_for_view(&self, view: u64) -> Option { + // Check non-finalized view chain first + if let Some(ctx) = self.non_finalized_views.get(&view) + && let Some(ref block) = ctx.block + { + return Some(block.clone()); + } + + // Fall back to finalized storage (block may have been GC'd from memory) + self.persistence_writer + .store() + .get_finalized_block_by_height(view) + .ok() + .flatten() + } + + /// Persists a recovered block directly to finalized storage. + /// + /// This is a fallback for the case where a view was already GC'd from `non_finalized_views` + /// before block recovery delivered the block. The view metadata (M-notarization, leader, votes) + /// was already persisted during GC — this method fills in the missing block data. + pub fn persist_recovered_block_to_storage(&self, view: u64, block: Block) -> Result<()> { + // Don't overwrite if we already have block data for this view + if self + .persistence_writer + .store() + .get_finalized_block_by_height(view)? + .is_some() + { + return Ok(()); + } + + for tx in block.transactions.iter() { + self.persistence_writer.put_transaction(tx)?; + } + let mut finalized_block = block; + finalized_block.is_finalized = true; + self.persistence_writer + .put_finalized_block(&finalized_block)?; + Ok(()) + } + /// Stores a pre-computed [`StateDiff`] instance for a view. /// /// This method is called when block validation completes and produces a [`StateDiff`] @@ -431,7 +524,15 @@ impl ViewChain ViewChain ViewChain ViewChain ViewChain = None; for (view_num, ctx) in &self.non_finalized_views { - // Per paper: SelectParent returns the greatest v' < new_view with M-notarization. - // M-notarization is a permanent fact that cannot be invalidated by nullification - // (Lemma 5.3 only forbids L-notarization + nullification coexistence). - // Skip views that are nullified WITHOUT M-notarization (pure nullification). - // Views with both M-notarization and nullification/has_nullified remain valid - // parents to ensure SelectParent consistency across all nodes. - if (ctx.nullification.is_some() || ctx.has_nullified) - && ctx.m_notarization.is_none() - { + // Per paper Algorithm 1: SelectParent returns the greatest M-notarized + // NON-NULLIFIED view v' < new_view. A nullified view — even if M-notarized — + // must be skipped, because its block will never be finalized (Lemma 5.3). + // The pending state diff for nullified views is also removed, so using them + // as parents would create a mismatch between the parent chain and pending state. + if ctx.nullification.is_some() || ctx.has_nullified { continue; } @@ -1444,6 +1557,10 @@ impl ViewChain last request time). + /// Prevents flooding the network with repeated requests for the same missing block. + block_recovery_cooldowns: HashMap, + /// Logger for logging events logger: slog::Logger, } @@ -492,6 +496,7 @@ impl ViewProgressManager ViewProgressManager ViewProgressManager { self.handle_nullification(nullification) } + ConsensusMessage::BlockRecoveryRequest { view, block_hash } => { + self.handle_block_recovery_request(view, block_hash) + } + ConsensusMessage::BlockRecoveryResponse { view, block } => { + self.handle_block_recovery_response(view, block) + } } } @@ -705,6 +717,43 @@ impl ViewProgressManager= current_view.view_number { + continue; + } + + // Candidate: has M-notarization and block_hash but no block data + if view_ctx.m_notarization.is_some() + && view_ctx.block.is_none() + && view_ctx.pending_block.is_none() + && !view_ctx.has_nullified + && view_ctx.nullification.is_none() + && let Some(block_hash) = view_ctx.block_hash + { + // Check cooldown: don't re-request within 500ms + let should_request = self + .block_recovery_cooldowns + .get(&view_number) + .map(|last| last.elapsed() >= std::time::Duration::from_millis(500)) + .unwrap_or(true); + + if should_request { + self.block_recovery_cooldowns + .insert(view_number, Instant::now()); + return Ok(ViewProgressEvent::ShouldRequestBlock { + view: view_number, + block_hash, + }); + } + } + } + // If no block available yet, await if !current_view.has_voted && !current_view.has_nullified && current_view.block.is_none() { return Ok(ViewProgressEvent::Await); @@ -1416,6 +1465,98 @@ impl ViewProgressManager Result> { + // Only the leader for this view should respond to avoid broadcast amplification + let leader = self.leader_manager.leader_for_view(view)?.peer_id(); + if leader != self.replica_id { + return Ok(ViewProgressEvent::NoOp); + } + + // Check both non-finalized view chain and finalized storage + if let Some(block) = self.view_chain.get_block_for_view(view) { + slog::info!( + self.logger, + "Responding to block recovery request (as leader)"; + "view" => view, + ); + return Ok(ViewProgressEvent::BroadcastConsensusMessage { + message: Box::new(ConsensusMessage::BlockRecoveryResponse { + view, + block, + }), + }); + } + + Ok(ViewProgressEvent::NoOp) + } + + /// Handles a block recovery response containing a recovered block from a peer. + /// + /// Validates the block hash against the M-notarization, adds it to the view chain, + /// and triggers finalization if L-notarization (n-f votes) is available. + fn handle_block_recovery_response( + &mut self, + view: u64, + block: Block, + ) -> Result> { + match self.view_chain.add_recovered_block(view, block.clone()) { + Ok(has_l_notarization) => { + // Clear cooldown tracking for this view + self.block_recovery_cooldowns.remove(&view); + + if has_l_notarization { + let block_hash = self + .view_chain + .find_view_context(view) + .and_then(|ctx| ctx.block_hash) + .ok_or_else(|| { + anyhow::anyhow!("Block hash missing after recovery") + })?; + + slog::info!( + self.logger, + "Recovered block triggers finalization"; + "view" => view, + ); + + return Ok(ViewProgressEvent::ShouldFinalize { view, block_hash }); + } + + slog::info!( + self.logger, + "Block recovered, awaiting L-notarization"; + "view" => view, + ); + + Ok(ViewProgressEvent::NoOp) + } + Err(e) => { + // View was likely GC'd before the recovery response arrived. + // Persist the block directly to storage as a fallback — the view + // metadata was already persisted during GC. + slog::info!( + self.logger, + "Block recovery: view not in chain, persisting to storage"; + "view" => view, + "reason" => %e, + ); + self.view_chain + .persist_recovered_block_to_storage(view, block)?; + self.block_recovery_cooldowns.remove(&view); + Ok(ViewProgressEvent::NoOp) + } + } + } + /// Process pending child blocks recursively until no more can be processed. /// This handles cascading scenarios where processing a pending block causes it to reach /// M-notarization, which then allows its own pending children to be processed. @@ -6700,12 +6841,14 @@ mod tests { // Mark view 1 as nullified (simulating cascade) manager.mark_nullified(1).unwrap(); - // Per paper: M-notarization is a permanent fact, not invalidated by local nullification. - // View 1 still has M-notarization, so select_parent should still return its block hash. + // Per paper: SelectParent returns the greatest M-notarized NON-NULLIFIED view. + // View 1 is now nullified via mark_nullified, so it should be skipped. + // Should fall back to genesis hash. let parent_after = manager.select_parent(3); assert_eq!( - parent_after, block_hash_v1, - "M-notarized view should remain a valid parent even after mark_nullified" + parent_after, + Block::genesis_hash(), + "Nullified view should not be used as parent, should fall back to genesis" ); std::fs::remove_file(path).unwrap(); diff --git a/tests/src/e2e_consensus/scenarios.rs b/tests/src/e2e_consensus/scenarios.rs index a16828c..7f79875 100644 --- a/tests/src/e2e_consensus/scenarios.rs +++ b/tests/src/e2e_consensus/scenarios.rs @@ -476,7 +476,7 @@ fn test_multi_node_happy_path() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 30_000, // 30 seconds - match production default for reliable bootstrap + bootstrap_timeout_ms: 30_000, ping_interval_ms: 200, // Faster ping for quicker discovery ..Default::default() }; From 472da803aae0267a37d211cc17286930b9b9f2a9 Mon Sep 17 00:00:00 2001 From: Jorge Antonio Date: Thu, 12 Feb 2026 13:10:21 +0000 Subject: [PATCH 3/6] changes --- tests/src/e2e_consensus/scenarios.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/src/e2e_consensus/scenarios.rs b/tests/src/e2e_consensus/scenarios.rs index 7f79875..d080669 100644 --- a/tests/src/e2e_consensus/scenarios.rs +++ b/tests/src/e2e_consensus/scenarios.rs @@ -476,8 +476,8 @@ fn test_multi_node_happy_path() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 30_000, - ping_interval_ms: 200, // Faster ping for quicker discovery + bootstrap_timeout_ms: 20_000, + ping_interval_ms: 200, // Faster ping for quicker discovery ..Default::default() }; @@ -867,7 +867,7 @@ fn test_multi_node_continuous_load() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 30_000, + bootstrap_timeout_ms: 20_000, ping_interval_ms: 200, ..Default::default() }; @@ -1131,7 +1131,7 @@ fn test_multi_node_crashed_replica() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 30_000, + bootstrap_timeout_ms: 20_000, ping_interval_ms: 200, ..Default::default() }; @@ -1420,7 +1420,7 @@ fn test_multi_node_equivocating_leader() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 30_000, + bootstrap_timeout_ms: 20_000, ping_interval_ms: 200, ..Default::default() }); @@ -1770,7 +1770,7 @@ fn test_multi_node_invalid_tx_rejection() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 30_000, + bootstrap_timeout_ms: 20_000, ..Default::default() }); } @@ -2008,7 +2008,7 @@ fn test_multi_node_invalid_block_from_leader() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 30_000, + bootstrap_timeout_ms: 20_000, ..Default::default() }); } From 2e3633329ba87f3cebffacdcc7d8140ba217c900 Mon Sep 17 00:00:00 2001 From: Jorge Antonio Date: Thu, 12 Feb 2026 19:18:41 +0000 Subject: [PATCH 4/6] further chain integrity fixes --- consensus/src/consensus_manager/events.rs | 8 + .../src/consensus_manager/state_machine.rs | 20 ++ consensus/src/consensus_manager/view_chain.rs | 195 +++++++++--------- .../src/consensus_manager/view_manager.rs | 32 ++- 4 files changed, 144 insertions(+), 111 deletions(-) diff --git a/consensus/src/consensus_manager/events.rs b/consensus/src/consensus_manager/events.rs index 7163379..301a26e 100644 --- a/consensus/src/consensus_manager/events.rs +++ b/consensus/src/consensus_manager/events.rs @@ -206,4 +206,12 @@ pub enum ViewProgressEvent /// The expected block hash (from the M-notarization). block_hash: [u8; blake3::OUT_LEN], }, + + /// If the current replica should request multiple missing blocks from peers. + /// This is the batch version of `ShouldRequestBlock`, used when multiple views + /// need block recovery simultaneously (e.g., after a node joins late). + ShouldRequestBlocks { + /// The list of (view, block_hash) pairs for which blocks are missing. + requests: Vec<(u64, [u8; blake3::OUT_LEN])>, + }, } diff --git a/consensus/src/consensus_manager/state_machine.rs b/consensus/src/consensus_manager/state_machine.rs index de4c28b..96dd761 100644 --- a/consensus/src/consensus_manager/state_machine.rs +++ b/consensus/src/consensus_manager/state_machine.rs @@ -640,6 +640,26 @@ impl ConsensusStateMachine< ConsensusMessage::BlockRecoveryRequest { view, block_hash }, ) } + ViewProgressEvent::ShouldRequestBlocks { requests } => { + slog::info!( + self.logger, + "Requesting {} missing blocks from peers", + requests.len(), + ); + for (view, block_hash) in requests { + if let Err(e) = self.broadcast_consensus_message( + ConsensusMessage::BlockRecoveryRequest { view, block_hash }, + ) { + slog::warn!( + self.logger, + "Failed to request block for view {}: {}", + view, + e + ); + } + } + Ok(()) + } } } diff --git a/consensus/src/consensus_manager/view_chain.rs b/consensus/src/consensus_manager/view_chain.rs index 881b574..ce6feb5 100644 --- a/consensus/src/consensus_manager/view_chain.rs +++ b/consensus/src/consensus_manager/view_chain.rs @@ -247,8 +247,9 @@ impl ViewChain { let parent_ctx = self.non_finalized_views.get(&parent_view_number).unwrap(); - // Parent must have M-notarization and not be nullified + // Parent must have M-notarization, block, and not be nullified parent_ctx.m_notarization.is_some() + && parent_ctx.block.is_some() && parent_ctx.nullification.is_none() && !parent_ctx.has_nullified // All intermediate views must be nullified @@ -340,32 +341,6 @@ impl ViewChain Result<()> { - // Don't overwrite if we already have block data for this view - if self - .persistence_writer - .store() - .get_finalized_block_by_height(view)? - .is_some() - { - return Ok(()); - } - - for tx in block.transactions.iter() { - self.persistence_writer.put_transaction(tx)?; - } - let mut finalized_block = block; - finalized_block.is_finalized = true; - self.persistence_writer - .put_finalized_block(&finalized_block)?; - Ok(()) - } - /// Stores a pre-computed [`StateDiff`] instance for a view. /// /// This method is called when block validation completes and produces a [`StateDiff`] @@ -992,6 +967,12 @@ impl ViewChain= finalized_view { return Err(anyhow::anyhow!( "Parent view {} is more recent than the finalized view {}, cannot finalize view {}", @@ -1015,9 +996,39 @@ impl ViewChain ViewChain ViewChain= N - F && ctx.block.is_some() { - self.persist_l_notarized_view(&ctx, peers)?; - } else { - // M-notarized (or L-notarized without block) — persist available data - self.persist_m_notarized_view(&ctx, peers)?; - } - } else if view_number > parent_view_number { - // Intermediate view - must be nullified (already validated above) - self.persist_nullified_view(&ctx, peers)?; - } else { - // View before parent - could be part of an earlier chain. - // M-notarized views are persisted as finalized even if also nullified, - // because their blocks may be referenced as parents in the chain. - // Per Lemma 5.3: M-notarization + nullification can coexist. - if ctx.m_notarization.is_some() && ctx.block.is_some() { - if ctx.votes.len() >= N - F { - if let Some(bh) = ctx.block_hash { - self.previously_committed_block_hash = bh; - } - self.persist_l_notarized_view(&ctx, peers)?; - } else { - self.persist_m_notarized_view(&ctx, peers)?; - } - } else if ctx.nullification.is_some() || ctx.has_nullified { - self.persist_nullified_view(&ctx, peers)?; - } else if ctx.votes.len() >= N - F && ctx.block.is_some() { - if let Some(bh) = ctx.block_hash { - self.previously_committed_block_hash = bh; - } - self.persist_l_notarized_view(&ctx, peers)?; - } else { - // M-notarized without block, or no consensus artifacts - if ctx.m_notarization.is_some() { - self.persist_m_notarized_view(&ctx, peers)?; - } else { - self.persist_nullified_view(&ctx, peers)?; - } - } + } else if canonical_views.contains(&view_number) { + // Canonical chain view — persist as finalized + if let Some(bh) = ctx.block_hash { + self.previously_committed_block_hash = bh; } - } else { - // Parent not in non-finalized views (already finalized). - // Same priority: M-notarized views with blocks are persisted as finalized. - if ctx.m_notarization.is_some() && ctx.block.is_some() { - if ctx.votes.len() >= N - F { - if let Some(bh) = ctx.block_hash { - self.previously_committed_block_hash = bh; - } - self.persist_l_notarized_view(&ctx, peers)?; - } else { - self.persist_m_notarized_view(&ctx, peers)?; - } - } else if ctx.nullification.is_some() || ctx.has_nullified { - self.persist_nullified_view(&ctx, peers)?; - } else if ctx.votes.len() >= N - F && ctx.block.is_some() { - if let Some(bh) = ctx.block_hash { - self.previously_committed_block_hash = bh; - } + if ctx.votes.len() >= N - F && ctx.block.is_some() { self.persist_l_notarized_view(&ctx, peers)?; - } else if ctx.m_notarization.is_some() { - self.persist_m_notarized_view(&ctx, peers)?; } else { - // View with no consensus artifacts (e.g., node never received any messages - // for this view). Persist as nullified to clear it from the chain. - self.persist_nullified_view(&ctx, peers)?; + self.persist_m_notarized_view(&ctx, peers)?; } + } else { + // Non-canonical view (dead fork or intermediate nullified view). + // Persist metadata but NOT as a finalized block. + self.persist_nullified_view_or_metadata(&ctx, peers)?; } } @@ -1322,6 +1277,46 @@ impl ViewChain, + peers: &PeerSet, + ) -> Result<()> { + // If the view has nullification artifacts, use the standard path + if ctx.nullification.is_some() || ctx.has_nullified { + return self.persist_nullified_view(ctx, peers); + } + + // Non-canonical view without nullification — persist metadata only + let view_number = ctx.view_number; + self.persistence_writer.remove_nullified_view(view_number); + + if let Some(ref block) = ctx.block { + self.persistence_writer.put_nullified_block(block)?; + } + if let Some(ref m_notarization) = ctx.m_notarization { + self.persistence_writer.put_m_notarization(m_notarization)?; + } + + let leader = Leader::new(ctx.leader_id, view_number); + self.persistence_writer.put_leader(&leader)?; + + let leader_pk = peers.id_to_public_key.get(&ctx.leader_id).unwrap(); + let view = View::new(view_number, leader_pk.clone(), false, true); + self.persistence_writer.put_view(&view)?; + + for vote in ctx.votes.iter() { + self.persistence_writer.put_vote(vote)?; + } + + Ok(()) + } + /// Persists a nullified view to storage as a failed consensus attempt. /// /// Nullified views represent failed consensus where the replica network collected 2f+1 @@ -2366,7 +2361,7 @@ mod tests { let setup = TestSetup::new(N); let leader_id = setup.leader_id(0); let replica_id = setup.replica_id(1); - let parent_hash = [15u8; blake3::OUT_LEN]; + let parent_hash = Block::genesis_hash(); // Create view 1 with M-notarization let mut ctx_v1 = create_view_context_with_votes( @@ -2753,7 +2748,7 @@ mod tests { let setup = TestSetup::new(N); let leader_id = setup.leader_id(0); let replica_id = setup.replica_id(1); - let parent_hash = [21u8; blake3::OUT_LEN]; + let parent_hash = Block::genesis_hash(); let mut ctx_v1 = create_view_context_with_votes( 1, diff --git a/consensus/src/consensus_manager/view_manager.rs b/consensus/src/consensus_manager/view_manager.rs index 7787525..fcdedb5 100644 --- a/consensus/src/consensus_manager/view_manager.rs +++ b/consensus/src/consensus_manager/view_manager.rs @@ -718,7 +718,14 @@ impl ViewProgressManager= MAX_BATCH_RECOVERY { + break; + } + let Some(view_ctx) = self.view_chain.find_view_context(view_number) else { continue; }; @@ -746,14 +753,20 @@ impl ViewProgressManager 1 { + return Ok(ViewProgressEvent::ShouldRequestBlocks { + requests: recovery_requests, + }); + } + // If no block available yet, await if !current_view.has_voted && !current_view.has_nullified && current_view.block.is_none() { return Ok(ViewProgressEvent::Await); @@ -1540,17 +1553,14 @@ impl ViewProgressManager { - // View was likely GC'd before the recovery response arrived. - // Persist the block directly to storage as a fallback — the view - // metadata was already persisted during GC. - slog::info!( + // View was GC'd — its chain position is already determined. + // Don't persist the block to finalized store as it bypasses chain validation. + slog::debug!( self.logger, - "Block recovery: view not in chain, persisting to storage"; + "Block recovery: view GC'd, dropping recovered block"; "view" => view, "reason" => %e, ); - self.view_chain - .persist_recovered_block_to_storage(view, block)?; self.block_recovery_cooldowns.remove(&view); Ok(ViewProgressEvent::NoOp) } From 018150b83d6b78f70df130b2375b965387f06d34 Mon Sep 17 00:00:00 2001 From: Jorge Antonio Date: Fri, 13 Feb 2026 15:30:57 +0000 Subject: [PATCH 5/6] fixes for f+1 block sync --- .../src/consensus_manager/state_machine.rs | 36 ++++- consensus/src/consensus_manager/view_chain.rs | 99 ++++++++++--- .../src/consensus_manager/view_manager.rs | 135 ++++++++++++++++-- 3 files changed, 231 insertions(+), 39 deletions(-) diff --git a/consensus/src/consensus_manager/state_machine.rs b/consensus/src/consensus_manager/state_machine.rs index 96dd761..1b718fa 100644 --- a/consensus/src/consensus_manager/state_machine.rs +++ b/consensus/src/consensus_manager/state_machine.rs @@ -323,6 +323,10 @@ impl ConsensusStateMachine< if let Err(e) = self.handle_consensus_message(message) { slog::warn!(self.logger, "Error handling consensus message: {}", e); } + // Check shutdown between messages to allow timely exit + if self.shutdown_signal.load(Ordering::Relaxed) { + break; + } } // Periodic tick @@ -568,10 +572,6 @@ impl ConsensusStateMachine< // Marking them as nullified would corrupt their state and cause // SelectParent to return inconsistent results across nodes, breaking // chain integrity. Only the current view should be nullified. - // NOTE: We also do NOT remove pending diffs for intermediate views here. - // Those diffs are consistently present across all nodes and removing them - // would create state divergence (this node removes diffs that other nodes - // still have, causing InvalidNonce when this node proposes). // 3. Send a single Nullify for the current view (per paper Algorithm 1, // Step 8). This broadcasts our vote AND marks it locally as nullified. @@ -584,7 +584,15 @@ impl ConsensusStateMachine< ); } - // 4. Create a new view context and progress to it. + // 4. Remove stale pending state diffs from start_view onward. + // Intermediate M-notarized views may have StateDiffs with nonce + // increments that are no longer valid after the cascade. All correct + // nodes will eventually cascade from the same start_view (2f+1 + // guarantee), so this converges to consistent pending state. + self.view_manager + .rollback_pending_diffs_in_range(start_view, current_view); + + // 5. Create a new view context and progress to it. // Unlike normal nullification flow (which requires aggregated proof), // cascade progression only requires local nullification of current view. let new_view = current_view + 1; @@ -598,7 +606,7 @@ impl ConsensusStateMachine< parent_hash ); - // 5. Replay buffered messages and propose if leader + // 6. Replay buffered messages and propose if leader self.progress_to_next_view(new_view, leader, parent_hash)?; Ok(()) @@ -628,6 +636,11 @@ impl ConsensusStateMachine< ); } + // Remove stale pending state diffs from start_view onward + // (same rationale as ShouldCascadeNullification above). + self.view_manager + .rollback_pending_diffs_in_range(start_view, current_view); + Ok(()) } ViewProgressEvent::ShouldRequestBlock { view, block_hash } => { @@ -961,9 +974,19 @@ impl ConsensusStateMachine< // Try to finalize any pending views that now have enough votes. // This handles the case where votes accumulated during previous views // but finalization was deferred (e.g., missing block, missing parent M-notarization). + // + // Limit per pass to avoid blocking the main event loop for too long — each + // finalization involves storage writes and can take ~30ms. Processing hundreds + // of views here would starve message processing (block recovery requests/responses), + // causing other nodes to wait. Remaining views are finalized on the next call. + const MAX_FINALIZATIONS_PER_PASS: usize = 5; + let mut finalization_count = 0; while let Some((finalizable_view, block_hash)) = self.view_manager.oldest_finalizable_view() { + if finalization_count >= MAX_FINALIZATIONS_PER_PASS { + break; + } slog::info!( self.logger, "Attempting deferred finalization for view {} (block: {:?})", @@ -985,6 +1008,7 @@ impl ConsensusStateMachine< if self.view_manager.find_view_context(finalizable_view).is_some() { break; } + finalization_count += 1; } // Replay any buffered messages for this view (or previous ones) diff --git a/consensus/src/consensus_manager/view_chain.rs b/consensus/src/consensus_manager/view_chain.rs index ce6feb5..0ff20c0 100644 --- a/consensus/src/consensus_manager/view_chain.rs +++ b/consensus/src/consensus_manager/view_chain.rs @@ -96,6 +96,11 @@ pub struct ViewChain { /// The most recent finalized block hash in the current replica's state machine // TODO: Move this to [`ViewProgressManager`] pub(crate) previously_committed_block_hash: [u8; blake3::OUT_LEN], + + /// Views in the canonical chain (ancestor of an L-notarized block) that are missing + /// block data. Populated by `finalize_with_l_notarization` when deferring due to + /// missing ancestor blocks. Drained by `tick()` to trigger proactive recovery. + pub(crate) pending_canonical_recovery: Vec<(u64, [u8; blake3::OUT_LEN])>, } impl ViewChain { @@ -121,6 +126,7 @@ impl ViewChain ViewChain { let parent_ctx = self.non_finalized_views.get(&parent_view_number).unwrap(); - // Parent must have M-notarization, block, and not be nullified + // Parent must have M-notarization and block. Parent CAN be nullified — + // M-notarization + nullification coexist (n >= 5f+1). This matches + // finalize_with_l_notarization which does not check parent nullification. parent_ctx.m_notarization.is_some() && parent_ctx.block.is_some() - && parent_ctx.nullification.is_none() - && !parent_ctx.has_nullified // All intermediate views must be nullified && (parent_view_number + 1..view_num).all(|inter| { self.non_finalized_views @@ -294,11 +300,6 @@ impl ViewChain ViewChain= N - F; + // Per Lemma 5.3, nullified views cannot achieve L-notarization. + // Don't trigger direct finalization — the block will be used during + // ancestor GC when an L-notarized descendant finalizes. + let has_l_notarization = + ctx.votes.len() >= N - F && ctx.nullification.is_none() && !ctx.has_nullified; Ok(has_l_notarization) } @@ -341,6 +346,41 @@ impl ViewChain Option { + // Check non-finalized view chain first + if let Some(ctx) = self.non_finalized_views.get(&view) + && let Some(ref block) = ctx.block + { + return Some(block.clone()); + } + + let store = self.persistence_writer.store(); + + // Check finalized storage + if let Some(block) = store.get_finalized_block_by_height(view).ok().flatten() { + return Some(block); + } + + // Check nullified storage (block may have been GC'd as non-canonical) + if let Some(block) = store.get_nullified_block(block_hash).ok().flatten() { + return Some(block); + } + + // Check non-finalized storage + store.get_non_finalized_block(block_hash).ok().flatten() + } + /// Stores a pre-computed [`StateDiff`] instance for a view. /// /// This method is called when block validation completes and produces a [`StateDiff`] @@ -1010,6 +1050,7 @@ impl ViewChain ViewChain ViewChain ViewChain ViewChain Option { + pub(crate) fn find_parent_view(&self, parent_hash: &[u8; blake3::OUT_LEN]) -> Option { for (view_num, ctx) in &self.non_finalized_views { if let Some(hash) = ctx.block_hash && hash == *parent_hash diff --git a/consensus/src/consensus_manager/view_manager.rs b/consensus/src/consensus_manager/view_manager.rs index fcdedb5..a855186 100644 --- a/consensus/src/consensus_manager/view_manager.rs +++ b/consensus/src/consensus_manager/view_manager.rs @@ -436,6 +436,11 @@ pub struct ViewProgressManager, + /// Persistent set of canonical views needing block recovery for deferred finalization. + /// Entries persist across ticks until the block is recovered or the view is GC'd. + /// Populated after `finalize_with_l_notarization` defers; drained when blocks arrive. + canonical_recovery_pending: Vec<(u64, [u8; blake3::OUT_LEN])>, + /// Logger for logging events logger: slog::Logger, } @@ -497,6 +502,7 @@ impl ViewProgressManager ViewProgressManager ViewProgressManager Result> { + // Clean up canonical recovery entries: remove views that have been recovered or GC'd. + self.canonical_recovery_pending.retain(|(view, _)| { + self.view_chain + .find_view_context(*view) + .map(|ctx| ctx.block.is_none()) // Keep if block still missing + .unwrap_or(false) // Remove if view GC'd (finalized or dropped) + }); let current_view = self.view_chain.current(); let view_range = self.view_chain.non_finalized_view_numbers_range(); @@ -647,10 +661,36 @@ impl ViewProgressManager ViewProgressManager= MAX_BATCH_RECOVERY { + break; + } + // Avoid duplicates with regular recovery + if recovery_requests.iter().any(|(v, _)| *v == view) { + continue; + } + let should_request = self + .block_recovery_cooldowns + .get(&view) + .map(|last| last.elapsed() >= std::time::Duration::from_millis(500)) + .unwrap_or(true); + if should_request { + self.block_recovery_cooldowns + .insert(view, Instant::now()); + recovery_requests.push((view, block_hash)); + } + } + if recovery_requests.len() == 1 { let (view, block_hash) = recovery_requests[0]; return Ok(ViewProgressEvent::ShouldRequestBlock { view, block_hash }); @@ -812,7 +874,19 @@ impl ViewProgressManager Result<()> { self.view_chain - .finalize_with_l_notarization(view, &self.peers) + .finalize_with_l_notarization(view, &self.peers)?; + + // Propagate any canonical recovery requests to the persistent tracking set. + // ViewChain populates pending_canonical_recovery during finalization when + // canonical ancestors are missing blocks. We move them here so they persist + // across ticks until the blocks actually arrive. + for entry in self.view_chain.pending_canonical_recovery.drain(..) { + if !self.canonical_recovery_pending.iter().any(|(v, _)| *v == entry.0) { + self.canonical_recovery_pending.push(entry); + } + } + + Ok(()) } /// Marks that the current replica has proposed a block for a view. @@ -858,6 +932,15 @@ impl ViewProgressManager ViewProgressManager Result> { - // Only the leader for this view should respond to avoid broadcast amplification - let leader = self.leader_manager.leader_for_view(view)?.peer_id(); - if leader != self.replica_id { + // Leader + F backups = F+1 responders. With at most F faulty nodes, + // at least one honest node is guaranteed to respond. + let is_responder = (0..=F).any(|offset| { + self.leader_manager + .leader_for_view(view + offset as u64) + .map(|l| l.peer_id() == self.replica_id) + .unwrap_or(false) + }); + if !is_responder { return Ok(ViewProgressEvent::NoOp); } - // Check both non-finalized view chain and finalized storage - if let Some(block) = self.view_chain.get_block_for_view(view) { + // Search all storage locations for the block + if let Some(block) = self.view_chain.get_block_for_recovery(view, &block_hash) { slog::info!( self.logger, - "Responding to block recovery request (as leader)"; + "Responding to block recovery request"; "view" => view, ); return Ok(ViewProgressEvent::BroadcastConsensusMessage { @@ -1550,6 +1644,17 @@ impl ViewProgressManager view, ); + // Check if this block recovery unblocks a deferred finalization + // for an L-notarized descendant. + if let Some((finalizable_view, block_hash)) = + self.view_chain.oldest_finalizable_view() + { + return Ok(ViewProgressEvent::ShouldFinalize { + view: finalizable_view, + block_hash, + }); + } + Ok(ViewProgressEvent::NoOp) } Err(e) => { From bd74f70ba28d79f1e85ac8a8ccf905bf45647c68 Mon Sep 17 00:00:00 2001 From: Jorge Antonio Date: Thu, 19 Feb 2026 11:50:37 +0000 Subject: [PATCH 6/6] fixes --- Cargo.lock | 22 +++--- consensus/src/consensus.rs | 10 ++- .../src/consensus_manager/state_machine.rs | 72 ++++++++++--------- consensus/src/consensus_manager/view_chain.rs | 11 ++- .../src/consensus_manager/view_manager.rs | 38 +++++----- 5 files changed, 81 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2690d7..be70308 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2043,9 +2043,9 @@ dependencies = [ [[package]] name = "keccak" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" +checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653" dependencies = [ "cpufeatures", ] @@ -2243,9 +2243,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" [[package]] name = "num-integer" @@ -3644,30 +3644,30 @@ dependencies = [ [[package]] name = "time" -version = "0.3.44" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", "itoa", "num-conv", "powerfmt", - "serde", + "serde_core", "time-core", "time-macros", ] [[package]] name = "time-core" -version = "0.1.6" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.24" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" dependencies = [ "num-conv", "time-core", diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 891b981..8d1a745 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -25,8 +25,14 @@ pub enum ConsensusMessage { Nullification(Nullification), /// Request a missing block by view and expected hash. Sent when a replica has M-notarization /// for a view but never received the actual block proposal from the leader. - BlockRecoveryRequest { view: u64, block_hash: [u8; 32] }, + BlockRecoveryRequest { + view: u64, + block_hash: [u8; 32], + }, /// Response containing the requested block. Sent by a peer that has the block in its /// non-finalized view chain or finalized storage. - BlockRecoveryResponse { view: u64, block: Block }, + BlockRecoveryResponse { + view: u64, + block: Block, + }, } diff --git a/consensus/src/consensus_manager/state_machine.rs b/consensus/src/consensus_manager/state_machine.rs index 1b718fa..c6c388f 100644 --- a/consensus/src/consensus_manager/state_machine.rs +++ b/consensus/src/consensus_manager/state_machine.rs @@ -552,9 +552,9 @@ impl ConsensusStateMachine< current_view ); - // 1. Broadcast the aggregated nullification for the view that triggered - // the cascade (if needed). This lets other nodes learn about the - // nullification and independently handle their own cascades. + // 1. Broadcast the aggregated nullification for the view that triggered the cascade + // (if needed). This lets other nodes learn about the nullification and + // independently handle their own cascades. if should_broadcast_nullification && let Err(e) = self.broadcast_nullification(start_view) { @@ -566,15 +566,15 @@ impl ConsensusStateMachine< ); } - // 2. Per paper Algorithm 1 Step 8: do NOT nullify intermediate views. - // Intermediate views between start_view and current_view were already - // processed through normal consensus flow (M-notarized or nullified). - // Marking them as nullified would corrupt their state and cause - // SelectParent to return inconsistent results across nodes, breaking - // chain integrity. Only the current view should be nullified. + // 2. Per paper Algorithm 1 Step 8: do NOT nullify intermediate views. Intermediate + // views between start_view and current_view were already processed through + // normal consensus flow (M-notarized or nullified). Marking them as nullified + // would corrupt their state and cause SelectParent to return inconsistent + // results across nodes, breaking chain integrity. Only the current view should + // be nullified. - // 3. Send a single Nullify for the current view (per paper Algorithm 1, - // Step 8). This broadcasts our vote AND marks it locally as nullified. + // 3. Send a single Nullify for the current view (per paper Algorithm 1, Step 8). + // This broadcasts our vote AND marks it locally as nullified. if let Err(e) = self.nullify_view(current_view, true) { slog::debug!( self.logger, @@ -584,20 +584,19 @@ impl ConsensusStateMachine< ); } - // 4. Remove stale pending state diffs from start_view onward. - // Intermediate M-notarized views may have StateDiffs with nonce - // increments that are no longer valid after the cascade. All correct - // nodes will eventually cascade from the same start_view (2f+1 - // guarantee), so this converges to consistent pending state. + // 4. Remove stale pending state diffs from start_view onward. Intermediate + // M-notarized views may have StateDiffs with nonce increments that are no longer + // valid after the cascade. All correct nodes will eventually cascade from the + // same start_view (2f+1 guarantee), so this converges to consistent pending + // state. self.view_manager .rollback_pending_diffs_in_range(start_view, current_view); - // 5. Create a new view context and progress to it. - // Unlike normal nullification flow (which requires aggregated proof), - // cascade progression only requires local nullification of current view. + // 5. Create a new view context and progress to it. Unlike normal nullification flow + // (which requires aggregated proof), cascade progression only requires local + // nullification of current view. let new_view = current_view + 1; - let (leader, parent_hash) = - self.view_manager.progress_after_cascade(new_view)?; + let (leader, parent_hash) = self.view_manager.progress_after_cascade(new_view)?; slog::info!( self.logger, @@ -649,9 +648,10 @@ impl ConsensusStateMachine< "Requesting missing block from peers"; "view" => view, ); - self.broadcast_consensus_message( - ConsensusMessage::BlockRecoveryRequest { view, block_hash }, - ) + self.broadcast_consensus_message(ConsensusMessage::BlockRecoveryRequest { + view, + block_hash, + }) } ViewProgressEvent::ShouldRequestBlocks { requests } => { slog::info!( @@ -660,9 +660,12 @@ impl ConsensusStateMachine< requests.len(), ); for (view, block_hash) in requests { - if let Err(e) = self.broadcast_consensus_message( - ConsensusMessage::BlockRecoveryRequest { view, block_hash }, - ) { + if let Err(e) = + self.broadcast_consensus_message(ConsensusMessage::BlockRecoveryRequest { + view, + block_hash, + }) + { slog::warn!( self.logger, "Failed to request block for view {}: {}", @@ -887,9 +890,9 @@ impl ConsensusStateMachine< /// /// # Arguments /// * `view` - The view number to nullify - /// * `force` - If true, use cascade nullification (bypasses has_voted/evidence checks). - /// Used for ShouldCascadeNullification and ShouldNullifyRange events. - /// If false, use normal nullification logic (timeout or Byzantine based on evidence). + /// * `force` - If true, use cascade nullification (bypasses has_voted/evidence checks). Used + /// for ShouldCascadeNullification and ShouldNullifyRange events. If false, use normal + /// nullification logic (timeout or Byzantine based on evidence). fn nullify_view(&mut self, view: u64, force: bool) -> Result<()> { slog::debug!(self.logger, "Nullifying view {view} (force: {force})"); @@ -981,8 +984,7 @@ impl ConsensusStateMachine< // causing other nodes to wait. Remaining views are finalized on the next call. const MAX_FINALIZATIONS_PER_PASS: usize = 5; let mut finalization_count = 0; - while let Some((finalizable_view, block_hash)) = - self.view_manager.oldest_finalizable_view() + while let Some((finalizable_view, block_hash)) = self.view_manager.oldest_finalizable_view() { if finalization_count >= MAX_FINALIZATIONS_PER_PASS { break; @@ -1005,7 +1007,11 @@ impl ConsensusStateMachine< // If the view is still present after finalize_view returned Ok(()), // it means finalization was deferred (e.g., ancestor missing block). // Break to avoid an infinite loop — we'll retry on the next view progression. - if self.view_manager.find_view_context(finalizable_view).is_some() { + if self + .view_manager + .find_view_context(finalizable_view) + .is_some() + { break; } finalization_count += 1; diff --git a/consensus/src/consensus_manager/view_chain.rs b/consensus/src/consensus_manager/view_chain.rs index 0ff20c0..38050b6 100644 --- a/consensus/src/consensus_manager/view_chain.rs +++ b/consensus/src/consensus_manager/view_chain.rs @@ -1235,8 +1235,7 @@ impl ViewChain ViewProgressManager ViewProgressManager ViewProgressManager= std::time::Duration::from_millis(500)) .unwrap_or(true); if should_request { - self.block_recovery_cooldowns - .insert(view, Instant::now()); + self.block_recovery_cooldowns.insert(view, Instant::now()); recovery_requests.push((view, block_hash)); } } @@ -881,7 +879,11 @@ impl ViewProgressManager ViewProgressManager Result<(PeerId, [u8; blake3::OUT_LEN])> { + pub fn progress_after_cascade( + &mut self, + new_view: u64, + ) -> Result<(PeerId, [u8; blake3::OUT_LEN])> { let new_leader = self.leader_manager.leader_for_view(new_view)?.peer_id(); let parent_hash = self.view_chain.select_parent(new_view); - let new_view_context = - ViewContext::new(new_view, new_leader, self.replica_id, parent_hash); + let new_view_context = ViewContext::new(new_view, new_leader, self.replica_id, parent_hash); self.view_chain.progress_after_cascade(new_view_context)?; Ok((new_leader, parent_hash)) } @@ -1324,7 +1328,8 @@ impl ViewProgressManager ViewProgressManager view, ); return Ok(ViewProgressEvent::BroadcastConsensusMessage { - message: Box::new(ConsensusMessage::BlockRecoveryResponse { - view, - block, - }), + message: Box::new(ConsensusMessage::BlockRecoveryResponse { view, block }), }); } @@ -1625,9 +1627,7 @@ impl ViewProgressManager