From e5c250689738cc54ddc00d07bb4f0bae4105ce13 Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Tue, 19 May 2026 13:27:36 +0900 Subject: [PATCH] daemon,gr: mark stale routes in session_loop under shard lock Previously, MarkStale table events were emitted in run() after session_loop() returned, creating an async gap where the peer's advertise channel was already closed but GR families were not yet marked stale. Move MarkStale dispatch into the existing shard loop in session_loop(), where it executes under the same lock that removes peer_event_tx. This eliminates the window between "stop delivering advertise events" and "mark routes stale," and avoids a second full pass over the shards. Because the stale families are known from negotiated_gr.families, GrOutput::MarkStale is no longer needed; remove it from GrState and update tests accordingly. run() now handles only the timer side-effect (GrOutput::StartTimer) from GrState::process. Assisted-by: Claude Sonnet 4.6 Signed-off-by: FUJITA Tomonori --- daemon/src/event.rs | 76 ++++++++++++++++++++------------------------- daemon/src/gr.rs | 22 +++++-------- 2 files changed, 42 insertions(+), 56 deletions(-) diff --git a/daemon/src/event.rs b/daemon/src/event.rs index e8c2290..87707cb 100644 --- a/daemon/src/event.rs +++ b/daemon/src/event.rs @@ -3874,14 +3874,6 @@ async fn gr_drop_stale_families(tables: &TableHandle, addr: IpAddr, families: &[ } } -async fn gr_restale_families(tables: &TableHandle, addr: IpAddr, families: &[Family]) { - for i in 0..tables.shards.len() { - for &family in families { - tables.event(i, TableEvent::MarkStale(addr, family)).await; - } - } -} - fn collect_delete_families(outputs: &[crate::gr::GrOutput]) -> Vec { outputs .iter() @@ -5037,6 +5029,11 @@ impl PeerSession { if !self.source.is_empty() { let drop_families = families_to_drop_on_disconnect(self.source.keys(), self.negotiated_gr.as_ref()); + let stale_families: Vec = self + .negotiated_gr + .as_ref() + .map(|g| g.families.clone()) + .unwrap_or_default(); let import_policy = self.tables.import_policy.load_full(); let export_policy = self.tables.export_policy.load_full(); let kernel_tx = self.tables.kernel_tx.load_full(); @@ -5054,6 +5051,14 @@ impl PeerSession { export_policy.as_deref(), ); } + for &family in &stale_families { + t.event( + TableEvent::MarkStale(self.remote_addr, family), + kernel_tx.as_deref(), + import_policy.as_deref(), + export_policy.as_deref(), + ); + } let reason = self .shutdown .take() @@ -5086,7 +5091,7 @@ impl PeerSession { // Operate on PeerContext directly via self.context — no global lock needed // for the ctx-only operations. - let (no_sessions, stale_families) = { + let no_sessions = { let mut ctx = self.context.lock().unwrap(); { @@ -5105,8 +5110,12 @@ impl PeerSession { let _ = arb.process(info.role, crate::fsm::Input::Disconnected); } - let stale_families = if let Some(gr) = &info.negotiated_gr { - // GR active: start restart timer, preserve export_map. + if let Some(gr) = &info.negotiated_gr { + // GR active: advance the state machine to start the restart timer + // and preserve export_map for the reconnecting session. + // MarkStale table events were already sent in session_loop() under + // the same shard locks as peer_event_tx.remove(), so no second pass + // over the table is needed here. if let Some(h) = ctx.gr_deferral_timer.take() { h.abort(); } @@ -5118,29 +5127,21 @@ impl PeerSession { families: gr.families.clone(), restart_time: gr.restart_time, }); - let mut stale_families = Vec::new(); for output in &outputs { - match output { - crate::gr::GrOutput::StartTimer(duration) => { - let dur = *duration; - let addr = self.remote_addr; - let context_c = Arc::clone(&self.context); - let tables_c = tables.clone(); - let handle = tokio::spawn(async move { - tokio::time::sleep(dur).await; - gr_restart_timer_expired(context_c, tables_c, addr).await; - }) - .abort_handle(); - ctx.gr_restart_timer = Some(handle); - } - crate::gr::GrOutput::MarkStale(families) => { - stale_families.extend_from_slice(families); - } - _ => {} + if let crate::gr::GrOutput::StartTimer(duration) = output { + let dur = *duration; + let addr = self.remote_addr; + let context_c = Arc::clone(&self.context); + let tables_c = tables.clone(); + let handle = tokio::spawn(async move { + tokio::time::sleep(dur).await; + gr_restart_timer_expired(context_c, tables_c, addr).await; + }) + .abort_handle(); + ctx.gr_restart_timer = Some(handle); } } ctx.export_map = info.export_map; - stale_families } else { // Normal disconnect: routes were already dropped in session_loop(). // Clean up any leftover GR state from a previous cycle that never recovered. @@ -5151,22 +5152,13 @@ impl PeerSession { h.abort(); } drop(info.export_map); - Vec::new() - }; + } // Only reset and reconnect when no PeerSession remains for this peer. - let no_sessions = { - let arb = ctx.conn_arbiter.lock().unwrap(); - arb.active_close_tx.is_none() && arb.passive_close_tx.is_none() - }; - (no_sessions, stale_families) + let arb = ctx.conn_arbiter.lock().unwrap(); + arb.active_close_tx.is_none() && arb.passive_close_tx.is_none() }; - // Trigger table re-selection for stale routes outside the context lock. - if !stale_families.is_empty() { - gr_restale_families(&tables, info.remote_addr, &stale_families).await; - } - // Peer-level operations still require the global write lock. let mut server = global.write().await; if let Some(peer) = server.peers.get_mut(&info.remote_addr) diff --git a/daemon/src/gr.rs b/daemon/src/gr.rs index 3359b25..e1b66f4 100644 --- a/daemon/src/gr.rs +++ b/daemon/src/gr.rs @@ -58,8 +58,6 @@ pub(crate) enum GrInput { /// Actions the driver should perform in response to a GR input. pub(crate) enum GrOutput { - /// Mark routes from this peer stale for the given families. - MarkStale(Vec), /// Start (or restart) the restart timer with the given duration. StartTimer(Duration), /// Cancel the restart timer. @@ -154,7 +152,6 @@ impl GrState { if matches!(state, Inner::WaitingEor { .. }) { outputs.push(GrOutput::StopDeferralTimer); } - outputs.push(GrOutput::MarkStale(families.clone())); outputs.push(GrOutput::StartTimer(restart_time)); ( Inner::Restarting { @@ -264,13 +261,12 @@ mod tests { } #[test] - fn session_dropped_marks_stale_and_starts_timer() { + fn session_dropped_starts_timer() { let mut gr = GrState::new(); let outputs = drop_ipv4(&mut gr); - assert_eq!(outputs.len(), 2); - assert!(matches!(&outputs[0], GrOutput::MarkStale(f) if f == &[ipv4()])); - assert!(matches!(&outputs[1], GrOutput::StartTimer(d) if *d == restart_time())); + assert_eq!(outputs.len(), 1); + assert!(matches!(&outputs[0], GrOutput::StartTimer(d) if *d == restart_time())); assert!(matches!(gr.state, Inner::Restarting { .. })); } @@ -378,9 +374,8 @@ mod tests { restart_time: Duration::from_secs(60), }); - assert_eq!(outputs.len(), 2); - assert!(matches!(&outputs[0], GrOutput::MarkStale(f) if f.len() == 2)); - assert!(matches!(&outputs[1], GrOutput::StartTimer(d) if *d == Duration::from_secs(60))); + assert_eq!(outputs.len(), 1); + assert!(matches!(&outputs[0], GrOutput::StartTimer(d) if *d == Duration::from_secs(60))); assert!(matches!(gr.state, Inner::Restarting { .. })); } @@ -437,11 +432,10 @@ mod tests { let outputs = drop_ipv4(&mut gr); - // StopDeferralTimer + MarkStale + StartTimer - assert_eq!(outputs.len(), 3); + // StopDeferralTimer + StartTimer + assert_eq!(outputs.len(), 2); assert!(matches!(outputs[0], GrOutput::StopDeferralTimer)); - assert!(matches!(&outputs[1], GrOutput::MarkStale(f) if f == &[ipv4()])); - assert!(matches!(outputs[2], GrOutput::StartTimer(d) if d == restart_time())); + assert!(matches!(outputs[1], GrOutput::StartTimer(d) if d == restart_time())); assert!(matches!(gr.state, Inner::Restarting { .. })); }