diff --git a/src/streaming.rs b/src/streaming.rs index 4e3cb1f..2975ede 100644 --- a/src/streaming.rs +++ b/src/streaming.rs @@ -72,6 +72,16 @@ macro_rules! emit_event { const WS_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); +/// Keepalive ping interval (client -> server). +const WS_PING_INTERVAL: Duration = Duration::from_secs(30); + +/// If no inbound frame (including the Pong replying to our keepalive ping) +/// arrives within this window, the connection is treated as half-open +/// (a "zombie" socket where the OS still reports ESTABLISHED but the peer +/// is silently gone) and we force a reconnect. 3x the ping interval, so a +/// live connection tolerates losing two consecutive pongs before tripping. +const WS_READ_IDLE_TIMEOUT: Duration = Duration::from_secs(90); + fn ws_config() -> WebSocketConfig { let mut config = WebSocketConfig::default(); config.max_message_size = Some(10 * 1024 * 1024); // 10 MB @@ -873,6 +883,14 @@ enum WsExitReason { const MAX_BACKOFF_SECS: u64 = 30; +/// Equal Jitter: returns a randomized backoff duration in `[backoff/2, backoff]`. +/// The floor (backoff/2) prevents a fast-failing connect from busy-looping, +/// while the randomization de-synchronizes reconnects across accounts. +fn backoff_with_jitter(backoff_secs: u64) -> Duration { + let half = backoff_secs as f64 / 2.0; + Duration::from_secs_f64(half + half * rand::random::<f64>()) +} + /// Top-level task that handles reconnection with exponential backoff. #[allow(clippy::too_many_arguments)] async fn connection_task( @@ -919,8 +937,11 @@ async fn connection_task( } ); - // Wait with backoff, but listen for Shutdown during the wait - let sleep = tokio::time::sleep(Duration::from_secs(backoff_secs)); + // Wait with backoff, but listen for Shutdown during the wait. + // Equal Jitter (sleep in [backoff/2, backoff]) de-syncs reconnects + // across accounts while keeping a floor, so a fast-failing connect + // (immediate TCP RST) can't spin into a hot retry loop. 
+ let sleep = tokio::time::sleep(backoff_with_jitter(backoff_secs)); tokio::pin!(sleep); let shutdown_during_wait = loop { @@ -1062,12 +1083,21 @@ async fn ws_loop( cmd_rx: &mut mpsc::UnboundedReceiver, subscriptions: &Arc>>, ) -> WsExitReason { - let mut ping_interval = tokio::time::interval(Duration::from_secs(30)); + let mut ping_interval = tokio::time::interval(WS_PING_INTERVAL); ping_interval.tick().await; // skip the first immediate tick + // Read-idle watchdog: any inbound frame (data, server Ping, or the Pong + // replying to our keepalive ping) proves the link is alive and resets the + // deadline. If it elapses, the connection is half-open and we reconnect. + let idle = tokio::time::sleep(WS_READ_IDLE_TIMEOUT); + tokio::pin!(idle); + loop { tokio::select! { msg = read.next() => { + if matches!(msg, Some(Ok(_))) { + idle.as_mut().reset(tokio::time::Instant::now() + WS_READ_IDLE_TIMEOUT); + } match msg { Some(Ok(Message::Text(text))) => { let emitter_c = emitter.clone(); @@ -1146,6 +1176,10 @@ async fn ws_loop( return WsExitReason::Disconnected; } } + _ = &mut idle => { + tracing::warn!(account_id, "read idle timeout: half-open connection detected, reconnecting"); + return WsExitReason::Disconnected; + } } } } @@ -1681,3 +1715,33 @@ async fn polling_loop( } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn backoff_jitter_stays_within_equal_jitter_bounds() { + // Equal Jitter must always land in [backoff/2, backoff] and never panic + // (from_secs_f64 panics on negative/NaN). Sample across the backoff range. 
+ for backoff_secs in [1u64, 2, 4, 8, 16, MAX_BACKOFF_SECS] { + let lo = backoff_secs as f64 / 2.0; + let hi = backoff_secs as f64; + for _ in 0..1000 { + let d = backoff_with_jitter(backoff_secs).as_secs_f64(); + assert!( + d >= lo && d <= hi, + "backoff {backoff_secs}s jittered to {d}s, outside [{lo}, {hi}]" + ); + } + } + } + + #[test] + fn read_idle_timeout_exceeds_ping_interval() { + // The watchdog must outlast the ping cadence, otherwise a live but quiet + // connection (only kept alive by ping/pong) would be falsely torn down. + // 3x gives margin for two consecutive lost pongs. + assert!(WS_READ_IDLE_TIMEOUT >= WS_PING_INTERVAL * 3); + } +}