diff --git a/scripts/check-file-sizes.mjs b/scripts/check-file-sizes.mjs index cf4ba8c..1a980ce 100644 --- a/scripts/check-file-sizes.mjs +++ b/scripts/check-file-sizes.mjs @@ -51,9 +51,9 @@ const EXCEPTIONS = { "ACP-backed session overlay persistence, draft migration, and sidebar-facing session merge logic live together for now.", }, "src-tauri/src/services/acp/manager/dispatcher.rs": { - limit: 520, + limit: 540, justification: - "ACP replay and live-stream event fan-out share one dispatcher until session event routing is split.", + "ACP replay and live-stream event fan-out share one dispatcher with replay event counting for drain stabilisation.", }, "src-tauri/src/services/acp/manager.rs": { limit: 630, @@ -61,9 +61,9 @@ const EXCEPTIONS = { "ACP manager command dispatch loop — export/import/fork session ext_method dispatch adds boilerplate.", }, "src-tauri/src/services/acp/manager/session_ops.rs": { - limit: 570, + limit: 620, justification: - "Session prepare/load/list logic, working-dir updates, and composite prepared-session reuse remain colocated while ACP session ownership stabilizes.", + "Session prepare/load/list logic, working-dir updates, wait_for_replay_drain helper with iteration cap, and composite prepared-session reuse remain colocated while ACP session ownership stabilizes.", }, }; diff --git a/src-tauri/src/services/acp/manager/dispatcher.rs b/src-tauri/src/services/acp/manager/dispatcher.rs index 0e6155f..047638a 100644 --- a/src-tauri/src/services/acp/manager/dispatcher.rs +++ b/src-tauri/src/services/acp/manager/dispatcher.rs @@ -58,6 +58,7 @@ pub(super) struct SessionRoute { pub(super) writer: Option>, pub(super) canceled: bool, pub(super) replay_message_id: Option, + pub(super) replay_events: u32, } pub(super) struct SessionEventDispatcher { @@ -94,6 +95,7 @@ impl SessionEventDispatcher { writer: None, canceled: false, replay_message_id: None, + replay_events: 0, }); let _ = self.app_handle.emit( @@ -121,6 +123,7 @@ impl SessionEventDispatcher { writer: Some(writer), canceled: false, replay_message_id: None, + replay_events: 0, }, ); } @@ -254,6 +257,14 @@ impl SessionEventDispatcher { }, ); } + + pub(super) async fn get_replay_event_count(&self, goose_session_id: &str) -> u32 { + let routes = self.routes.lock().await; + routes + .get(goose_session_id) + .map(|r| r.replay_events) + .unwrap_or(0) + } } fn extract_content_preview(content: &[agent_client_protocol::ToolCallContent]) -> Option { @@ -381,7 +392,7 @@ impl Client for SessionEventDispatcher { match ¬ification.update { SessionUpdate::UserMessageChunk(chunk) => { if let AcpContentBlock::Text(text) = &chunk.content { - // Finalize any in-progress assistant message first + // Finalize any in-progress assistant message and count replay event { let mut routes = self.routes.lock().await; if let Some(route) = routes.get_mut(&goose_session_id) { @@ -394,6 +405,7 @@ impl Client for SessionEventDispatcher { }, ); } + route.replay_events += 1; } } @@ -436,6 +448,7 @@ impl Client for SessionEventDispatcher { let mut routes = self.routes.lock().await; if let Some(route) = routes.get_mut(&goose_session_id) { route.replay_message_id = Some(new_id.clone()); + route.replay_events += 1; } new_id }; diff --git a/src-tauri/src/services/acp/manager/session_ops.rs b/src-tauri/src/services/acp/manager/session_ops.rs index 1be791f..cb00a7f 100644 --- a/src-tauri/src/services/acp/manager/session_ops.rs +++ b/src-tauri/src/services/acp/manager/session_ops.rs @@ -439,8 +439,16 @@ pub(super) async fn load_session_inner( .await .map_err(|error| format!("Failed to load Goose session: {error:?}"))?; + // The ACP RPC layer resolves responses synchronously but dispatches + // notifications asynchronously via spawned tasks. After load_session + // returns, replay notifications may still be queued. Yield repeatedly + // to let the single-threaded runtime drain them before counting. + wait_for_replay_drain(|| async { dispatcher.get_replay_event_count(goose_session_id).await }) + .await; + // Finalize any in-progress replay assistant message dispatcher.finalize_replay(goose_session_id).await; + dispatcher.emit_replay_complete(local_session_id); if let Some(models) = &response.models { @@ -558,3 +566,46 @@ pub(super) async fn set_model_inner( Ok(()) } + +/// Yield repeatedly until an async counter stabilises for 3 consecutive rounds. +/// +/// After `load_session` returns, the ACP RPC layer may still have spawned +/// notification tasks that haven't run yet. This function yields to the +/// runtime between polls so those tasks get a chance to execute, and only +/// returns once the count has been stable for 3 consecutive yields — giving +/// us confidence that all replay events have been dispatched. +/// +/// A safety cap of 100 iterations prevents infinite spinning if a bug causes +/// the counter to increment indefinitely. +const MAX_DRAIN_ITERATIONS: u32 = 100; + +async fn wait_for_replay_drain(mut get_count: F) -> u32 +where + F: FnMut() -> Fut, + Fut: std::future::Future, +{ + let mut prev_total = 0u32; + let mut stable_rounds = 0u8; + let mut iterations = 0u32; + loop { + tokio::task::yield_now().await; + let total = get_count().await; + iterations += 1; + if total == prev_total { + stable_rounds += 1; + if stable_rounds >= 3 { + return total; + } + } else { + stable_rounds = 0; + prev_total = total; + } + if iterations >= MAX_DRAIN_ITERATIONS { + log::warn!( + "wait_for_replay_drain hit iteration cap ({MAX_DRAIN_ITERATIONS}); \ + returning partial count {total}" + ); + return total; + } + } +} diff --git a/src-tauri/src/services/acp/manager/session_ops/tests.rs b/src-tauri/src/services/acp/manager/session_ops/tests.rs index 4f5e51e..10e9885 100644 --- a/src-tauri/src/services/acp/manager/session_ops/tests.rs +++ b/src-tauri/src/services/acp/manager/session_ops/tests.rs @@ -1,10 +1,11 @@ use std::collections::{HashMap, HashSet}; - use std::path::PathBuf; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; use super::{ - needs_provider_update, prepared_session_for_key, register_prepared_session_keys, ManagerState, - PreparedSession, + needs_provider_update, prepared_session_for_key, register_prepared_session_keys, + wait_for_replay_drain, ManagerState, PreparedSession, MAX_DRAIN_ITERATIONS, }; use crate::services::acp::split_composite_key; @@ -81,3 +82,96 @@ fn register_prepared_session_keys_preserves_composite_and_local_entries() { "goose-1" ); } + +#[tokio::test] +async fn replay_drain_returns_immediately_when_count_is_zero() { + let final_count = wait_for_replay_drain(|| async { 0u32 }).await; + assert_eq!(final_count, 0); +} + +#[tokio::test] +async fn replay_drain_returns_stable_count() { + let counter = Arc::new(AtomicU32::new(42)); + let c = counter.clone(); + let final_count = wait_for_replay_drain(|| { + let c = c.clone(); + async move { c.load(Ordering::SeqCst) } + }) + .await; + assert_eq!(final_count, 42); +} + +#[tokio::test] +async fn replay_drain_waits_for_spawned_notifications() { + let counter = Arc::new(AtomicU32::new(0)); + let c = counter.clone(); + + // Simulate async notifications arriving over multiple yields, like + // the real ACP RPC layer does after load_session returns. + tokio::spawn(async move { + for i in 1..=5 { + tokio::task::yield_now().await; + c.store(i, Ordering::SeqCst); + } + }); + + let c2 = counter.clone(); + let final_count = wait_for_replay_drain(|| { + let c = c2.clone(); + async move { c.load(Ordering::SeqCst) } + }) + .await; + + assert_eq!(final_count, 5); +} + +#[tokio::test] +async fn replay_drain_resets_stability_on_late_arrival() { + // Simulate: counter jumps to 3, stabilises for 2 rounds, then a late + // notification bumps it to 4. The drain must NOT stop at 3. + let poll_count = Arc::new(AtomicU32::new(0)); + + let pc = poll_count.clone(); + let final_count = wait_for_replay_drain(|| { + let pc = pc.clone(); + async move { + let poll = pc.fetch_add(1, Ordering::SeqCst); + // Polls 0..2 return 3 (2 stable rounds), then poll 3 bumps + // to 4 — simulating a late notification just before the drain + // would have declared stability. The drain must reset and + // wait for 4 to stabilise. + if poll < 3 { + 3 + } else { + 4 + } + } + }) + .await; + + // Must see the late arrival, not stop at 3 + assert_eq!(final_count, 4); + // Verify the stability window truly reset: 3 polls to see 3, 1 poll + // to see the bump to 4, then 3 more polls for 4 to stabilise = 7 min. + assert!( + poll_count.load(Ordering::SeqCst) >= 7, + "expected at least 7 polls to confirm stability window reset, got {}", + poll_count.load(Ordering::SeqCst) + ); +} + +#[tokio::test] +async fn replay_drain_caps_iterations_on_runaway_counter() { + // Simulate a counter that never stabilises — increments every poll. + let poll_count = Arc::new(AtomicU32::new(0)); + let pc = poll_count.clone(); + let final_count = wait_for_replay_drain(|| { + let pc = pc.clone(); + async move { pc.fetch_add(1, Ordering::SeqCst) + 1 } + }) + .await; + + // Should have stopped at the cap rather than spinning forever. + assert_eq!(final_count, MAX_DRAIN_ITERATIONS); + assert_eq!(poll_count.load(Ordering::SeqCst), MAX_DRAIN_ITERATIONS); +} diff --git a/src/app/AppShell.tsx b/src/app/AppShell.tsx index 386e4b3..f701b04 100644 --- a/src/app/AppShell.tsx +++ b/src/app/AppShell.tsx @@ -105,9 +105,6 @@ export function AppShell({ children }: { children?: React.ReactNode }) { } catch (err) { console.error("Failed to load session messages:", err); useChatStore.getState().setSessionLoading(sessionId, false); - console.log( - `[perf:load] ${sessionId.slice(0, 8)} failed, loading=false at +${(performance.now() - t0).toFixed(1)}ms`, - ); } }, []); diff --git a/src/features/chat/hooks/useAcpStream.ts b/src/features/chat/hooks/useAcpStream.ts index 1abea39..c70ee68 100644 --- a/src/features/chat/hooks/useAcpStream.ts +++ b/src/features/chat/hooks/useAcpStream.ts @@ -128,9 +128,6 @@ export function useAcpStream(enabled: boolean): void { } const buffer = getAndDeleteReplayBuffer(sid); if (buffer && buffer.length > 0) { - console.log( - `[perf:stream] ${sid.slice(0, 8)} flushing replay buffer (${buffer.length} messages) at ${performance.now().toFixed(1)}ms`, - ); useChatStore.getState().setMessages(sid, buffer); } } @@ -149,6 +146,9 @@ export function useAcpStream(enabled: boolean): void { unlisteners.push( listen("acp:message_created", (event) => { if (!active) return; + console.log( + `[perf:stream] ${event.payload.sessionId.slice(0, 8)} message_created mid=${event.payload.messageId.slice(0, 8)} at ${performance.now().toFixed(1)}ms`, + ); const store = useChatStore.getState(); const { sessionId, messageId, personaId, personaName } = event.payload; const providerId = getAssistantProviderId(sessionId); @@ -216,6 +216,9 @@ export function useAcpStream(enabled: boolean): void { unlisteners.push( listen("acp:text", (event) => { if (!active) return; + console.log( + `[perf:stream] ${event.payload.sessionId.slice(0, 8)} text mid=${event.payload.messageId.slice(0, 8)} len=${event.payload.text.length} at ${performance.now().toFixed(1)}ms`, + ); const store = useChatStore.getState(); const { sessionId, messageId, text } = event.payload; @@ -243,6 +246,9 @@ export function useAcpStream(enabled: boolean): void { unlisteners.push( listen("acp:done", (event) => { if (!active) return; + console.log( + `[perf:stream] ${event.payload.sessionId.slice(0, 8)} done mid=${event.payload.messageId.slice(0, 8)} at ${performance.now().toFixed(1)}ms`, + ); const store = useChatStore.getState(); const { sessionId, messageId } = event.payload; const isLoading = store.loadingSessionIds.has(sessionId); @@ -438,6 +444,9 @@ export function useAcpStream(enabled: boolean): void { "acp:replay_user_message", (event) => { if (!active) return; + console.log( + `[perf:stream] ${event.payload.sessionId.slice(0, 8)} replay_user_message at ${performance.now().toFixed(1)}ms`, + ); const { sessionId, messageId, text } = event.payload; ensureReplayBuffer(sessionId).push({ id: messageId, @@ -453,6 +462,7 @@ export function useAcpStream(enabled: boolean): void { unlisteners.push( listen("acp:session_bound", (event) => { if (!active) return; + useChatSessionStore .getState() .setSessionAcpId( diff --git a/src/features/chat/ui/ChatView.tsx b/src/features/chat/ui/ChatView.tsx index a1d2638..7551a53 100644 --- a/src/features/chat/ui/ChatView.tsx +++ b/src/features/chat/ui/ChatView.tsx @@ -197,7 +197,9 @@ export function ChatView({ void acpPrepareSession(activeSessionId, selectedProvider, { workingDir: activeWorkingContext.path, personaId: selectedPersonaId ?? undefined, - }).catch(() => undefined); + }).catch((error) => { + console.error("Failed to prepare ACP session:", error); + }); }, [ activeWorkingContext, activeSessionId,