Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions src/slack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ const PARTICIPATION_CACHE_MAX: usize = 1000;
/// aborted turns that begin a stream but never reach stream_finish).
const STREAM_CACHE_MAX: usize = 1024;

/// Reconnect the Slack Socket Mode socket if no frame (ping, ack, or event)
/// arrives within this window. A healthy Socket Mode connection is never silent
/// this long, so 60s of silence means the connection is half-open (dead) —
/// detects silent NAT/firewall drops.
const SLACK_READ_IDLE_TIMEOUT_SECS: u64 = 60;

#[derive(Default)]
struct StreamEntry {
active: bool,
Expand Down Expand Up @@ -697,8 +703,25 @@ pub async fn run_slack_adapter(

loop {
tokio::select! {
msg_result = read.next() => {
let Some(msg_result) = msg_result else { break };
timeout_result = tokio::time::timeout(
std::time::Duration::from_secs(SLACK_READ_IDLE_TIMEOUT_SECS),
read.next(),
) => {
// Half-open detection: any inbound frame (ping, ack, event, close)
// resets this timer, so a healthy connection is never frame-silent
// this long. 60s of silence is treated as a dead/half-open socket
// (e.g. a NAT/firewall dropped the idle TCP connection without a Close
// frame or RST), which can otherwise block read.next() indefinitely and
// silently stop the bot from receiving events. Break so the outer
// reconnect loop runs.
let msg_result = match timeout_result {
Ok(Some(msg)) => msg,
Ok(None) => break,
Err(_) => {
warn!("no frame from Slack within idle timeout; assuming dead connection, reconnecting");
break;
}
};
match msg_result {
Ok(tungstenite::Message::Text(text)) => {
let envelope: serde_json::Value =
Expand All @@ -710,9 +733,13 @@ pub async fn run_slack_adapter(
// Acknowledge the envelope immediately
if let Some(envelope_id) = envelope["envelope_id"].as_str() {
let ack = serde_json::json!({"envelope_id": envelope_id});
let _ = write
if let Err(e) = write
.send(tungstenite::Message::Text(ack.to_string()))
.await;
.await
{
warn!(error = %e, "failed to ack Slack envelope; connection likely dead, reconnecting");
break;
}
}

// Slash commands and interactive block_actions aren't
Expand Down Expand Up @@ -1023,7 +1050,10 @@ pub async fn run_slack_adapter(
}
}
Ok(tungstenite::Message::Ping(data)) => {
let _ = write.send(tungstenite::Message::Pong(data)).await;
if let Err(e) = write.send(tungstenite::Message::Pong(data)).await {
warn!(error = %e, "failed to send pong; connection likely dead, reconnecting");
break;
}
}
Ok(tungstenite::Message::Close(_)) => {
warn!("Slack Socket Mode connection closed by server");
Expand Down
Loading