Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 67 additions & 3 deletions src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ macro_rules! emit_event {

/// Connect-phase timeout (10 s).
/// NOTE(review): the call site is outside this view — presumably bounds the
/// WebSocket handshake; confirm against the connect path.
const WS_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

/// Cadence of the client -> server keepalive ping.
const WS_PING_INTERVAL: Duration = Duration::from_secs(30);

/// Read-idle watchdog window.
///
/// When no inbound frame at all shows up for this long (the Pong answering
/// our own keepalive ping counts as a frame), the socket is considered
/// half-open: the OS may still report it as ESTABLISHED, yet the peer has
/// silently vanished, so a reconnect is forced.
///
/// Computed as three ping intervals (90 s) rather than spelled out as a
/// literal, so the "survive two consecutive lost pongs" margin follows any
/// change to [`WS_PING_INTERVAL`] automatically.
const WS_READ_IDLE_TIMEOUT: Duration = Duration::from_secs(3 * WS_PING_INTERVAL.as_secs());

fn ws_config() -> WebSocketConfig {
let mut config = WebSocketConfig::default();
config.max_message_size = Some(10 * 1024 * 1024); // 10 MB
Expand Down Expand Up @@ -873,6 +883,14 @@ enum WsExitReason {

/// Largest reconnect backoff value, in seconds.
/// NOTE(review): the code that clamps the backoff to this value is outside
/// this view — presumably the exponential backoff in `connection_task`; confirm.
const MAX_BACKOFF_SECS: u64 = 30;

/// Applies "Equal Jitter" to a backoff value.
///
/// The result is drawn from `[backoff_secs / 2, backoff_secs]`: a fixed
/// floor of half the backoff plus a random share of the other half. The
/// floor keeps a connect that fails instantly from turning into a busy
/// loop; the random part spreads out reconnect attempts so that multiple
/// accounts do not retry in lock-step.
fn backoff_with_jitter(backoff_secs: u64) -> Duration {
    let floor = backoff_secs as f64 / 2.0;
    // Random fraction of the remaining half, added on top of the floor.
    let jitter = floor * rand::random::<f64>();
    Duration::from_secs_f64(floor + jitter)
}

/// Top-level task that handles reconnection with exponential backoff.
#[allow(clippy::too_many_arguments)]
async fn connection_task(
Expand Down Expand Up @@ -919,8 +937,11 @@ async fn connection_task(
}
);

// Wait with backoff, but listen for Shutdown during the wait
let sleep = tokio::time::sleep(Duration::from_secs(backoff_secs));
// Wait with backoff, but listen for Shutdown during the wait.
// Equal Jitter (sleep in [backoff/2, backoff]) de-syncs reconnects
// across accounts while keeping a floor, so a fast-failing connect
// (immediate TCP RST) can't spin into a hot retry loop.
let sleep = tokio::time::sleep(backoff_with_jitter(backoff_secs));
tokio::pin!(sleep);

let shutdown_during_wait = loop {
Expand Down Expand Up @@ -1062,12 +1083,21 @@ async fn ws_loop(
cmd_rx: &mut mpsc::UnboundedReceiver<WsCommand>,
subscriptions: &Arc<RwLock<HashMap<String, SubscriptionInfo>>>,
) -> WsExitReason {
let mut ping_interval = tokio::time::interval(Duration::from_secs(30));
let mut ping_interval = tokio::time::interval(WS_PING_INTERVAL);
ping_interval.tick().await; // skip the first immediate tick

// Read-idle watchdog: any inbound frame (data, server Ping, or the Pong
// replying to our keepalive ping) proves the link is alive and resets the
// deadline. If it elapses, the connection is half-open and we reconnect.
let idle = tokio::time::sleep(WS_READ_IDLE_TIMEOUT);
tokio::pin!(idle);

loop {
tokio::select! {
msg = read.next() => {
if matches!(msg, Some(Ok(_))) {
idle.as_mut().reset(tokio::time::Instant::now() + WS_READ_IDLE_TIMEOUT);
}
match msg {
Some(Ok(Message::Text(text))) => {
let emitter_c = emitter.clone();
Expand Down Expand Up @@ -1146,6 +1176,10 @@ async fn ws_loop(
return WsExitReason::Disconnected;
}
}
_ = &mut idle => {
tracing::warn!(account_id, "read idle timeout: half-open connection detected, reconnecting");
return WsExitReason::Disconnected;
}
}
}
}
Expand Down Expand Up @@ -1681,3 +1715,33 @@ async fn polling_loop(
}
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Every jittered delay must fall inside `[backoff/2, backoff]`, and
    /// producing it must not panic (`Duration::from_secs_f64` panics on a
    /// negative or NaN input). Checked over a spread of backoff values.
    #[test]
    fn backoff_jitter_stays_within_equal_jitter_bounds() {
        for backoff_secs in [1u64, 2, 4, 8, 16, MAX_BACKOFF_SECS] {
            let hi = backoff_secs as f64;
            let lo = backoff_secs as f64 / 2.0;

            // 1000 independent draws per backoff value.
            let samples = (0..1000).map(|_| backoff_with_jitter(backoff_secs).as_secs_f64());
            for d in samples {
                assert!(
                    (lo..=hi).contains(&d),
                    "backoff {backoff_secs}s jittered to {d}s, outside [{lo}, {hi}]"
                );
            }
        }
    }

    /// The idle watchdog has to be at least three ping intervals long.
    /// Anything shorter would tear down a healthy connection whose only
    /// traffic is ping/pong; 3x leaves room for two pongs lost in a row.
    #[test]
    fn read_idle_timeout_exceeds_ping_interval() {
        assert!(WS_READ_IDLE_TIMEOUT >= WS_PING_INTERVAL * 3);
    }
}
Loading