diff --git a/Cargo.lock b/Cargo.lock index f2690d7..be70308 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2043,9 +2043,9 @@ dependencies = [ [[package]] name = "keccak" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" +checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653" dependencies = [ "cpufeatures", ] @@ -2243,9 +2243,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" [[package]] name = "num-integer" @@ -3644,30 +3644,30 @@ dependencies = [ [[package]] name = "time" -version = "0.3.44" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", "itoa", "num-conv", "powerfmt", - "serde", + "serde_core", "time-core", "time-macros", ] [[package]] name = "time-core" -version = "0.1.6" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.24" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" dependencies = [ "num-conv", "time-core", diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 0e1d7b9..8d1a745 
100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -15,6 +15,7 @@ use crate::state::{ /// - A M-notarization for a block, for the current view (it can be proposed by any replica) /// - A L-notarization for a block, for the current view (it can be proposed by any replica) /// - A nullification for a view, for the current view (it can be proposed by any replica) +/// - A block recovery request/response for fetching missing blocks from peers #[derive(Clone, Debug, Archive, Deserialize, Serialize)] pub enum ConsensusMessage { BlockProposal(Block), @@ -22,4 +23,16 @@ pub enum ConsensusMessage { Nullify(Nullify), MNotarization(MNotarization), Nullification(Nullification), + /// Request a missing block by view and expected hash. Sent when a replica has M-notarization + /// for a view but never received the actual block proposal from the leader. + BlockRecoveryRequest { + view: u64, + block_hash: [u8; 32], + }, + /// Response containing the requested block. Sent by a peer that has the block in its + /// non-finalized view chain or finalized storage. + BlockRecoveryResponse { + view: u64, + block: Block, + }, } diff --git a/consensus/src/consensus_manager/events.rs b/consensus/src/consensus_manager/events.rs index b1078e6..301a26e 100644 --- a/consensus/src/consensus_manager/events.rs +++ b/consensus/src/consensus_manager/events.rs @@ -196,4 +196,22 @@ pub enum ViewProgressEvent /// The starting view that triggered the nullification (where conflict was detected) start_view: u64, }, + + /// If the current replica should request a missing block from peers. + /// This happens when a view has received M-notarization (block_hash is known) + /// but the actual block proposal was never received from the leader. + ShouldRequestBlock { + /// The view number for which the block is missing. + view: u64, + /// The expected block hash (from the M-notarization). 
+ block_hash: [u8; blake3::OUT_LEN], + }, + + /// If the current replica should request multiple missing blocks from peers. + /// This is the batch version of `ShouldRequestBlock`, used when multiple views + /// need block recovery simultaneously (e.g., after a node joins late). + ShouldRequestBlocks { + /// The list of (view, block_hash) pairs for which blocks are missing. + requests: Vec<(u64, [u8; blake3::OUT_LEN])>, + }, } diff --git a/consensus/src/consensus_manager/state_machine.rs b/consensus/src/consensus_manager/state_machine.rs index 7503d90..c6c388f 100644 --- a/consensus/src/consensus_manager/state_machine.rs +++ b/consensus/src/consensus_manager/state_machine.rs @@ -321,7 +321,11 @@ impl ConsensusStateMachine< while let Ok(message) = self.message_consumer.pop() { did_work = true; if let Err(e) = self.handle_consensus_message(message) { - slog::error!(self.logger, "Error handling consensus message: {}", e); + slog::warn!(self.logger, "Error handling consensus message: {}", e); + } + // Check shutdown between messages to allow timely exit + if self.shutdown_signal.load(Ordering::Relaxed) { + break; } } @@ -331,7 +335,7 @@ impl ConsensusStateMachine< match self.handle_tick() { Ok(_) => {} Err(e) => { - slog::error!(self.logger, "Error handling tick: {}", e); + slog::warn!(self.logger, "Error handling tick: {}", e); } } last_tick = Instant::now(); @@ -437,7 +441,7 @@ impl ConsensusStateMachine< ViewProgressEvent::ShouldFinalize { view, block_hash } => { self.finalize_view(view, block_hash) } - ViewProgressEvent::ShouldNullify { view } => self.nullify_view(view), + ViewProgressEvent::ShouldNullify { view } => self.nullify_view(view, false), ViewProgressEvent::ShouldBroadcastNullification { view } => { self.broadcast_nullification(view) } @@ -531,7 +535,7 @@ impl ConsensusStateMachine< slog::info!(self.logger, "TODO: Request missing state here"); Ok(()) } - ViewProgressEvent::ShouldNullifyView { view } => self.nullify_view(view), + 
ViewProgressEvent::ShouldNullifyView { view } => self.nullify_view(view, false), ViewProgressEvent::BroadcastConsensusMessage { message } => { self.broadcast_consensus_message(*message) } @@ -548,8 +552,9 @@ impl ConsensusStateMachine< current_view ); - // 1. Broadcast the nullification for the view that triggered the cascade (if - // needed) + // 1. Broadcast the aggregated nullification for the view that triggered the cascade + // (if needed). This lets other nodes learn about the nullification and + // independently handle their own cascades. if should_broadcast_nullification && let Err(e) = self.broadcast_nullification(start_view) { @@ -561,24 +566,37 @@ impl ConsensusStateMachine< ); } - // 2. Nullify all views from start_view to current_view - for view in (start_view + 1)..=current_view { - if let Err(e) = self.nullify_view(view) { - slog::debug!( - self.logger, - "View {} already nullified or error during cascade: {}", - view, - e - ); - } + // 2. Per paper Algorithm 1 Step 8: do NOT nullify intermediate views. Intermediate + // views between start_view and current_view were already processed through + // normal consensus flow (M-notarized or nullified). Marking them as nullified + // would corrupt their state and cause SelectParent to return inconsistent + // results across nodes, breaking chain integrity. Only the current view should + // be nullified. + + // 3. Send a single Nullify for the current view (per paper Algorithm 1, Step 8). + // This broadcasts our vote AND marks it locally as nullified. + if let Err(e) = self.nullify_view(current_view, true) { + slog::debug!( + self.logger, + "Failed to nullify current view {} during cascade: {}", + current_view, + e + ); } - // 2. Progress to a new fresh view - let new_view = current_view + 1; + // 4. Remove stale pending state diffs from start_view onward. Intermediate + // M-notarized views may have StateDiffs with nonce increments that are no longer + // valid after the cascade. 
All correct nodes will eventually cascade from the + // same start_view (2f+1 guarantee), so this converges to consistent pending + // state. + self.view_manager + .rollback_pending_diffs_in_range(start_view, current_view); - // 3. Find the most recent valid parent (skips all nullified views) - let parent_hash = self.view_manager.select_parent(new_view); - let leader = self.view_manager.leader_for_view(new_view)?; + // 5. Create a new view context and progress to it. Unlike normal nullification flow + // (which requires aggregated proof), cascade progression only requires local + // nullification of current view. + let new_view = current_view + 1; + let (leader, parent_hash) = self.view_manager.progress_after_cascade(new_view)?; slog::info!( self.logger, @@ -587,7 +605,7 @@ impl ConsensusStateMachine< parent_hash ); - // 4. Progress to the new view + // 6. Replay buffered messages and propose if leader self.progress_to_next_view(new_view, leader, parent_hash)?; Ok(()) @@ -602,21 +620,60 @@ impl ConsensusStateMachine< current_view ); - // Send nullify messages for all views from start_view to current_view (INCLUSIVE) - // This marks them as has_nullified locally and broadcasts nullify votes, - // but does NOT progress to a new view - we must wait for a Nullification - // (aggregated proof with 2F+1 signatures) before we can safely progress. - for view in start_view..=current_view { - if let Err(e) = self.nullify_view(view) { - slog::debug!( + // Per paper: do NOT nullify intermediate views between start_view and + // current_view. They were already processed through normal consensus flow. + // Marking them as nullified would corrupt their M-notarization state and + // cause SelectParent inconsistencies across nodes. 
+ + // Broadcast nullify for current view only + if let Err(e) = self.nullify_view(current_view, true) { + slog::debug!( + self.logger, + "Failed to nullify current view {}: {}", + current_view, + e + ); + } + + // Remove stale pending state diffs from start_view onward + // (same rationale as ShouldCascadeNullification above). + self.view_manager + .rollback_pending_diffs_in_range(start_view, current_view); + + Ok(()) + } + ViewProgressEvent::ShouldRequestBlock { view, block_hash } => { + slog::info!( + self.logger, + "Requesting missing block from peers"; + "view" => view, + ); + self.broadcast_consensus_message(ConsensusMessage::BlockRecoveryRequest { + view, + block_hash, + }) + } + ViewProgressEvent::ShouldRequestBlocks { requests } => { + slog::info!( + self.logger, + "Requesting {} missing blocks from peers", + requests.len(), + ); + for (view, block_hash) in requests { + if let Err(e) = + self.broadcast_consensus_message(ConsensusMessage::BlockRecoveryRequest { + view, + block_hash, + }) + { + slog::warn!( self.logger, - "View {} already nullified or error: {}", + "Failed to request block for view {}: {}", view, e ); } } - Ok(()) } } @@ -806,6 +863,14 @@ impl ConsensusStateMachine< "Finalizing view {view} with block hash {block_hash:?}" ); + // Defer finalization of the current view — removing it from non_finalized_views + // would break invariants (current() would panic on the next tick). + // The deferred finalization loop in progress_to_next_view will pick this up + // once we advance to the next view. 
+ if view == self.view_manager.current_view_number() { + return Ok(()); + } + // Notify mempool to remove those transactions associated with the finalized block let finalized_txs = self.view_manager.get_block_tx_hashes(view)?; let notification = FinalizedNotification { @@ -821,9 +886,15 @@ impl ConsensusStateMachine< Ok(()) } - /// Nullify a view - fn nullify_view(&mut self, view: u64) -> Result<()> { - slog::debug!(self.logger, "Nullifying view {view}"); + /// Nullify a view. + /// + /// # Arguments + /// * `view` - The view number to nullify + /// * `force` - If true, use cascade nullification (bypasses has_voted/evidence checks). Used + /// for ShouldCascadeNullification and ShouldNullifyRange events. If false, use normal + /// nullification logic (timeout or Byzantine based on evidence). + fn nullify_view(&mut self, view: u64, force: bool) -> Result<()> { + slog::debug!(self.logger, "Nullifying view {view} (force: {force})"); // Get view context to create nullify message let view_ctx = self.view_manager.view_context_mut(view)?; @@ -836,13 +907,14 @@ impl ConsensusStateMachine< return Ok(()); } - let nullify = if view_ctx.has_voted { - // NOTE: After voting, the current replica must have conflicting evidence, - // so in this case, the view is under Byzantine behavior. - view_ctx.create_nullify_for_byzantine(&self.secret_key)? - } else { - // NOTE: If the current replica attempts to nullify a message before voting, it could be - // timeout OR Byzantine. We distinguish based on the type of evidence: + let nullify = if force { + // Cascade nullification: bypass has_voted check. + // This is safe because cascade is for pending state consistency — + // Lemma 5.3 guarantees that a nullified parent can never be L-notarized, + // so descendant views' pending state must be cleaned up. + view_ctx.create_nullify_for_cascade(&self.secret_key)? + } else if view_ctx.has_voted { + // After voting, we need conflicting evidence to nullify (Byzantine). 
let num_conflicting_votes = view_ctx.num_invalid_votes; let num_nullify_messages = view_ctx.nullify_messages.len(); let combined_count = num_conflicting_votes + num_nullify_messages; @@ -852,6 +924,18 @@ impl ConsensusStateMachine< // - Conflicting votes > F means equivocation (can't finalize) // - Combined evidence > 2F indicates Byzantine quorum view_ctx.create_nullify_for_byzantine(&self.secret_key)? + } else { + // Voted but no sufficient evidence — cannot nullify + return Ok(()); + } + } else { + // Before voting: could be timeout OR Byzantine + let num_conflicting_votes = view_ctx.num_invalid_votes; + let num_nullify_messages = view_ctx.nullify_messages.len(); + let combined_count = num_conflicting_votes + num_nullify_messages; + + if num_conflicting_votes > F || combined_count > 2 * F { + view_ctx.create_nullify_for_byzantine(&self.secret_key)? } else { // Timeout (no strong evidence of Byzantine behavior) view_ctx.create_nullify_for_timeout(&self.secret_key)? @@ -890,6 +974,49 @@ impl ConsensusStateMachine< "Progressing to next view {view} with leader {leader} and notarized block hash {notarized_block_hash:?}" ); + // Try to finalize any pending views that now have enough votes. + // This handles the case where votes accumulated during previous views + // but finalization was deferred (e.g., missing block, missing parent M-notarization). + // + // Limit per pass to avoid blocking the main event loop for too long — each + // finalization involves storage writes and can take ~30ms. Processing hundreds + // of views here would starve message processing (block recovery requests/responses), + // causing other nodes to wait. Remaining views are finalized on the next call. 
+ const MAX_FINALIZATIONS_PER_PASS: usize = 5; + let mut finalization_count = 0; + while let Some((finalizable_view, block_hash)) = self.view_manager.oldest_finalizable_view() + { + if finalization_count >= MAX_FINALIZATIONS_PER_PASS { + break; + } + slog::info!( + self.logger, + "Attempting deferred finalization for view {} (block: {:?})", + finalizable_view, + block_hash + ); + if let Err(e) = self.finalize_view(finalizable_view, block_hash) { + slog::warn!( + self.logger, + "Deferred finalization failed for view {}: {}", + finalizable_view, + e + ); + break; + } + // If the view is still present after finalize_view returned Ok(()), + // it means finalization was deferred (e.g., ancestor missing block). + // Break to avoid an infinite loop — we'll retry on the next view progression. + if self + .view_manager + .find_view_context(finalizable_view) + .is_some() + { + break; + } + finalization_count += 1; + } + // Replay any buffered messages for this view (or previous ones) let pending_views: Vec = self .pending_messages @@ -900,7 +1027,7 @@ impl ConsensusStateMachine< for pending_view in pending_views { if let Some(messages) = self.pending_messages.remove(&pending_view) { - slog::debug!( + slog::warn!( self.logger, "Replaying {} buffered messages for view {}", messages.len(), @@ -908,7 +1035,7 @@ impl ConsensusStateMachine< ); for msg in messages { if let Err(e) = self.handle_consensus_message(msg) { - slog::error!(self.logger, "Failed to process buffered message: {}", e); + slog::warn!(self.logger, "Failed to process buffered message: {}", e); } } } diff --git a/consensus/src/consensus_manager/view_chain.rs b/consensus/src/consensus_manager/view_chain.rs index 0ad7ef7..38050b6 100644 --- a/consensus/src/consensus_manager/view_chain.rs +++ b/consensus/src/consensus_manager/view_chain.rs @@ -96,6 +96,11 @@ pub struct ViewChain { /// The most recent finalized block hash in the current replica's state machine // TODO: Move this to [`ViewProgressManager`] pub(crate) 
previously_committed_block_hash: [u8; blake3::OUT_LEN], + + /// Views in the canonical chain (ancestor of an L-notarized block) that are missing + /// block data. Populated by `finalize_with_l_notarization` when deferring due to + /// missing ancestor blocks. Drained by `tick()` to trigger proactive recovery. + pub(crate) pending_canonical_recovery: Vec<(u64, [u8; blake3::OUT_LEN])>, } impl ViewChain { @@ -121,6 +126,7 @@ impl ViewChain ViewChain std::ops::RangeInclusive { - let current_view = self.current_view; let least_non_finalized_view = self - .current_view - .saturating_sub(self.non_finalized_count() as u64) - + 1; - least_non_finalized_view..=current_view + .non_finalized_views + .keys() + .min() + .copied() + .unwrap_or(self.current_view); + least_non_finalized_view..=self.current_view } /// Returns the range of view numbers for the non-finalized views up to the given view number. @@ -201,16 +209,178 @@ impl ViewChain Option> { - let current_view = self.current_view; - let upper_bound = view_number.min(current_view); - let least_non_finalized_view = - (self.current_view + 1).saturating_sub(self.non_finalized_count() as u64); + let upper_bound = view_number.min(self.current_view); + let least_non_finalized_view = self.non_finalized_views.keys().min().copied()?; if least_non_finalized_view > upper_bound { return None; } Some(least_non_finalized_view..=upper_bound) } + /// Finds the oldest view that has enough votes for L-notarization (n-f) and a valid + /// parent chain. Used by the finalization retry loop in `progress_to_next_view` to + /// finalize views that were deferred earlier (e.g., missing block or parent M-notarization). 
+ /// + /// # Returns + /// * `Some((view_number, block_hash))` - The oldest finalizable view and its block hash + /// * `None` - No views are ready for finalization + pub fn oldest_finalizable_view(&self) -> Option<(u64, [u8; blake3::OUT_LEN])> { + let mut candidates: Vec = self + .non_finalized_views + .iter() + .filter(|(_, ctx)| { + ctx.votes.len() >= N - F + && ctx.block.is_some() + && ctx.nullification.is_none() + && !ctx.has_nullified + }) + .map(|(v, _)| *v) + .collect(); + + candidates.sort(); + + for view_num in candidates { + // Skip the current view — cannot finalize during active processing + if view_num >= self.current_view { + continue; + } + + let ctx = self.non_finalized_views.get(&view_num).unwrap(); + let block = ctx.block.as_ref().unwrap(); + let parent_hash = block.parent_block_hash(); + let parent_view = self.find_parent_view(&parent_hash); + + let parent_valid = match parent_view { + Some(parent_view_number) => { + let parent_ctx = self.non_finalized_views.get(&parent_view_number).unwrap(); + // Parent must have M-notarization and block. Parent CAN be nullified — + // M-notarization + nullification coexist (n >= 5f+1). This matches + // finalize_with_l_notarization which does not check parent nullification. + parent_ctx.m_notarization.is_some() + && parent_ctx.block.is_some() + // All intermediate views must be nullified + && (parent_view_number + 1..view_num).all(|inter| { + self.non_finalized_views + .get(&inter) + .map(|c| c.nullification.is_some() || c.has_nullified) + .unwrap_or(true) // Already garbage collected + }) + } + None => { + // Parent already finalized — check it matches previously committed hash + parent_hash == self.previously_committed_block_hash + } + }; + + if parent_valid { + return Some((view_num, ctx.block_hash.unwrap())); + } + } + + None + } + + /// Adds a recovered block to a view that has M-notarization but is missing the block. 
+ /// + /// This is called when a peer responds to a `BlockRecoveryRequest` with the actual block. + /// The block is validated by hash comparison against the M-notarization's block_hash + /// (full block validation is skipped since 2f+1 honest validators already validated it). + /// + /// # Returns + /// * `true` if the view also has L-notarization (n-f votes), indicating it's ready to finalize + /// * `false` if the block was added but finalization is not yet possible + pub fn add_recovered_block(&mut self, view: u64, block: Block) -> Result { + let ctx = self + .non_finalized_views + .get_mut(&view) + .ok_or_else(|| anyhow::anyhow!("View {} not in non-finalized views", view))?; + + // Already have a block — no-op + if ctx.block.is_some() { + return Ok(false); + } + + // Verify the block hash matches what M-notarization committed to + let expected_hash = ctx.block_hash.ok_or_else(|| { + anyhow::anyhow!("View {} has no block_hash from M-notarization", view) + })?; + + let actual_hash = block.get_hash(); + if actual_hash != expected_hash { + return Err(anyhow::anyhow!( + "Recovered block hash mismatch for view {}: expected {:?}, got {:?}", + view, + expected_hash, + actual_hash + )); + } + + // Store the block — even for nullified views, the block may be needed + // as a canonical ancestor when an L-notarized descendant finalizes. + ctx.block = Some(block); + + // Per Lemma 5.3, nullified views cannot achieve L-notarization. + // Don't trigger direct finalization — the block will be used during + // ancestor GC when an L-notarized descendant finalizes. + let has_l_notarization = + ctx.votes.len() >= N - F && ctx.nullification.is_none() && !ctx.has_nullified; + + Ok(has_l_notarization) + } + + /// Looks up a block by view number, checking both the non-finalized view chain + /// and finalized storage. Returns `None` if the block is not found in either location. 
+ pub fn get_block_for_view(&self, view: u64) -> Option { + // Check non-finalized view chain first + if let Some(ctx) = self.non_finalized_views.get(&view) + && let Some(ref block) = ctx.block + { + return Some(block.clone()); + } + + // Fall back to finalized storage (block may have been GC'd from memory) + self.persistence_writer + .store() + .get_finalized_block_by_height(view) + .ok() + .flatten() + } + + /// Looks up a block for recovery, searching all storage locations. + /// + /// Unlike [`get_block_for_view`](Self::get_block_for_view) which only checks the + /// non-finalized chain and finalized storage, this also searches nullified and + /// non-finalized block tables using the block hash. This is needed because blocks + /// for views that were GC'd as non-canonical are stored in `NULLIFIED_BLOCKS`, + /// not `FINALIZED_BLOCKS`. + pub fn get_block_for_recovery( + &self, + view: u64, + block_hash: &[u8; blake3::OUT_LEN], + ) -> Option { + // Check non-finalized view chain first + if let Some(ctx) = self.non_finalized_views.get(&view) + && let Some(ref block) = ctx.block + { + return Some(block.clone()); + } + + let store = self.persistence_writer.store(); + + // Check finalized storage + if let Some(block) = store.get_finalized_block_by_height(view).ok().flatten() { + return Some(block); + } + + // Check nullified storage (block may have been GC'd as non-canonical) + if let Some(block) = store.get_nullified_block(block_hash).ok().flatten() { + return Some(block); + } + + // Check non-finalized storage + store.get_non_finalized_block(block_hash).ok().flatten() + } + /// Stores a pre-computed [`StateDiff`] instance for a view. /// /// This method is called when block validation completes and produces a [`StateDiff`] @@ -319,10 +489,11 @@ impl ViewChain ViewChain ViewChain ViewChain, + ) -> Result<()> { + // 1. Check that the next view context is the next view. 
+ if next_view_ctx.view_number != self.current_view + 1 { + return Err(anyhow::anyhow!( + "View number {} is not the next view number {}", + next_view_ctx.view_number, + self.current_view + 1 + )); + } + + // 2. Check that the current view has been locally nullified. + // Unlike progress_with_nullification, we accept has_nullified without + // requiring an aggregated nullification proof. + let current = self.current(); + if !current.has_nullified && current.nullification.is_none() { + return Err(anyhow::anyhow!( + "The current view {} has not been nullified (locally or via proof), but progress_after_cascade was called", + self.current_view + )); + } + + // 3. Update the current view to the next view. + self.current_view = next_view_ctx.view_number; + self.non_finalized_views + .insert(next_view_ctx.view_number, next_view_ctx); + + Ok(()) + } + /// Finalizes a view with L-notarization and performs garbage collection. /// /// L-notarization (n-f votes) is the ONLY mechanism that finalizes blocks and commits them @@ -714,7 +946,7 @@ impl ViewChain Result<()> { - // 1. Check that the view number is not the current view. + // 1. Cannot finalize views from the future. 
if finalized_view > self.current_view { return Err(anyhow::anyhow!( "View number {} is not greater than the current view {}, cannot finalize views in the future", @@ -740,6 +972,15 @@ impl ViewChain ViewChain= finalized_view { return Err(anyhow::anyhow!( "Parent view {} is more recent than the finalized view {}, cannot finalize view {}", @@ -775,10 +1022,11 @@ impl ViewChain ViewChain ViewChain= N - F { - // Has L-notarization too - self.previously_committed_block_hash = ctx.block_hash.unwrap(); - self.persist_l_notarized_view(&ctx, peers)?; - } else { - // Only M-notarized - self.persist_m_notarized_view(&ctx, peers)?; - } - } else if view_number > parent_view_number { - // Intermediate view - must be nullified (already validated above) - self.persist_nullified_view(&ctx, peers)?; + } else if canonical_views.contains(&view_number) { + // Canonical chain view — persist as finalized + if let Some(bh) = ctx.block_hash { + self.previously_committed_block_hash = bh; + } + if ctx.votes.len() >= N - F && ctx.block.is_some() { + self.persist_l_notarized_view(&ctx, peers)?; } else { - // View before parent - could be part of an earlier chain - // These should have been finalized in a previous call or be nullified - if ctx.nullification.is_some() { - self.persist_nullified_view(&ctx, peers)?; - } else if ctx.votes.len() >= N - F { - self.previously_committed_block_hash = ctx.block_hash.unwrap(); - self.persist_l_notarized_view(&ctx, peers)?; - } else { - // M-notarized view from an earlier chain - self.persist_m_notarized_view(&ctx, peers)?; - } + self.persist_m_notarized_view(&ctx, peers)?; } } else { - // Parent not in non-finalized views (already finalized) - // These are intermediate nullified views - if ctx.nullification.is_none() { - return Err(anyhow::anyhow!( - "View {} has no nullification but parent is already finalized", - view_number - )); - } - self.persist_nullified_view(&ctx, peers)?; + // Non-canonical view (dead fork or intermediate nullified view). 
+ // Persist metadata but NOT as a finalized block. + self.persist_nullified_view_or_metadata(&ctx, peers)?; } } @@ -960,9 +1233,9 @@ impl ViewChain ViewChain ViewChain ViewChain ViewChain, + peers: &PeerSet, + ) -> Result<()> { + // If the view has nullification artifacts, use the standard path + if ctx.nullification.is_some() || ctx.has_nullified { + return self.persist_nullified_view(ctx, peers); + } + + // Non-canonical view without nullification — persist metadata only + let view_number = ctx.view_number; + self.persistence_writer.remove_nullified_view(view_number); + + if let Some(ref block) = ctx.block { + self.persistence_writer.put_nullified_block(block)?; + } + if let Some(ref m_notarization) = ctx.m_notarization { + self.persistence_writer.put_m_notarization(m_notarization)?; + } + + let leader = Leader::new(ctx.leader_id, view_number); + self.persistence_writer.put_leader(&leader)?; + + let leader_pk = peers.id_to_public_key.get(&ctx.leader_id).unwrap(); + let view = View::new(view_number, leader_pk.clone(), false, true); + self.persistence_writer.put_view(&view)?; + + for vote in ctx.votes.iter() { + self.persistence_writer.put_vote(vote)?; + } + + Ok(()) + } + /// Persists a nullified view to storage as a failed consensus attempt. /// /// Nullified views represent failed consensus where the replica network collected 2f+1 @@ -1082,9 +1399,9 @@ impl ViewChain Result<()> { let view_number = ctx.view_number; - if ctx.nullification.is_none() { + if ctx.nullification.is_none() && !ctx.has_nullified { return Err(anyhow::anyhow!( - "View number {view_number} has no nullification, but the view has been nullified", + "View number {view_number} has no nullification or local nullify, but persist_nullified_view was called", )); } @@ -1096,9 +1413,12 @@ impl ViewChain ViewChain = None; for (view_num, ctx) in &self.non_finalized_views { - // Skip views that: - // 1. Have a full nullification quorum, OR - // 2. 
Have been locally marked for nullification (has_nullified = true) + // Per paper Algorithm 1: SelectParent returns the greatest M-notarized + // NON-NULLIFIED view v' < new_view. A nullified view — even if M-notarized — + // must be skipped, because its block will never be finalized (Lemma 5.3). + // The pending state diff for nullified views is also removed, so using them + // as parents would create a mismatch between the parent chain and pending state. if ctx.nullification.is_some() || ctx.has_nullified { continue; } @@ -1272,6 +1594,30 @@ impl ViewChain ViewChain Option { + pub(crate) fn find_parent_view(&self, parent_hash: &[u8; blake3::OUT_LEN]) -> Option { for (view_num, ctx) in &self.non_finalized_views { if let Some(hash) = ctx.block_hash && hash == *parent_hash @@ -2077,7 +2423,7 @@ mod tests { let setup = TestSetup::new(N); let leader_id = setup.leader_id(0); let replica_id = setup.replica_id(1); - let parent_hash = [15u8; blake3::OUT_LEN]; + let parent_hash = Block::genesis_hash(); // Create view 1 with M-notarization let mut ctx_v1 = create_view_context_with_votes( @@ -2464,7 +2810,7 @@ mod tests { let setup = TestSetup::new(N); let leader_id = setup.leader_id(0); let replica_id = setup.replica_id(1); - let parent_hash = [21u8; blake3::OUT_LEN]; + let parent_hash = Block::genesis_hash(); let mut ctx_v1 = create_view_context_with_votes( 1, @@ -4636,11 +4982,9 @@ mod tests { // Now verify ALL data was persisted to the database let storage = &view_chain.persistence_writer; - // Check block was persisted (as M-notarized, not finalized) - let stored_block_v1 = storage - .store() - .get_non_finalized_block(&block_hash_v1) - .unwrap(); + // Check block was persisted as finalized (M-notarized parent committed via child's + // L-notarization) + let stored_block_v1 = storage.store().get_finalized_block(&block_hash_v1).unwrap(); assert!(stored_block_v1.is_some()); let stored_block_v1 = stored_block_v1.unwrap(); assert_eq!(stored_block_v1.get_hash(), 
block_hash_v1); @@ -4870,17 +5214,19 @@ mod tests { "Should still select view 2's block as parent" ); - // Now nullify view 2 as well + // Now nullify view 2 as well (locally) if let Some(ctx) = view_chain.non_finalized_views.get_mut(&2) { ctx.has_nullified = true; } - // After nullifying both views: select_parent(3) should fall back to - // previously_committed_block_hash + // Per paper: SelectParent returns the greatest M-notarized NON-NULLIFIED view. + // View 2 is now nullified (has_nullified = true), so it should be skipped. + // View 1 is also nullified, so SelectParent should fall back to genesis. let parent_fallback = view_chain.select_parent(3); assert_eq!( - parent_fallback, view_chain.previously_committed_block_hash, - "Should fall back to previously committed block when all views nullified" + parent_fallback, + Block::genesis_hash(), + "Should fall back to genesis hash when all views are nullified" ); std::fs::remove_dir_all(setup.temp_dir.path()).unwrap(); @@ -4979,11 +5325,14 @@ mod tests { ctx.nullification = Some(nullification_v2); } - // After nullification on both views: should fall back to previously committed + // Per paper: SelectParent returns the greatest M-notarized NON-NULLIFIED view. + // View 2 now has aggregated nullification, so it should be skipped. + // View 1 also has nullification, so fall back to genesis hash. 
let parent_fallback = view_chain.select_parent(3); assert_eq!( - parent_fallback, view_chain.previously_committed_block_hash, - "Should fall back to previously committed block when all views have nullification" + parent_fallback, + Block::genesis_hash(), + "Should fall back to genesis hash when all views are nullified" ); std::fs::remove_dir_all(setup.temp_dir.path()).unwrap(); diff --git a/consensus/src/consensus_manager/view_context.rs b/consensus/src/consensus_manager/view_context.rs index df300b8..f2dc4eb 100644 --- a/consensus/src/consensus_manager/view_context.rs +++ b/consensus/src/consensus_manager/view_context.rs @@ -1072,6 +1072,28 @@ impl ViewContext= 5f+1), but L-notarization (n-f) and nullification (2f+1) cannot. + pub fn create_nullify_for_cascade(&mut self, secret_key: &BlsSecretKey) -> Result<Nullify> { + if self.has_nullified { + return Err(anyhow::anyhow!("Already nullified in this view")); + } + + self.create_nullify_message(secret_key) + } + /// Internal helper to create the actual nullify message fn create_nullify_message(&mut self, secret_key: &BlsSecretKey) -> Result<Nullify> { let message = diff --git a/consensus/src/consensus_manager/view_manager.rs b/consensus/src/consensus_manager/view_manager.rs index bf1c451..26a45e7 100644 --- a/consensus/src/consensus_manager/view_manager.rs +++ b/consensus/src/consensus_manager/view_manager.rs @@ -366,7 +366,7 @@ //! - Validated block processing and StateDiff storage //!
- Pending state timing (StateDiff not in pending until M-notarization) -use std::{str::FromStr, sync::Arc}; +use std::{collections::HashMap, str::FromStr, sync::Arc, time::Instant}; use anyhow::Result; use tracing::instrument; @@ -383,7 +383,7 @@ use crate::{ ShouldMNotarize, ViewContext, }, }, - crypto::aggregated::{BlsPublicKey, BlsSecretKey, BlsSignature, PeerId}, + crypto::aggregated::{BlsPublicKey, BlsSignature, PeerId}, state::{ block::Block, notarizations::{MNotarization, Vote}, @@ -432,6 +432,15 @@ pub struct ViewProgressManager last request time). + /// Prevents flooding the network with repeated requests for the same missing block. + block_recovery_cooldowns: HashMap<u64, Instant>, + + /// Persistent set of canonical views needing block recovery for deferred finalization. + /// Entries persist across ticks until the block is recovered or the view is GC'd. + /// Populated after `finalize_with_l_notarization` defers; drained when blocks arrive. + canonical_recovery_pending: Vec<(u64, [u8; blake3::OUT_LEN])>, + /// Logger for logging events logger: slog::Logger, } @@ -452,7 +461,7 @@ impl ViewProgressManager ViewProgressManager ViewProgressManager config.genesis_accounts.len(), ); - let mut genesis_context: ViewContext = - ViewContext::new(0, genesis_leader, replica_id, [0; blake3::OUT_LEN]); - genesis_context.block = Some(genesis_block.clone()); - genesis_context.block_hash = Some(genesis_block_hash); - genesis_context.has_voted = true; - genesis_context.m_notarization = Some(genesis_m_notarization); - - // 3. Initialize ViewChain starting at View 1 + // Initialize ViewChain starting at View 1. // The parent of View 1 is the Genesis Hash.
let view1_leader = leader_manager.leader_for_view(1)?.peer_id(); @@ -508,6 +501,8 @@ impl ViewProgressManager ViewProgressManager ViewProgressManager Result { - let current_view = self.view_chain.current_view_mut(); - - if current_view.view_number != 0 { - return Err(anyhow::anyhow!("Can only create genesis vote at view 0")); - } - - if current_view.has_voted { - return Err(anyhow::anyhow!("Already voted for genesis")); - } - - let genesis_hash = current_view - .block_hash - .ok_or_else(|| anyhow::anyhow!("Genesis block not set"))?; - - // Create the vote - let signature = secret_key.sign(&genesis_hash); - - let vote = Vote::new( - 0, // view 0 - genesis_hash, - signature, - self.replica_id, - current_view.leader_id, - ); - - // Mark as voted and add our own vote - current_view.has_voted = true; - current_view.votes.insert(vote.clone()); - - Ok(vote) - } - /// Selects the parent block for a new view according to the Minimmit SelectParent function. /// /// This implements the SelectParent(S, v) function from the Minimmit paper (Section 4): @@ -620,6 +583,17 @@ impl ViewProgressManager Option<(u64, [u8; blake3::OUT_LEN])> { + self.view_chain.oldest_finalizable_view() + } + + /// Finds a view context by view number (read-only). + pub fn find_view_context(&self, view_number: u64) -> Option<&ViewContext> { + self.view_chain.find_view_context(view_number) + } + /// Main driver of the state machine replication algorithm. 
/// /// Processes received `ConsensusMessage` and emits appropriate `ViewProgressEvent`s @@ -638,6 +612,12 @@ impl ViewProgressManager { self.handle_nullification(nullification) } + ConsensusMessage::BlockRecoveryRequest { view, block_hash } => { + self.handle_block_recovery_request(view, block_hash) + } + ConsensusMessage::BlockRecoveryResponse { view, block } => { + self.handle_block_recovery_response(view, block) + } } } @@ -650,12 +630,21 @@ impl ViewProgressManager Result> { + // Clean up canonical recovery entries: remove views that have been recovered or GC'd. + self.canonical_recovery_pending.retain(|(view, _)| { + self.view_chain + .find_view_context(*view) + .map(|ctx| ctx.block.is_none()) // Keep if block still missing + .unwrap_or(false) // Remove if view GC'd (finalized or dropped) + }); let current_view = self.view_chain.current(); let view_range = self.view_chain.non_finalized_view_numbers_range(); for view_number in view_range { // Check if this past view has timed out and should be nullified - let view_ctx = self.view_chain.find_view_context(view_number).unwrap(); + let Some(view_ctx) = self.view_chain.find_view_context(view_number) else { + continue; + }; if current_view.view_number == view_ctx.view_number { // NOTE: In the case of the current view, we should prioritize handling leader block @@ -672,10 +661,35 @@ impl ViewProgressManager ViewProgressManager= MAX_BATCH_RECOVERY { + break; + } + + let Some(view_ctx) = self.view_chain.find_view_context(view_number) else { + continue; + }; + + // Skip the current view — it's still actively processing + if view_number >= current_view.view_number { + continue; + } + + // Candidate: has M-notarization and block_hash but no block data + if view_ctx.m_notarization.is_some() + && view_ctx.block.is_none() + && view_ctx.pending_block.is_none() + && !view_ctx.has_nullified + && view_ctx.nullification.is_none() + && let Some(block_hash) = view_ctx.block_hash + { + // Check cooldown: don't re-request within 
500ms + let should_request = self + .block_recovery_cooldowns + .get(&view_number) + .map(|last| last.elapsed() >= std::time::Duration::from_millis(500)) + .unwrap_or(true); + + if should_request { + self.block_recovery_cooldowns + .insert(view_number, Instant::now()); + recovery_requests.push((view_number, block_hash)); + } + } + } + + // Include proactive canonical recovery requests (from deferred finalization). + // These entries persist in canonical_recovery_pending until blocks arrive. + for &(view, block_hash) in &self.canonical_recovery_pending { + if recovery_requests.len() >= MAX_BATCH_RECOVERY { + break; + } + // Avoid duplicates with regular recovery + if recovery_requests.iter().any(|(v, _)| *v == view) { + continue; + } + let should_request = self + .block_recovery_cooldowns + .get(&view) + .map(|last| last.elapsed() >= std::time::Duration::from_millis(500)) + .unwrap_or(true); + if should_request { + self.block_recovery_cooldowns.insert(view, Instant::now()); + recovery_requests.push((view, block_hash)); + } + } + + if recovery_requests.len() == 1 { + let (view, block_hash) = recovery_requests[0]; + return Ok(ViewProgressEvent::ShouldRequestBlock { view, block_hash }); + } else if recovery_requests.len() > 1 { + return Ok(ViewProgressEvent::ShouldRequestBlocks { + requests: recovery_requests, + }); + } + // If no block available yet, await if !current_view.has_voted && !current_view.has_nullified && current_view.block.is_none() { return Ok(ViewProgressEvent::Await); @@ -787,7 +872,23 @@ impl ViewProgressManager Result<()> { self.view_chain - .finalize_with_l_notarization(view, &self.peers) + .finalize_with_l_notarization(view, &self.peers)?; + + // Propagate any canonical recovery requests to the persistent tracking set. + // ViewChain populates pending_canonical_recovery during finalization when + // canonical ancestors are missing blocks. We move them here so they persist + // across ticks until the blocks actually arrive. 
+ for entry in self.view_chain.pending_canonical_recovery.drain(..) { + if !self + .canonical_recovery_pending + .iter() + .any(|(v, _)| *v == entry.0) + { + self.canonical_recovery_pending.push(entry); + } + } + + Ok(()) } /// Marks that the current replica has proposed a block for a view. @@ -817,17 +918,56 @@ impl ViewProgressManager Result<()> { if let Some(ctx) = self.view_chain.find_view_context_mut(view) { ctx.has_nullified = true; - Ok(()) } else { - Err(anyhow::anyhow!( + return Err(anyhow::anyhow!( "Cannot mark nullified for view {} (view not found)", view - )) + )); } + // Remove pending state diff for this view to prevent stale state from + // causing InvalidNonce errors in future transaction validation. + self.view_chain.remove_pending_diff(view); + Ok(()) + } + + /// Removes pending state diffs for views in `[start_view, end_view]` (inclusive). + /// + /// Called during cascade nullification to clean up stale [`StateDiff`] entries + /// from intermediate M-notarized views. See [`ViewChain::rollback_pending_diffs_in_range`]. + pub fn rollback_pending_diffs_in_range(&mut self, start_view: u64, end_view: u64) { + self.view_chain + .rollback_pending_diffs_in_range(start_view, end_view); + } + + /// Computes the leader for a given view using the leader manager directly. + /// + /// Unlike [`leader_for_view`], this does NOT require a view context to exist. + /// Used during cascade nullification when the new view hasn't been created yet. + pub fn compute_leader_for_view(&self, view: u64) -> Result { + Ok(self.leader_manager.leader_for_view(view)?.peer_id()) + } + + /// Progresses to the next view after a cascade nullification. + /// + /// Creates a new view context for `new_view` and advances the view chain. + /// This is used when cascade nullification has locally nullified the current view + /// but no aggregated nullification proof exists yet. + /// + /// # Returns + /// `(leader, parent_hash)` for the new view. 
+ pub fn progress_after_cascade( + &mut self, + new_view: u64, + ) -> Result<(PeerId, [u8; blake3::OUT_LEN])> { + let new_leader = self.leader_manager.leader_for_view(new_view)?.peer_id(); + let parent_hash = self.view_chain.select_parent(new_view); + let new_view_context = ViewContext::new(new_view, new_leader, self.replica_id, parent_hash); + self.view_chain.progress_after_cascade(new_view_context)?; + Ok((new_leader, parent_hash)) } /// Processes a block that has already been validated by the validation service. @@ -1185,12 +1325,23 @@ impl ViewProgressManager ViewProgressManager ViewProgressManager Result> { + // Leader + F backups = F+1 responders. With at most F faulty nodes, + // at least one honest node is guaranteed to respond. + let is_responder = (0..=F).any(|offset| { + self.leader_manager + .leader_for_view(view + offset as u64) + .map(|l| l.peer_id() == self.replica_id) + .unwrap_or(false) + }); + if !is_responder { + return Ok(ViewProgressEvent::NoOp); + } + + // Search all storage locations for the block + if let Some(block) = self.view_chain.get_block_for_recovery(view, &block_hash) { + slog::info!( + self.logger, + "Responding to block recovery request"; + "view" => view, + ); + return Ok(ViewProgressEvent::BroadcastConsensusMessage { + message: Box::new(ConsensusMessage::BlockRecoveryResponse { view, block }), + }); + } + + Ok(ViewProgressEvent::NoOp) + } + + /// Handles a block recovery response containing a recovered block from a peer. + /// + /// Validates the block hash against the M-notarization, adds it to the view chain, + /// and triggers finalization if L-notarization (n-f votes) is available. 
+ fn handle_block_recovery_response( + &mut self, + view: u64, + block: Block, + ) -> Result> { + match self.view_chain.add_recovered_block(view, block.clone()) { + Ok(has_l_notarization) => { + // Clear cooldown tracking for this view + self.block_recovery_cooldowns.remove(&view); + + if has_l_notarization { + let block_hash = self + .view_chain + .find_view_context(view) + .and_then(|ctx| ctx.block_hash) + .ok_or_else(|| anyhow::anyhow!("Block hash missing after recovery"))?; + + slog::info!( + self.logger, + "Recovered block triggers finalization"; + "view" => view, + ); + + return Ok(ViewProgressEvent::ShouldFinalize { view, block_hash }); + } + + slog::info!( + self.logger, + "Block recovered, awaiting L-notarization"; + "view" => view, + ); + + // Check if this block recovery unblocks a deferred finalization + // for an L-notarized descendant. + if let Some((finalizable_view, block_hash)) = + self.view_chain.oldest_finalizable_view() + { + return Ok(ViewProgressEvent::ShouldFinalize { + view: finalizable_view, + block_hash, + }); + } + + Ok(ViewProgressEvent::NoOp) + } + Err(e) => { + // View was GC'd — its chain position is already determined. + // Don't persist the block to finalized store as it bypasses chain validation. + slog::debug!( + self.logger, + "Block recovery: view GC'd, dropping recovered block"; + "view" => view, + "reason" => %e, + ); + self.block_recovery_cooldowns.remove(&view); + Ok(ViewProgressEvent::NoOp) + } + } + } + /// Process pending child blocks recursively until no more can be processed. /// This handles cascading scenarios where processing a pending block causes it to reach /// M-notarization, which then allows its own pending children to be processed. 
@@ -1786,7 +2052,7 @@ mod tests { ) .unwrap(); - assert_eq!(manager.current_view_number(), 0); + assert_eq!(manager.current_view_number(), 1); assert_eq!(manager.non_finalized_count(), 1); assert_eq!(manager.peers.sorted_peer_ids.len(), 6); @@ -6690,12 +6956,14 @@ mod tests { // Mark view 1 as nullified (simulating cascade) manager.mark_nullified(1).unwrap(); - // select_parent should now skip view 1 and return the genesis/previously committed hash + // Per paper: SelectParent returns the greatest M-notarized NON-NULLIFIED view. + // View 1 is now nullified via mark_nullified, so it should be skipped. + // Should fall back to genesis hash. let parent_after = manager.select_parent(3); - let expected_parent = manager.view_chain.previously_committed_block_hash; assert_eq!( - parent_after, expected_parent, - "After nullifying view 1, select_parent should return previously committed block" + parent_after, + Block::genesis_hash(), + "Nullified view should not be used as parent, should fall back to genesis" ); std::fs::remove_file(path).unwrap(); diff --git a/p2p/src/service.rs b/p2p/src/service.rs index f62ea8f..4295bce 100644 --- a/p2p/src/service.rs +++ b/p2p/src/service.rs @@ -507,10 +507,9 @@ where return false; } - _ = context.sleep(recv_timeout) => { - // Continue loop - } - + // Prioritize message reception over sleep to avoid delaying peer discovery. + // When both a message and the sleep timer are ready simultaneously, + // biased select picks the first matching branch. res = receivers.sync.recv() => { if let Ok((sender, msg)) = res && let Ok(p2p_msg) = deserialize_message::(&msg) { match p2p_msg { @@ -535,6 +534,10 @@ where } } + _ = context.sleep(recv_timeout) => { + // No messages received within timeout, continue loop + } + // NOTE: We intentionally do NOT listen on consensus or tx channels during bootstrap. // Consuming messages from those channels would drop them, as they can't be re-sent. 
// The sync channel ping/pong is sufficient for readiness detection. diff --git a/tests/src/e2e_consensus/scenarios.rs b/tests/src/e2e_consensus/scenarios.rs index c795fa8..d080669 100644 --- a/tests/src/e2e_consensus/scenarios.rs +++ b/tests/src/e2e_consensus/scenarios.rs @@ -476,8 +476,8 @@ fn test_multi_node_happy_path() { validators, total_number_peers: N, maximum_number_faulty_peers: F, - bootstrap_timeout_ms: 20_000, // 20 seconds for tests - more time for peers to connect - ping_interval_ms: 200, // Faster ping for quicker discovery + bootstrap_timeout_ms: 20_000, + ping_interval_ms: 200, // Faster ping for quicker discovery ..Default::default() };