diff --git a/crates/buzz-acp/src/config.rs b/crates/buzz-acp/src/config.rs index bbbf7062c..45772bb84 100644 --- a/crates/buzz-acp/src/config.rs +++ b/crates/buzz-acp/src/config.rs @@ -250,6 +250,11 @@ pub struct CliArgs { #[arg(long, env = "BUZZ_ACP_HEARTBEAT_INTERVAL", default_value_t = 0)] pub heartbeat_interval: u64, + /// Seconds between per-turn liveness pings (the crash backstop signal — + /// distinct from heartbeat self-prompting). 0 = disabled. + #[arg(long, env = "BUZZ_ACP_TURN_LIVENESS_SECS", default_value_t = 10)] + pub turn_liveness_secs: u64, + /// Heartbeat prompt text. Conflicts with --heartbeat-prompt-file. #[arg( long, @@ -435,6 +440,10 @@ pub struct Config { pub max_turn_duration_secs: u64, pub agents: u32, pub heartbeat_interval_secs: u64, + /// Seconds between per-turn liveness pings. 0 = disabled. Distinct from + /// `heartbeat_interval_secs` (agent self-prompting) — this is the desktop + /// crash-backstop signal. + pub turn_liveness_secs: u64, pub heartbeat_prompt: Option, pub system_prompt: Option, pub initial_message: Option, @@ -599,6 +608,12 @@ impl Config { )); } + if args.turn_liveness_secs > 0 && args.turn_liveness_secs < 5 { + return Err(ConfigError::ConfigFile( + "turn liveness interval must be 0 (disabled) or ≥5 seconds".into(), + )); + } + let heartbeat_prompt = if let Some(text) = args.heartbeat_prompt { Some(text) } else if let Some(ref path) = args.heartbeat_prompt_file { @@ -669,6 +684,17 @@ impl Config { args.heartbeat_interval }; + // Cap turn-liveness interval at 86400s (24h) — same bound as heartbeat. + let turn_liveness_secs = if args.turn_liveness_secs > 86400 { + tracing::warn!( + interval = args.turn_liveness_secs, + "turn liveness interval exceeds 24h — capping at 86400s" + ); + 86400u64 + } else { + args.turn_liveness_secs + }; + // Resolve idle_timeout_secs with deprecation handling. // Precedence: explicit --idle-timeout > --turn-timeout (deprecated) > `DEFAULT_IDLE_TIMEOUT_SECS`. let idle_timeout_secs = { @@ -808,6 +834,7 @@ impl Config { max_turn_duration_secs, agents: args.agents, heartbeat_interval_secs: heartbeat_interval, + turn_liveness_secs, heartbeat_prompt, system_prompt, initial_message: args.initial_message, @@ -1173,6 +1200,7 @@ mod tests { max_turn_duration_secs: 3600, agents: 1, heartbeat_interval_secs: 0, + turn_liveness_secs: 10, heartbeat_prompt: None, system_prompt: None, initial_message: None, @@ -1742,6 +1770,44 @@ channels = "ALL" assert!(err.to_string().contains("heartbeat interval must be 0")); } + // ── turn-liveness validation ───────────────────────────────────────────── + + fn validate_turn_liveness(secs: u64) -> Result<(), ConfigError> { + if secs > 0 && secs < 5 { + return Err(ConfigError::ConfigFile( + "turn liveness interval must be 0 (disabled) or ≥5 seconds".into(), + )); + } + Ok(()) + } + + #[test] + fn test_turn_liveness_zero_ok() { + assert!(validate_turn_liveness(0).is_ok()); + } + + #[test] + fn test_turn_liveness_five_ok() { + assert!(validate_turn_liveness(5).is_ok()); + } + + #[test] + fn test_turn_liveness_ten_ok() { + assert!(validate_turn_liveness(10).is_ok()); + } + + #[test] + fn test_turn_liveness_four_rejected() { + let err = validate_turn_liveness(4).unwrap_err(); + assert!(err.to_string().contains("turn liveness interval must be 0")); + } + + #[test] + fn test_turn_liveness_one_rejected() { + let err = validate_turn_liveness(1).unwrap_err(); + assert!(err.to_string().contains("turn liveness interval must be 0")); + } + // ── summary includes agents and heartbeat ──────────────────────────────── #[test] diff --git a/crates/buzz-acp/src/lib.rs b/crates/buzz-acp/src/lib.rs index 7a0e7f3af..c87928a53 100644 --- a/crates/buzz-acp/src/lib.rs +++ b/crates/buzz-acp/src/lib.rs @@ -1078,6 +1078,7 @@ async fn tokio_main() -> Result<()> { initial_message: config.initial_message.clone(), idle_timeout: Duration::from_secs(config.idle_timeout_secs), max_turn_duration: Duration::from_secs(config.max_turn_duration_secs), + turn_liveness_interval: Duration::from_secs(config.turn_liveness_secs), dedup_mode: config.dedup_mode, system_prompt: config.system_prompt.clone(), base_prompt: if config.no_base_prompt { @@ -3025,6 +3026,7 @@ mod build_mcp_servers_tests { max_turn_duration_secs: 3600, agents: 1, heartbeat_interval_secs: 0, + turn_liveness_secs: 10, heartbeat_prompt: None, system_prompt: None, initial_message: None, diff --git a/crates/buzz-acp/src/pool.rs b/crates/buzz-acp/src/pool.rs index 543fd32ef..7f39639d0 100644 --- a/crates/buzz-acp/src/pool.rs +++ b/crates/buzz-acp/src/pool.rs @@ -216,6 +216,10 @@ pub struct PromptContext { pub initial_message: Option, pub idle_timeout: Duration, pub max_turn_duration: Duration, + /// Interval between per-turn `turn_liveness` observer pings. `Duration::ZERO` + /// disables emission. This is the desktop crash-backstop signal — distinct + /// from `heartbeat_prompt` (agent self-prompting). + pub turn_liveness_interval: Duration, pub dedup_mode: DedupMode, pub system_prompt: Option, pub heartbeat_prompt: Option, @@ -1088,18 +1092,36 @@ pub async fn run_prompt_task( // When control_rx is Some (channel tasks), wrap the prompt in select! so // the main loop can cancel, interrupt, or rotate it. Heartbeats // (control_rx=None) take the simple await path — they are not controllable. + // + // The liveness future emits `turn_liveness` pings on an interval and never + // resolves; it rides every prompt-await path as a non-winning select arm so + // a turn stays alive on the desktop while it runs. Built from a captured + // observer handle (not `&agent.acp`) because the prompt holds `&mut agent.acp`. + let liveness = run_turn_liveness( + agent.acp.observer_handle(), + agent.acp.observer_agent_index(), + observer::context_for( + observer_channel_id, + Some(session_id.clone()), + Some(turn_id.clone()), + ), + ctx.turn_liveness_interval, + ); + tokio::pin!(liveness); + let prompt_result = match control_rx { None => { // Heartbeat / non-cancellable path. - agent - .acp - .session_prompt_blocks_with_idle_timeout( + tokio::select! { + biased; + result = agent.acp.session_prompt_blocks_with_idle_timeout( &session_id, &prompt_blocks, ctx.idle_timeout, ctx.max_turn_duration, - ) - .await + ) => result, + _ = &mut liveness => unreachable!("liveness future never resolves"), + } } Some(rx) => { tokio::select! { @@ -1110,6 +1132,7 @@ pub async fn run_prompt_task( ctx.idle_timeout, ctx.max_turn_duration, ) => result, + _ = &mut liveness => unreachable!("liveness future never resolves"), mode = rx => { let control_signal = mode.unwrap_or(ControlSignal::Cancel); // Control signal received. Guard against Race 1: the turn may @@ -2017,6 +2040,49 @@ impl Drop for ReactionGuard { } } +// ── Turn liveness emission ─────────────────────────────────────────────────── +// Periodically emits a `turn_liveness` observer event while a turn is in-flight, +// so the desktop can prune turns whose host died without unwinding (kill -9 / +// crash) far sooner than the no-activity backstop. Runs as a non-resolving +// `select!` arm in `run_prompt_task`: it lives and dies with the prompt future, +// so emission stops on every exit path (complete / cancel / error / panic) with +// no separate teardown to forget. +// +// Takes a captured `ObserverHandle` rather than `&agent.acp` because the prompt +// future holds `&mut agent.acp` for its whole duration — a second borrow would +// not compile. +// +// This future never resolves; callers must race it against the prompt and rely +// on drop for teardown. When `interval` is zero, liveness is disabled and the +// future parks forever without emitting. +async fn run_turn_liveness( + observer: Option, + agent_index: Option, + context: observer::ObserverContext, + interval: Duration, +) { + let Some(observer) = observer else { + return std::future::pending::<()>().await; + }; + if interval.is_zero() { + return std::future::pending::<()>().await; + } + let mut ticker = tokio::time::interval(interval); + // The first tick completes immediately; skip it so the first liveness ping + // fires one interval after the turn starts, not at t=0 (turn_started already + // marks t=0). + ticker.tick().await; + loop { + ticker.tick().await; + observer.emit( + "turn_liveness", + agent_index, + &context, + serde_json::json!({}), + ); + } +} + // ── Turn completion scope guard ────────────────────────────────────────────── // Emits a `turn_completed` observer event on drop, covering ALL exit paths // (success, error, timeout, cancel, panic) from `run_prompt_task`. Captures @@ -2790,4 +2856,85 @@ mod tests { assert_eq!(*s.turn_counts.get(&ch_b).unwrap(), 3); assert_eq!(s.core_sections.get(&ch_b).unwrap(), "core-b"); } + + // ── turn liveness emission ─────────────────────────────────────────────── + // `run_turn_liveness` is raced against a "prompt" future the same way + // `run_prompt_task` does it: the prompt wins the select and the liveness + // future is dropped. We assert what the observer saw. + + fn liveness_count(handle: &observer::ObserverHandle) -> usize { + handle + .snapshot() + .iter() + .filter(|e| e.kind == "turn_liveness") + .count() + } + + #[tokio::test(start_paused = true)] + async fn test_liveness_fires_while_prompt_pends_then_stops() { + let observer = observer::ObserverHandle::in_process(); + let context = observer::context_for(None, None, Some("t-1".into())); + let liveness = run_turn_liveness( + Some(observer.clone()), + Some(0), + context, + Duration::from_secs(10), + ); + tokio::pin!(liveness); + + // Prompt pends for 25s, then completes — first liveness tick at 10s, + // second at 20s, so the observer must see exactly two pings. + tokio::select! { + biased; + () = tokio::time::sleep(Duration::from_secs(25)) => {} + _ = &mut liveness => unreachable!("liveness future never resolves"), + } + + assert_eq!(liveness_count(&observer), 2); + + // The turn carried the live turn_id on each ping. + let pings: Vec<_> = observer + .snapshot() + .into_iter() + .filter(|e| e.kind == "turn_liveness") + .collect(); + assert!(pings.iter().all(|e| e.turn_id.as_deref() == Some("t-1"))); + + // After the prompt wins the select, the liveness future is dropped — + // advancing the clock further produces no new pings. + tokio::time::sleep(Duration::from_secs(60)).await; + assert_eq!(liveness_count(&observer), 2); + } + + #[tokio::test(start_paused = true)] + async fn test_liveness_disabled_when_interval_zero_emits_nothing() { + let observer = observer::ObserverHandle::in_process(); + let context = observer::context_for(None, None, Some("t-1".into())); + let liveness = run_turn_liveness(Some(observer.clone()), Some(0), context, Duration::ZERO); + tokio::pin!(liveness); + + tokio::select! { + biased; + () = tokio::time::sleep(Duration::from_secs(120)) => {} + _ = &mut liveness => unreachable!("disabled liveness future never resolves"), + } + + assert_eq!(liveness_count(&observer), 0); + } + + #[tokio::test(start_paused = true)] + async fn test_liveness_without_observer_emits_nothing() { + // A turn that never started has no observer handle — the future must + // park without emitting or panicking. + let context = observer::context_for(None, None, Some("t-1".into())); + let liveness = run_turn_liveness(None, None, context, Duration::from_secs(10)); + tokio::pin!(liveness); + + tokio::select! { + biased; + () = tokio::time::sleep(Duration::from_secs(120)) => {} + _ = &mut liveness => unreachable!("handle-less liveness future never resolves"), + } + // No observer to assert against — reaching here without panic is the test. + } } diff --git a/desktop/src/features/agents/activeAgentTurnsStore.test.mjs b/desktop/src/features/agents/activeAgentTurnsStore.test.mjs index 8d0b7fde2..b597b2074 100644 --- a/desktop/src/features/agents/activeAgentTurnsStore.test.mjs +++ b/desktop/src/features/agents/activeAgentTurnsStore.test.mjs @@ -1,16 +1,22 @@ import assert from "node:assert/strict"; -import { describe, it, beforeEach } from "node:test"; +import { describe, it, beforeEach, afterEach, mock } from "node:test"; import { syncAgentTurnsFromEvents, - getActiveChannelsForAgent, + getActiveTurnsForAgent, resetActiveAgentTurnsStore, subscribeActiveAgentTurns, } from "./activeAgentTurnsStore.ts"; +import { formatElapsed } from "./ui/agentSessionUtils.ts"; const AGENT = "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234"; +/** Channel-id Set view of the summary array — keeps legacy assertions terse. */ +function channelIdsOf(turns) { + return new Set(turns.map((t) => t.channelId)); +} + function makeEvent(overrides) { return { seq: 1, @@ -35,7 +41,7 @@ describe("activeAgentTurnsStore", () => { syncAgentTurnsFromEvents(AGENT, [ makeEvent({ seq: 1, turnId: "t1", channelId: "c1" }), ]); - const channels = getActiveChannelsForAgent(AGENT); + const channels = channelIdsOf(getActiveTurnsForAgent(AGENT)); assert.equal(channels.size, 1); assert.ok(channels.has("c1")); }); @@ -48,7 +54,7 @@ describe("activeAgentTurnsStore", () => { syncAgentTurnsFromEvents(AGENT, [ makeEvent({ seq: 3, turnId: "t2", channelId: "c2" }), ]); - const channels = getActiveChannelsForAgent(AGENT); + const channels = channelIdsOf(getActiveTurnsForAgent(AGENT)); assert.equal(channels.size, 1); assert.ok(channels.has("c1")); assert.ok(!channels.has("c2")); @@ -61,7 +67,7 @@ describe("activeAgentTurnsStore", () => { syncAgentTurnsFromEvents(AGENT, [ makeEvent({ seq: 1, turnId: "t2", channelId: "c2" }), ]); - const channels = getActiveChannelsForAgent(AGENT); + const channels = channelIdsOf(getActiveTurnsForAgent(AGENT)); assert.equal(channels.size, 1); assert.ok(channels.has("c1")); }); @@ -78,7 +84,7 @@ describe("activeAgentTurnsStore", () => { timestamp: "2024-01-01T00:00:00Z", }), ]); - assert.equal(getActiveChannelsForAgent(AGENT).size, 1); + assert.equal(getActiveTurnsForAgent(AGENT).length, 1); // Agent restarts — seq resets to 1, but wall-clock timestamp keeps // climbing. The composite watermark accepts it on timestamp alone. @@ -90,7 +96,7 @@ describe("activeAgentTurnsStore", () => { timestamp: "2024-01-01T00:01:00Z", }), ]); - const channels = getActiveChannelsForAgent(AGENT); + const channels = channelIdsOf(getActiveTurnsForAgent(AGENT)); assert.ok(channels.has("c2"), "post-restart event should be processed"); }); @@ -126,7 +132,7 @@ describe("activeAgentTurnsStore", () => { timestamp: "2024-01-01T00:01:02Z", }), ]); - const channels = getActiveChannelsForAgent(AGENT); + const channels = channelIdsOf(getActiveTurnsForAgent(AGENT)); // t1 still active (not ended), t2 ended, t3 still active. assert.ok(channels.has("c1")); assert.ok(!channels.has("c2")); @@ -148,7 +154,7 @@ describe("activeAgentTurnsStore", () => { ); } syncAgentTurnsFromEvents(AGENT, events); - const channels = getActiveChannelsForAgent(AGENT); + const channels = channelIdsOf(getActiveTurnsForAgent(AGENT)); // Should have evicted c1 (oldest) to make room for c5 assert.equal(channels.size, 4); assert.ok(!channels.has("c1"), "oldest turn should be evicted"); @@ -168,7 +174,7 @@ describe("activeAgentTurnsStore", () => { channelId: null, }), ]); - assert.equal(getActiveChannelsForAgent(AGENT).size, 0); + assert.equal(getActiveTurnsForAgent(AGENT).length, 0); }); it("falls back to channelId when turnId is null", () => { @@ -181,7 +187,7 @@ describe("activeAgentTurnsStore", () => { channelId: "c1", }), ]); - assert.equal(getActiveChannelsForAgent(AGENT).size, 0); + assert.equal(getActiveTurnsForAgent(AGENT).length, 0); }); it("does nothing when both turnId and channelId are null", () => { @@ -195,7 +201,7 @@ describe("activeAgentTurnsStore", () => { }), ]); // Turn should still be active — no way to identify which to end - assert.equal(getActiveChannelsForAgent(AGENT).size, 1); + assert.equal(getActiveTurnsForAgent(AGENT).length, 1); }); it("channelId fallback removes only one matching turn", () => { @@ -210,7 +216,7 @@ describe("activeAgentTurnsStore", () => { }), ]); // Only one of the two turns in c1 should be removed - const channels = getActiveChannelsForAgent(AGENT); + const channels = channelIdsOf(getActiveTurnsForAgent(AGENT)); assert.equal(channels.size, 1); assert.ok(channels.has("c1")); }); @@ -264,8 +270,8 @@ describe("activeAgentTurnsStore", () => { // Initial pass. syncAgentTurnsFromEvents(AGENT, buffer); - const afterFirst = getActiveChannelsForAgent(AGENT); - assert.equal(afterFirst.size, 2); + const afterFirst = getActiveTurnsForAgent(AGENT); + assert.equal(afterFirst.length, 2); // Subscribe, then replay the identical buffer. let notified = 0; @@ -276,7 +282,7 @@ describe("activeAgentTurnsStore", () => { unsub(); assert.equal(notified, 0, "replay must not notify listeners"); - const afterReplay = getActiveChannelsForAgent(AGENT); + const afterReplay = getActiveTurnsForAgent(AGENT); assert.equal( afterReplay, afterFirst, @@ -301,7 +307,7 @@ describe("activeAgentTurnsStore", () => { timestamp: "2024-01-01T00:00:01Z", }), ]); - assert.equal(getActiveChannelsForAgent(AGENT).size, 0); + assert.equal(getActiveTurnsForAgent(AGENT).length, 0); // Agent restarts. The harness replays its buffer with seq reset to 1, // but the original event timestamps (older than the watermark) are @@ -322,7 +328,7 @@ describe("activeAgentTurnsStore", () => { }), ]); assert.equal( - getActiveChannelsForAgent(AGENT).size, + getActiveTurnsForAgent(AGENT).length, 0, "stale replayed start must not resurrect an evicted turn", ); @@ -354,7 +360,7 @@ describe("activeAgentTurnsStore", () => { timestamp: "2024-01-01T00:00:02Z", }), ]); - assert.equal(getActiveChannelsForAgent(AGENT).size, 1); + assert.equal(getActiveTurnsForAgent(AGENT).length, 1); // The full buffer is replayed on the next observer event. The stale // turn_error (below the watermark) must NOT re-run its channel-match @@ -380,7 +386,7 @@ describe("activeAgentTurnsStore", () => { timestamp: "2024-01-01T00:00:02Z", }), ]); - const channels = getActiveChannelsForAgent(AGENT); + const channels = channelIdsOf(getActiveTurnsForAgent(AGENT)); assert.equal( channels.size, 1, @@ -429,19 +435,239 @@ describe("activeAgentTurnsStore", () => { }); }); - describe("getActiveChannelsForAgent", () => { - it("returns EMPTY_SET for null/undefined pubkey", () => { - assert.equal(getActiveChannelsForAgent(null).size, 0); - assert.equal(getActiveChannelsForAgent(undefined).size, 0); + describe("getActiveTurnsForAgent", () => { + it("returns empty array for null/undefined pubkey", () => { + assert.equal(getActiveTurnsForAgent(null).length, 0); + assert.equal(getActiveTurnsForAgent(undefined).length, 0); }); it("returns stable reference when unchanged", () => { syncAgentTurnsFromEvents(AGENT, [ makeEvent({ seq: 1, turnId: "t1", channelId: "c1" }), ]); - const ref1 = getActiveChannelsForAgent(AGENT); - const ref2 = getActiveChannelsForAgent(AGENT); - assert.equal(ref1, ref2, "should return cached reference"); + const ref1 = getActiveTurnsForAgent(AGENT); + const ref2 = getActiveTurnsForAgent(AGENT); + assert.equal(ref1, ref2, "should return cached array reference"); + }); + + it("preserves a desktop-clock observedAt per channel", () => { + const before = Date.now(); + syncAgentTurnsFromEvents(AGENT, [ + // startedAt comes from the (stale) event timestamp; observedAt must + // instead anchor to the local clock at insert time. + makeEvent({ + seq: 1, + turnId: "t1", + channelId: "c1", + timestamp: "2000-01-01T00:00:00Z", + }), + ]); + const after = Date.now(); + const [summary] = getActiveTurnsForAgent(AGENT); + assert.equal(summary.channelId, "c1"); + assert.ok( + summary.observedAt >= before && summary.observedAt <= after, + "observedAt must be the local clock at insert, not the event timestamp", + ); + }); + + it("collapses two turns in one channel to the earliest observedAt", () => { + syncAgentTurnsFromEvents(AGENT, [ + makeEvent({ seq: 1, turnId: "t1", channelId: "c1" }), + ]); + const firstObservedAt = getActiveTurnsForAgent(AGENT)[0].observedAt; + + // Second turn in the same channel — its observedAt is >= the first + // because the clock is monotonic, so the earliest must still win. + syncAgentTurnsFromEvents(AGENT, [ + makeEvent({ seq: 2, turnId: "t2", channelId: "c1" }), + ]); + const summaries = getActiveTurnsForAgent(AGENT); + assert.equal(summaries.length, 1, "same channel collapses to one entry"); + assert.equal( + summaries[0].observedAt, + firstObservedAt, + "earliest observedAt for the channel must be surfaced", + ); + }); + + it("advances to the surviving turn's observedAt after the earliest ends", () => { + // Two turns in one channel; the array must be rebuilt from the LIVE map + // on every mutation, so ending the earliest-observed turn must surface + // the survivor's observedAt — not a stale cached minimum. + syncAgentTurnsFromEvents(AGENT, [ + makeEvent({ seq: 1, turnId: "t-early", channelId: "c1" }), + ]); + const tEarly = getActiveTurnsForAgent(AGENT)[0].observedAt; + + // Force the second turn's observedAt strictly past the first so the + // advance is observable even when Date.now() would otherwise collide. + const spinUntil = Date.now() + 2; + while (Date.now() < spinUntil) { + /* busy-wait one clock tick */ + } + syncAgentTurnsFromEvents(AGENT, [ + makeEvent({ seq: 2, turnId: "t-later", channelId: "c1" }), + ]); + assert.equal( + getActiveTurnsForAgent(AGENT)[0].observedAt, + tEarly, + "earliest wins while both turns survive", + ); + + // End the earliest turn by its turnId. + syncAgentTurnsFromEvents(AGENT, [ + makeEvent({ + seq: 3, + kind: "turn_completed", + turnId: "t-early", + channelId: "c1", + }), + ]); + const [survivor] = getActiveTurnsForAgent(AGENT); + assert.equal(survivor.channelId, "c1"); + assert.ok( + survivor.observedAt > tEarly, + "surfaced observedAt must advance to the surviving turn after eviction", + ); + }); + + it("sorts summaries by channelId", () => { + syncAgentTurnsFromEvents(AGENT, [ + makeEvent({ seq: 1, turnId: "t1", channelId: "c-zebra" }), + makeEvent({ seq: 2, turnId: "t2", channelId: "c-alpha" }), + ]); + const ids = getActiveTurnsForAgent(AGENT).map((s) => s.channelId); + assert.deepEqual(ids, ["c-alpha", "c-zebra"]); + }); + }); + + describe("turn_liveness prune backstop", () => { + // The prune sweep runs on an internal setInterval keyed off Date.now(); + // faking both lets us drive the 25s bound deterministically. The fixed + // epoch is the clock floor — event timestamps below anchor lastActivityAt + // to it, so elapsed time is exactly what mock.timers.tick advances. + const EPOCH = Date.parse("2024-01-01T00:00:00Z"); + const at = (ms) => new Date(EPOCH + ms).toISOString(); + // Mirrors the store's REMOVE_AFTER_MS (LIVENESS_INTERVAL_MS * 2.5) and + // PRUNE_INTERVAL_MS. Not exported — kept in lockstep here so the prune + // bound stays asserted from the consumer's perspective. + const REMOVE_AFTER_MS = 25_000; + const PRUNE_INTERVAL_MS = 5_000; + + let unsubscribe; + + beforeEach(() => { + mock.timers.enable({ apis: ["setInterval", "Date"], now: EPOCH }); + // Subscribing starts the prune interval under the faked clock. + unsubscribe = subscribeActiveAgentTurns(() => {}); }); + + afterEach(() => { + unsubscribe(); + mock.timers.reset(); + }); + + it("keeps a turn alive when turn_liveness refreshes before the bound", () => { + syncAgentTurnsFromEvents(AGENT, [ + makeEvent({ seq: 1, turnId: "t1", channelId: "c1", timestamp: at(0) }), + ]); + + // Refresh activity at 20s — under the 25s bound — then advance to 40s. + // Without the refresh the turn would have been pruned by 25s; the + // liveness ping resets lastActivityAt so it survives. + mock.timers.tick(20_000); + syncAgentTurnsFromEvents(AGENT, [ + makeEvent({ + seq: 2, + kind: "turn_liveness", + turnId: "t1", + channelId: "c1", + timestamp: at(20_000), + }), + ]); + mock.timers.tick(20_000); + + const channels = channelIdsOf(getActiveTurnsForAgent(AGENT)); + assert.ok( + channels.has("c1"), + "liveness within the bound must defer the prune", + ); + }); + + it("prunes a turn that receives no activity past the bound", () => { + syncAgentTurnsFromEvents(AGENT, [ + makeEvent({ seq: 1, turnId: "t1", channelId: "c1", timestamp: at(0) }), + ]); + assert.equal(getActiveTurnsForAgent(AGENT).length, 1); + + // No liveness pings — the host died without unwinding. Advance past the + // 25s bound; the next prune sweep evicts the silent turn. + mock.timers.tick(REMOVE_AFTER_MS + PRUNE_INTERVAL_MS); + + assert.equal( + getActiveTurnsForAgent(AGENT).length, + 0, + "a turn with no activity past the bound must be pruned", + ); + }); + + it("treats a turn_liveness with a null turnId as a no-op", () => { + syncAgentTurnsFromEvents(AGENT, [ + makeEvent({ seq: 1, turnId: "t1", channelId: "c1", timestamp: at(0) }), + ]); + + // A liveness ping with no turnId must refresh nothing (recordActivity + // no-ops on null). If it wrongly refreshed, the turn would survive the + // bound below — so the prune is the observable proof of the no-op, and + // the missing turnId must not throw. + mock.timers.tick(20_000); + assert.doesNotThrow(() => { + syncAgentTurnsFromEvents(AGENT, [ + makeEvent({ + seq: 2, + kind: "turn_liveness", + turnId: null, + channelId: "c1", + timestamp: at(20_000), + }), + ]); + }); + mock.timers.tick(REMOVE_AFTER_MS + PRUNE_INTERVAL_MS); + + assert.equal( + getActiveTurnsForAgent(AGENT).length, + 0, + "a null-turnId liveness must not refresh activity, so the turn still prunes", + ); + }); + }); +}); + +describe("formatElapsed", () => { + it("renders sub-10s as whole seconds", () => { + assert.equal(formatElapsed(0), "0s"); + assert.equal(formatElapsed(4_900), "4s"); + }); + + it("renders sub-minute as whole seconds", () => { + assert.equal(formatElapsed(59_000), "59s"); + }); + + it("rolls into minutes at exactly 60s", () => { + assert.equal(formatElapsed(60_000), "1m 0s"); + }); + + it("renders minutes and seconds", () => { + assert.equal(formatElapsed(83_000), "1m 23s"); + }); + + it("rolls 59m 59s cleanly into 1h 0m 0s at 3600s", () => { + assert.equal(formatElapsed(3_599_000), "59m 59s"); + assert.equal(formatElapsed(3_600_000), "1h 0m 0s"); + }); + + it("clamps negative input to 0s", () => { + assert.equal(formatElapsed(-5_000), "0s"); }); }); diff --git a/desktop/src/features/agents/activeAgentTurnsStore.ts b/desktop/src/features/agents/activeAgentTurnsStore.ts index 8c5c1b3ae..14bfc4e61 100644 --- a/desktop/src/features/agents/activeAgentTurnsStore.ts +++ b/desktop/src/features/agents/activeAgentTurnsStore.ts @@ -8,8 +8,14 @@ import { import { normalizePubkey } from "@/shared/lib/pubkey"; import type { ObserverEvent } from "./ui/agentSessionTypes"; -/** Remove a turn entirely after 90s of no activity. */ -const REMOVE_AFTER_MS = 90_000; +/** Harness emits turn_liveness every ~10s (BUZZ_ACP_TURN_LIVENESS_SECS). */ +const LIVENESS_INTERVAL_MS = 10_000; +/** Remove a turn after this long with no activity. Tolerates one fully dropped + * liveness ping plus slack before pruning a turn whose host died without + * unwinding (kill -9 / crash) — the only case that reaches this bound, since + * graceful exits clear via turn_completed and working turns refresh on every + * stream event. Derived from the interval so it tracks if the interval changes. */ +const REMOVE_AFTER_MS = LIVENESS_INTERVAL_MS * 2.5; /** Maximum concurrent active turns tracked per agent (matches pool size). */ const MAX_TURNS_PER_AGENT = 4; /** Interval for pruning stale/expired turns. */ @@ -19,16 +25,23 @@ type ActiveTurn = { turnId: string; channelId: string; startedAt: number; + observedAt: number; lastActivityAt: number; }; +/** One working channel surfaced to the UI, anchored to the desktop clock. */ +export type ActiveTurnSummary = { + channelId: string; + observedAt: number; +}; + // Module-level state: agentPubkey → turnId → ActiveTurn const activeTurnsByAgent = new Map>(); const listeners = new Set<() => void>(); // Cached snapshots for useSyncExternalStore reference stability. // Only regenerated when the underlying turn map for an agent actually changes. -const cachedChannelSets = new Map>(); +const cachedTurnSummaries = new Map(); // Composite watermark per agent: the newest observer event processed, by // (timestamp, seq) ordering. An event is processed only if it is strictly @@ -39,7 +52,7 @@ const lastProcessed = new Map(); let pruneInterval: ReturnType | null = null; function invalidateCache(agentKey: string) { - cachedChannelSets.delete(agentKey); + cachedTurnSummaries.delete(agentKey); } function notifyListeners() { @@ -81,6 +94,9 @@ function startTurn( turnId, channelId, startedAt: now, + // Desktop-clock anchor for the live elapsed counter. Must NOT use startedAt + // (agent-host clock) — ticking the desktop clock against it skews remote agents. + observedAt: Date.now(), lastActivityAt: now, }); invalidateCache(key); @@ -183,6 +199,10 @@ function processEvent(agentPubkey: string, event: ObserverEvent) { break; case "acp_read": case "acp_write": + // turn_liveness keeps a quiet-but-alive turn from being pruned; same + // refresh-only path as stream activity — no surfaced summary change, so + // no notifyListeners(). + case "turn_liveness": recordActivity(agentPubkey, event.turnId ?? null); break; } @@ -215,24 +235,41 @@ export function subscribeActiveAgentTurns(listener: () => void) { }; } -/** Returns the set of channel IDs where the given agent has active turns. */ -export function getActiveChannelsForAgent( +/** + * Returns the channels where the given agent has active turns, sorted by + * channelId, each anchored to the earliest `observedAt` for that channel. + * The array reference is cached and stable until the turn map mutates — a + * requirement for `useSyncExternalStore`. + */ +export function getActiveTurnsForAgent( agentPubkey: string | null | undefined, -): Set { - if (!agentPubkey) return EMPTY_SET; +): ActiveTurnSummary[] { + if (!agentPubkey) return EMPTY_TURNS; const key = normalizePubkey(agentPubkey); const agentTurns = activeTurnsByAgent.get(key); - if (!agentTurns || agentTurns.size === 0) return EMPTY_SET; + if (!agentTurns || agentTurns.size === 0) return EMPTY_TURNS; - const cached = cachedChannelSets.get(key); + const cached = cachedTurnSummaries.get(key); if (cached) return cached; - const result = new Set([...agentTurns.values()].map((t) => t.channelId)); - cachedChannelSets.set(key, result); + // Collapse multiple turns in one channel to the earliest observation — + // the badge should count from when the channel first went active. + const earliestByChannel = new Map(); + for (const turn of agentTurns.values()) { + const prior = earliestByChannel.get(turn.channelId); + if (prior === undefined || turn.observedAt < prior) { + earliestByChannel.set(turn.channelId, turn.observedAt); + } + } + + const result = [...earliestByChannel.entries()] + .map(([channelId, observedAt]) => ({ channelId, observedAt })) + .sort((a, b) => a.channelId.localeCompare(b.channelId)); + cachedTurnSummaries.set(key, result); return result; } -const EMPTY_SET: Set = new Set(); +const EMPTY_TURNS: ActiveTurnSummary[] = []; /** * Synchronize the active-turns store with the latest observer events for a @@ -248,14 +285,15 @@ export function syncAgentTurnsFromEvents( } /** - * Hook: returns the set of channel IDs where the given agent is currently working. - * Re-renders when the set changes. + * Hook: returns the channels where the given agent is currently working, each + * with the desktop-clock `observedAt` to anchor a live elapsed counter. + * Re-renders when the set of channels changes — not when the clock ticks. */ export function useActiveAgentTurns( agentPubkey: string | null | undefined, -): Set { +): ActiveTurnSummary[] { const getSnapshot = React.useCallback( - () => getActiveChannelsForAgent(agentPubkey), + () => getActiveTurnsForAgent(agentPubkey), [agentPubkey], ); @@ -286,6 +324,6 @@ export function useActiveAgentTurnsBridge( export function resetActiveAgentTurnsStore() { activeTurnsByAgent.clear(); lastProcessed.clear(); - cachedChannelSets.clear(); + cachedTurnSummaries.clear(); notifyListeners(); } diff --git a/desktop/src/features/agents/ui/ManagedAgentRow.tsx b/desktop/src/features/agents/ui/ManagedAgentRow.tsx index 66e9614af..3c7310658 100644 --- a/desktop/src/features/agents/ui/ManagedAgentRow.tsx +++ b/desktop/src/features/agents/ui/ManagedAgentRow.tsx @@ -20,6 +20,8 @@ import { PresenceDot } from "@/features/presence/ui/PresenceBadge"; import { Badge } from "@/shared/ui/badge"; import { AgentStatusBadge } from "@/features/agents/ui/AgentStatusBadge"; import { useActiveAgentTurns } from "@/features/agents/activeAgentTurnsStore"; +import { formatElapsed } from "@/features/agents/ui/agentSessionUtils"; +import { useNow } from "@/shared/lib/useNow"; import type { ManagedAgent, PresenceLookup, @@ -84,13 +86,17 @@ export function ManagedAgentRow({ ? (personaLabelsById[agent.personaId] ?? null) : null; const presenceStatus = presenceLookup[agent.pubkey.trim().toLowerCase()]; - const activeChannelIds = useActiveAgentTurns(agent.pubkey); + const activeTurns = useActiveAgentTurns(agent.pubkey); const activeWorkingChannels = React.useMemo( () => - [...activeChannelIds] - .map((id) => ({ id, name: channelIdToName[id] ?? id })) + activeTurns + .map(({ channelId, observedAt }) => ({ + id: channelId, + name: channelIdToName[channelId] ?? channelId, + observedAt, + })) .slice(0, 3), - [activeChannelIds, channelIdToName], + [activeTurns, channelIdToName], ); const isWorking = activeWorkingChannels.length > 0; const processDetail = @@ -216,7 +222,7 @@ function AgentSummary({ personaLabel, presenceStatus, }: { - activeWorkingChannels: { id: string; name: string }[]; + activeWorkingChannels: { id: string; name: string; observedAt: number }[]; agent: ManagedAgent; channelNames: { id: string; name: string }[]; isExpandable: boolean; @@ -281,17 +287,13 @@ function AgentSummary({ {activeWorkingChannels.length > 0 ? (
{activeWorkingChannels.map((channel) => ( - { - e.stopPropagation(); - void goChannel(channel.id); - }} - > - Working in #{channel.name} - + channelId={channel.id} + name={channel.name} + observedAt={channel.observedAt} + onNavigate={goChannel} + /> ))}
) : null} @@ -301,6 +303,35 @@ function AgentSummary({ ); } +function WorkingBadge({ + channelId, + name, + observedAt, + onNavigate, +}: { + channelId: string; + name: string; + observedAt: number; + onNavigate: (channelId: string) => void; +}) { + // The 1s tick lives here, at the leaf, so only visible working badges + // re-render each second — idle rows never mount this hook. + const now = useNow(1000); + + return ( + { + e.stopPropagation(); + onNavigate(channelId); + }} + > + Working in #{name} · {formatElapsed(now - observedAt)} + + ); +} + function StatusBlock({ friendlyError, isWorking, diff --git a/desktop/src/features/agents/ui/agentSessionUtils.ts b/desktop/src/features/agents/ui/agentSessionUtils.ts index ae3a147ab..19d915ec8 100644 --- a/desktop/src/features/agents/ui/agentSessionUtils.ts +++ b/desktop/src/features/agents/ui/agentSessionUtils.ts @@ -131,3 +131,19 @@ export function formatDuration( } return seconds > 0 ? `${minutes}m ${seconds}s` : `${minutes}m`; } + +/** + * Format a live elapsed duration (epoch-ms delta) for a ticking counter. + * Tiers: `<60s → "Ns"` · `<60m → "Nm Ns"` · `≥60m → "Nh Nm Ns"`. + * Negative input clamps to 0; carries roll cleanly (e.g. 3600s → "1h 0m 0s"). + */ +export function formatElapsed(ms: number): string { + const totalSeconds = Math.max(0, Math.floor(ms / 1000)); + if (totalSeconds < 60) return `${totalSeconds}s`; + const seconds = totalSeconds % 60; + const totalMinutes = Math.floor(totalSeconds / 60); + if (totalMinutes < 60) return `${totalMinutes}m ${seconds}s`; + const minutes = totalMinutes % 60; + const hours = Math.floor(totalMinutes / 60); + return `${hours}h ${minutes}m ${seconds}s`; +} diff --git a/desktop/src/shared/lib/useNow.ts b/desktop/src/shared/lib/useNow.ts new file mode 100644 index 000000000..f0f651891 --- /dev/null +++ b/desktop/src/shared/lib/useNow.ts @@ -0,0 +1,17 @@ +import * as React from "react"; + +/** + * Returns `Date.now()`, re-rendering the calling component every `intervalMs`. + * Each consumer owns one `setInterval` cleaned up on unmount — mount the hook + * only where a live clock is actually displayed so idle components never tick. + */ +export function useNow(intervalMs: number): number { + const [now, setNow] = React.useState(() => Date.now()); + + React.useEffect(() => { + const id = setInterval(() => setNow(Date.now()), intervalMs); + return () => clearInterval(id); + }, [intervalMs]); + + return now; +}