Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 34 additions & 42 deletions daemon/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3874,14 +3874,6 @@ async fn gr_drop_stale_families(tables: &TableHandle, addr: IpAddr, families: &[
}
}

/// Issues a `TableEvent::MarkStale(addr, family)` event to every table shard
/// for each entry in `families`.
///
/// Shards are visited in index order; within a shard the families are sent in
/// slice order. Each `tables.event(..)` call is awaited before the next one is
/// issued, so the events are dispatched strictly sequentially.
///
/// An empty `families` slice (or a table with no shards) results in no events.
async fn gr_restale_families(tables: &TableHandle, addr: IpAddr, families: &[Family]) {
    let shard_count = tables.shards.len();
    for shard_idx in 0..shard_count {
        for family in families.iter().copied() {
            let stale_event = TableEvent::MarkStale(addr, family);
            tables.event(shard_idx, stale_event).await;
        }
    }
}

fn collect_delete_families(outputs: &[crate::gr::GrOutput]) -> Vec<Family> {
outputs
.iter()
Expand Down Expand Up @@ -5037,6 +5029,11 @@ impl PeerSession {
if !self.source.is_empty() {
let drop_families =
families_to_drop_on_disconnect(self.source.keys(), self.negotiated_gr.as_ref());
let stale_families: Vec<Family> = self
.negotiated_gr
.as_ref()
.map(|g| g.families.clone())
.unwrap_or_default();
let import_policy = self.tables.import_policy.load_full();
let export_policy = self.tables.export_policy.load_full();
let kernel_tx = self.tables.kernel_tx.load_full();
Expand All @@ -5054,6 +5051,14 @@ impl PeerSession {
export_policy.as_deref(),
);
}
for &family in &stale_families {
t.event(
TableEvent::MarkStale(self.remote_addr, family),
kernel_tx.as_deref(),
import_policy.as_deref(),
export_policy.as_deref(),
);
}
let reason = self
.shutdown
.take()
Expand Down Expand Up @@ -5086,7 +5091,7 @@ impl PeerSession {

// Operate on PeerContext directly via self.context — no global lock needed
// for the ctx-only operations.
let (no_sessions, stale_families) = {
let no_sessions = {
let mut ctx = self.context.lock().unwrap();

{
Expand All @@ -5105,8 +5110,12 @@ impl PeerSession {
let _ = arb.process(info.role, crate::fsm::Input::Disconnected);
}

let stale_families = if let Some(gr) = &info.negotiated_gr {
// GR active: start restart timer, preserve export_map.
if let Some(gr) = &info.negotiated_gr {
// GR active: advance the state machine to start the restart timer
// and preserve export_map for the reconnecting session.
// MarkStale table events were already sent in session_loop() under
// the same shard locks as peer_event_tx.remove(), so no second pass
// over the table is needed here.
if let Some(h) = ctx.gr_deferral_timer.take() {
h.abort();
}
Expand All @@ -5118,29 +5127,21 @@ impl PeerSession {
families: gr.families.clone(),
restart_time: gr.restart_time,
});
let mut stale_families = Vec::new();
for output in &outputs {
match output {
crate::gr::GrOutput::StartTimer(duration) => {
let dur = *duration;
let addr = self.remote_addr;
let context_c = Arc::clone(&self.context);
let tables_c = tables.clone();
let handle = tokio::spawn(async move {
tokio::time::sleep(dur).await;
gr_restart_timer_expired(context_c, tables_c, addr).await;
})
.abort_handle();
ctx.gr_restart_timer = Some(handle);
}
crate::gr::GrOutput::MarkStale(families) => {
stale_families.extend_from_slice(families);
}
_ => {}
if let crate::gr::GrOutput::StartTimer(duration) = output {
let dur = *duration;
let addr = self.remote_addr;
let context_c = Arc::clone(&self.context);
let tables_c = tables.clone();
let handle = tokio::spawn(async move {
tokio::time::sleep(dur).await;
gr_restart_timer_expired(context_c, tables_c, addr).await;
})
.abort_handle();
ctx.gr_restart_timer = Some(handle);
}
}
ctx.export_map = info.export_map;
stale_families
} else {
// Normal disconnect: routes were already dropped in session_loop().
// Clean up any leftover GR state from a previous cycle that never recovered.
Expand All @@ -5151,22 +5152,13 @@ impl PeerSession {
h.abort();
}
drop(info.export_map);
Vec::new()
};
}

// Only reset and reconnect when no PeerSession remains for this peer.
let no_sessions = {
let arb = ctx.conn_arbiter.lock().unwrap();
arb.active_close_tx.is_none() && arb.passive_close_tx.is_none()
};
(no_sessions, stale_families)
let arb = ctx.conn_arbiter.lock().unwrap();
arb.active_close_tx.is_none() && arb.passive_close_tx.is_none()
};

// Trigger table re-selection for stale routes outside the context lock.
if !stale_families.is_empty() {
gr_restale_families(&tables, info.remote_addr, &stale_families).await;
}

// Peer-level operations still require the global write lock.
let mut server = global.write().await;
if let Some(peer) = server.peers.get_mut(&info.remote_addr)
Expand Down
22 changes: 8 additions & 14 deletions daemon/src/gr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ pub(crate) enum GrInput {

/// Actions the driver should perform in response to a GR input.
pub(crate) enum GrOutput {
/// Mark routes from this peer stale for the given families.
MarkStale(Vec<Family>),
/// Start (or restart) the restart timer with the given duration.
StartTimer(Duration),
/// Cancel the restart timer.
Expand Down Expand Up @@ -154,7 +152,6 @@ impl GrState {
if matches!(state, Inner::WaitingEor { .. }) {
outputs.push(GrOutput::StopDeferralTimer);
}
outputs.push(GrOutput::MarkStale(families.clone()));
outputs.push(GrOutput::StartTimer(restart_time));
(
Inner::Restarting {
Expand Down Expand Up @@ -264,13 +261,12 @@ mod tests {
}

#[test]
fn session_dropped_marks_stale_and_starts_timer() {
fn session_dropped_starts_timer() {
let mut gr = GrState::new();
let outputs = drop_ipv4(&mut gr);

assert_eq!(outputs.len(), 2);
assert!(matches!(&outputs[0], GrOutput::MarkStale(f) if f == &[ipv4()]));
assert!(matches!(&outputs[1], GrOutput::StartTimer(d) if *d == restart_time()));
assert_eq!(outputs.len(), 1);
assert!(matches!(&outputs[0], GrOutput::StartTimer(d) if *d == restart_time()));
assert!(matches!(gr.state, Inner::Restarting { .. }));
}

Expand Down Expand Up @@ -378,9 +374,8 @@ mod tests {
restart_time: Duration::from_secs(60),
});

assert_eq!(outputs.len(), 2);
assert!(matches!(&outputs[0], GrOutput::MarkStale(f) if f.len() == 2));
assert!(matches!(&outputs[1], GrOutput::StartTimer(d) if *d == Duration::from_secs(60)));
assert_eq!(outputs.len(), 1);
assert!(matches!(&outputs[0], GrOutput::StartTimer(d) if *d == Duration::from_secs(60)));
assert!(matches!(gr.state, Inner::Restarting { .. }));
}

Expand Down Expand Up @@ -437,11 +432,10 @@ mod tests {

let outputs = drop_ipv4(&mut gr);

// StopDeferralTimer + MarkStale + StartTimer
assert_eq!(outputs.len(), 3);
// StopDeferralTimer + StartTimer
assert_eq!(outputs.len(), 2);
assert!(matches!(outputs[0], GrOutput::StopDeferralTimer));
assert!(matches!(&outputs[1], GrOutput::MarkStale(f) if f == &[ipv4()]));
assert!(matches!(outputs[2], GrOutput::StartTimer(d) if d == restart_time()));
assert!(matches!(outputs[1], GrOutput::StartTimer(d) if d == restart_time()));
assert!(matches!(gr.state, Inner::Restarting { .. }));
}

Expand Down
Loading