Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions crates/buzz-acp/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ pub struct CliArgs {
#[arg(long, env = "BUZZ_ACP_HEARTBEAT_INTERVAL", default_value_t = 0)]
pub heartbeat_interval: u64,

/// Seconds between per-turn liveness pings (the crash backstop signal —
/// distinct from heartbeat self-prompting). 0 = disabled.
#[arg(long, env = "BUZZ_ACP_TURN_LIVENESS_SECS", default_value_t = 10)]
pub turn_liveness_secs: u64,

/// Heartbeat prompt text. Conflicts with --heartbeat-prompt-file.
#[arg(
long,
Expand Down Expand Up @@ -435,6 +440,10 @@ pub struct Config {
pub max_turn_duration_secs: u64,
pub agents: u32,
pub heartbeat_interval_secs: u64,
/// Seconds between per-turn liveness pings. 0 = disabled. Distinct from
/// `heartbeat_interval_secs` (agent self-prompting) — this is the desktop
/// crash-backstop signal.
pub turn_liveness_secs: u64,
pub heartbeat_prompt: Option<String>,
pub system_prompt: Option<String>,
pub initial_message: Option<String>,
Expand Down Expand Up @@ -599,6 +608,12 @@ impl Config {
));
}

if args.turn_liveness_secs > 0 && args.turn_liveness_secs < 5 {
return Err(ConfigError::ConfigFile(
"turn liveness interval must be 0 (disabled) or ≥5 seconds".into(),
));
}

let heartbeat_prompt = if let Some(text) = args.heartbeat_prompt {
Some(text)
} else if let Some(ref path) = args.heartbeat_prompt_file {
Expand Down Expand Up @@ -669,6 +684,17 @@ impl Config {
args.heartbeat_interval
};

// Cap turn-liveness interval at 86400s (24h) — same bound as heartbeat.
let turn_liveness_secs = if args.turn_liveness_secs > 86400 {
tracing::warn!(
interval = args.turn_liveness_secs,
"turn liveness interval exceeds 24h — capping at 86400s"
);
86400u64
} else {
args.turn_liveness_secs
};

// Resolve idle_timeout_secs with deprecation handling.
// Precedence: explicit --idle-timeout > --turn-timeout (deprecated) > `DEFAULT_IDLE_TIMEOUT_SECS`.
let idle_timeout_secs = {
Expand Down Expand Up @@ -808,6 +834,7 @@ impl Config {
max_turn_duration_secs,
agents: args.agents,
heartbeat_interval_secs: heartbeat_interval,
turn_liveness_secs,
heartbeat_prompt,
system_prompt,
initial_message: args.initial_message,
Expand Down Expand Up @@ -1173,6 +1200,7 @@ mod tests {
max_turn_duration_secs: 3600,
agents: 1,
heartbeat_interval_secs: 0,
turn_liveness_secs: 10,
heartbeat_prompt: None,
system_prompt: None,
initial_message: None,
Expand Down Expand Up @@ -1742,6 +1770,44 @@ channels = "ALL"
assert!(err.to_string().contains("heartbeat interval must be 0"));
}

// ── turn-liveness validation ─────────────────────────────────────────────

/// Test-local bounds check for the turn-liveness interval.
///
/// Accepts `0` (disabled) and any value of 5 seconds or more; the values
/// 1 through 4 are rejected with a `ConfigError::ConfigFile` carrying the
/// "must be 0 (disabled) or ≥5 seconds" message.
fn validate_turn_liveness(secs: u64) -> Result<(), ConfigError> {
    match secs {
        // Non-zero but below the 5-second floor.
        1..=4 => Err(ConfigError::ConfigFile(
            "turn liveness interval must be 0 (disabled) or ≥5 seconds".into(),
        )),
        _ => Ok(()),
    }
}

#[test]
fn test_turn_liveness_zero_ok() {
    // Zero means "disabled" and must pass validation.
    let outcome = validate_turn_liveness(0);
    assert!(matches!(outcome, Ok(())));
}

#[test]
fn test_turn_liveness_five_ok() {
    // Five seconds is the smallest accepted non-zero interval.
    let outcome = validate_turn_liveness(5);
    assert!(matches!(outcome, Ok(())));
}

#[test]
fn test_turn_liveness_ten_ok() {
    // Ten seconds sits comfortably above the 5-second floor.
    let outcome = validate_turn_liveness(10);
    assert!(matches!(outcome, Ok(())));
}

#[test]
fn test_turn_liveness_four_rejected() {
    // Four seconds is just under the floor; the error text must say so.
    let rendered = validate_turn_liveness(4).unwrap_err().to_string();
    assert!(rendered.contains("turn liveness interval must be 0"));
}

#[test]
fn test_turn_liveness_one_rejected() {
    // One second is the smallest non-zero value and must be rejected.
    let rendered = validate_turn_liveness(1).unwrap_err().to_string();
    assert!(rendered.contains("turn liveness interval must be 0"));
}

// ── summary includes agents and heartbeat ────────────────────────────────

#[test]
Expand Down
2 changes: 2 additions & 0 deletions crates/buzz-acp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,7 @@ async fn tokio_main() -> Result<()> {
initial_message: config.initial_message.clone(),
idle_timeout: Duration::from_secs(config.idle_timeout_secs),
max_turn_duration: Duration::from_secs(config.max_turn_duration_secs),
turn_liveness_interval: Duration::from_secs(config.turn_liveness_secs),
dedup_mode: config.dedup_mode,
system_prompt: config.system_prompt.clone(),
base_prompt: if config.no_base_prompt {
Expand Down Expand Up @@ -3025,6 +3026,7 @@ mod build_mcp_servers_tests {
max_turn_duration_secs: 3600,
agents: 1,
heartbeat_interval_secs: 0,
turn_liveness_secs: 10,
heartbeat_prompt: None,
system_prompt: None,
initial_message: None,
Expand Down
157 changes: 152 additions & 5 deletions crates/buzz-acp/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ pub struct PromptContext {
pub initial_message: Option<String>,
pub idle_timeout: Duration,
pub max_turn_duration: Duration,
/// Interval between per-turn `turn_liveness` observer pings. `Duration::ZERO`
/// disables emission. This is the desktop crash-backstop signal — distinct
/// from `heartbeat_prompt` (agent self-prompting).
pub turn_liveness_interval: Duration,
pub dedup_mode: DedupMode,
pub system_prompt: Option<String>,
pub heartbeat_prompt: Option<String>,
Expand Down Expand Up @@ -1088,18 +1092,36 @@ pub async fn run_prompt_task(
// When control_rx is Some (channel tasks), wrap the prompt in select! so
// the main loop can cancel, interrupt, or rotate it. Heartbeats
// (control_rx=None) have no control arm — they are not controllable.
//
// The liveness future emits `turn_liveness` pings on an interval and never
// resolves; it rides every prompt-await path as a non-winning select arm so
// a turn stays alive on the desktop while it runs. Built from a captured
// observer handle (not `&agent.acp`) because the prompt holds `&mut agent.acp`.
let liveness = run_turn_liveness(
agent.acp.observer_handle(),
agent.acp.observer_agent_index(),
observer::context_for(
observer_channel_id,
Some(session_id.clone()),
Some(turn_id.clone()),
),
ctx.turn_liveness_interval,
);
tokio::pin!(liveness);

let prompt_result = match control_rx {
None => {
// Heartbeat / non-cancellable path.
agent
.acp
.session_prompt_blocks_with_idle_timeout(
tokio::select! {
biased;
result = agent.acp.session_prompt_blocks_with_idle_timeout(
&session_id,
&prompt_blocks,
ctx.idle_timeout,
ctx.max_turn_duration,
)
.await
) => result,
_ = &mut liveness => unreachable!("liveness future never resolves"),
}
}
Some(rx) => {
tokio::select! {
Expand All @@ -1110,6 +1132,7 @@ pub async fn run_prompt_task(
ctx.idle_timeout,
ctx.max_turn_duration,
) => result,
_ = &mut liveness => unreachable!("liveness future never resolves"),
mode = rx => {
let control_signal = mode.unwrap_or(ControlSignal::Cancel);
// Control signal received. Guard against Race 1: the turn may
Expand Down Expand Up @@ -2017,6 +2040,49 @@ impl Drop for ReactionGuard {
}
}

// ── Turn liveness emission ───────────────────────────────────────────────────
// Periodically emits a `turn_liveness` observer event while a turn is in-flight,
// so the desktop can prune turns whose host died without unwinding (kill -9 /
// crash) far sooner than the no-activity backstop. Runs as a non-resolving
// `select!` arm in `run_prompt_task`: it lives and dies with the prompt future,
// so emission stops on every exit path (complete / cancel / error / panic) with
// no separate teardown to forget.
//
// Takes a captured `ObserverHandle` rather than `&agent.acp` because the prompt
// future holds `&mut agent.acp` for its whole duration — a second borrow would
// not compile.
//
// This future never resolves; callers must race it against the prompt and rely
// on drop for teardown. When `interval` is zero, liveness is disabled and the
// future parks forever without emitting.
/// Emits a `turn_liveness` observer event once per `interval` for as long as
/// this future keeps being polled.
///
/// The future never resolves. Emission stops as soon as the caller stops
/// polling it (or drops it). When `observer` is `None` or `interval` is zero,
/// nothing is ever emitted and the future simply stays pending.
///
/// * `observer` — handle the pings are emitted through.
/// * `agent_index` — passed through unchanged to every emitted event.
/// * `context` — observer context attached to every emitted event.
/// * `interval` — spacing between pings; `Duration::ZERO` disables emission.
async fn run_turn_liveness(
    observer: Option<observer::ObserverHandle>,
    agent_index: Option<usize>,
    context: observer::ObserverContext,
    interval: Duration,
) {
    // Nothing to emit to: park forever so the caller's select arm never wins.
    let Some(observer) = observer else {
        return std::future::pending::<()>().await;
    };
    // Disabled. This check also keeps a zero period away from
    // `tokio::time::interval`, which panics on it.
    if interval.is_zero() {
        return std::future::pending::<()>().await;
    }
    let mut ticker = tokio::time::interval(interval);
    // A ping only says "still alive right now", so ticks missed while this
    // future was not being polled must not be replayed back-to-back (the
    // default `Burst` policy). `Delay` fires a single late tick and then
    // resumes regular spacing from that point.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // The first tick completes immediately; skip it so the first liveness ping
    // fires one interval after the turn starts, not at t=0.
    ticker.tick().await;
    loop {
        ticker.tick().await;
        // Empty payload: the event kind plus its context is the whole signal.
        observer.emit(
            "turn_liveness",
            agent_index,
            &context,
            serde_json::json!({}),
        );
    }
}

// ── Turn completion scope guard ──────────────────────────────────────────────
// Emits a `turn_completed` observer event on drop, covering ALL exit paths
// (success, error, timeout, cancel, panic) from `run_prompt_task`. Captures
Expand Down Expand Up @@ -2790,4 +2856,85 @@ mod tests {
assert_eq!(*s.turn_counts.get(&ch_b).unwrap(), 3);
assert_eq!(s.core_sections.get(&ch_b).unwrap(), "core-b");
}

// ── turn liveness emission ───────────────────────────────────────────────
// `run_turn_liveness` is raced against a "prompt" future the same way
// `run_prompt_task` does it: the prompt wins the select and the liveness
// future is no longer polled. We assert what the observer saw.

/// Counts the events in the observer's snapshot whose kind is
/// `"turn_liveness"`.
fn liveness_count(handle: &observer::ObserverHandle) -> usize {
    let events = handle.snapshot();
    let mut pings = 0;
    for event in events.iter() {
        if event.kind == "turn_liveness" {
            pings += 1;
        }
    }
    pings
}

#[tokio::test(start_paused = true)]
async fn test_liveness_fires_while_prompt_pends_then_stops() {
    let sink = observer::ObserverHandle::in_process();
    let turn_context = observer::context_for(None, None, Some("t-1".into()));
    let pinger = run_turn_liveness(
        Some(sink.clone()),
        Some(0),
        turn_context,
        Duration::from_secs(10),
    );
    tokio::pin!(pinger);

    // Stand-in prompt that stays pending for 25s. Pings are due at 10s and
    // 20s, so exactly two must have been recorded when it completes.
    let prompt = tokio::time::sleep(Duration::from_secs(25));
    tokio::select! {
        biased;
        () = prompt => {}
        _ = &mut pinger => unreachable!("liveness future never resolves"),
    }

    assert_eq!(liveness_count(&sink), 2);

    // Every ping must carry the turn id the context was built with.
    let all_tagged = sink
        .snapshot()
        .into_iter()
        .filter(|e| e.kind == "turn_liveness")
        .all(|e| e.turn_id.as_deref() == Some("t-1"));
    assert!(all_tagged);

    // Once the select has finished, the liveness future is no longer polled,
    // so advancing the clock further must not produce new pings.
    tokio::time::sleep(Duration::from_secs(60)).await;
    assert_eq!(liveness_count(&sink), 2);
}

#[tokio::test(start_paused = true)]
async fn test_liveness_disabled_when_interval_zero_emits_nothing() {
    let sink = observer::ObserverHandle::in_process();
    let turn_context = observer::context_for(None, None, Some("t-1".into()));
    // A zero interval disables emission; the future must stay pending.
    let pinger = run_turn_liveness(Some(sink.clone()), Some(0), turn_context, Duration::ZERO);
    tokio::pin!(pinger);

    // Two minutes of virtual time pass without the liveness arm resolving.
    let prompt = tokio::time::sleep(Duration::from_secs(120));
    tokio::select! {
        biased;
        () = prompt => {}
        _ = &mut pinger => unreachable!("disabled liveness future never resolves"),
    }

    assert_eq!(liveness_count(&sink), 0);
}

#[tokio::test(start_paused = true)]
async fn test_liveness_without_observer_emits_nothing() {
    // With no observer handle supplied, the future must park without
    // emitting or panicking, even though the interval is non-zero.
    let turn_context = observer::context_for(None, None, Some("t-1".into()));
    let pinger = run_turn_liveness(None, None, turn_context, Duration::from_secs(10));
    tokio::pin!(pinger);

    let prompt = tokio::time::sleep(Duration::from_secs(120));
    tokio::select! {
        biased;
        () = prompt => {}
        _ = &mut pinger => unreachable!("handle-less liveness future never resolves"),
    }
    // There is no observer to inspect; reaching this point without a panic
    // is the assertion.
}
}
Loading