Skip to content
This repository was archived by the owner on Apr 14, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions scripts/check-file-sizes.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ const EXCEPTIONS = {
"ACP-backed session overlay persistence, draft migration, and sidebar-facing session merge logic live together for now.",
},
"src-tauri/src/services/acp/manager/dispatcher.rs": {
limit: 520,
limit: 540,
justification:
"ACP replay and live-stream event fan-out share one dispatcher until session event routing is split.",
"ACP replay and live-stream event fan-out share one dispatcher with replay event counting for drain stabilisation.",
},
"src-tauri/src/services/acp/manager.rs": {
limit: 630,
justification:
"ACP manager command dispatch loop — export/import/fork session ext_method dispatch adds boilerplate.",
},
"src-tauri/src/services/acp/manager/session_ops.rs": {
limit: 570,
limit: 620,
justification:
"Session prepare/load/list logic, working-dir updates, and composite prepared-session reuse remain colocated while ACP session ownership stabilizes.",
"Session prepare/load/list logic, working-dir updates, wait_for_replay_drain helper with iteration cap, and composite prepared-session reuse remain colocated while ACP session ownership stabilizes.",
},
};

Expand Down
15 changes: 14 additions & 1 deletion src-tauri/src/services/acp/manager/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub(super) struct SessionRoute {
pub(super) writer: Option<Arc<dyn MessageWriter>>,
pub(super) canceled: bool,
pub(super) replay_message_id: Option<String>,
pub(super) replay_events: u32,
}

pub(super) struct SessionEventDispatcher {
Expand Down Expand Up @@ -94,6 +95,7 @@ impl SessionEventDispatcher {
writer: None,
canceled: false,
replay_message_id: None,
replay_events: 0,
});

let _ = self.app_handle.emit(
Expand Down Expand Up @@ -121,6 +123,7 @@ impl SessionEventDispatcher {
writer: Some(writer),
canceled: false,
replay_message_id: None,
replay_events: 0,
},
);
}
Expand Down Expand Up @@ -254,6 +257,14 @@ impl SessionEventDispatcher {
},
);
}

pub(super) async fn get_replay_event_count(&self, goose_session_id: &str) -> u32 {
let routes = self.routes.lock().await;
routes
.get(goose_session_id)
.map(|r| r.replay_events)
.unwrap_or(0)
}
}

fn extract_content_preview(content: &[agent_client_protocol::ToolCallContent]) -> Option<String> {
Expand Down Expand Up @@ -381,7 +392,7 @@ impl Client for SessionEventDispatcher {
match &notification.update {
SessionUpdate::UserMessageChunk(chunk) => {
if let AcpContentBlock::Text(text) = &chunk.content {
// Finalize any in-progress assistant message first
// Finalize any in-progress assistant message and count replay event
{
let mut routes = self.routes.lock().await;
if let Some(route) = routes.get_mut(&goose_session_id) {
Expand All @@ -394,6 +405,7 @@ impl Client for SessionEventDispatcher {
},
);
}
route.replay_events += 1;
}
}

Expand Down Expand Up @@ -436,6 +448,7 @@ impl Client for SessionEventDispatcher {
let mut routes = self.routes.lock().await;
if let Some(route) = routes.get_mut(&goose_session_id) {
route.replay_message_id = Some(new_id.clone());
route.replay_events += 1;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Count every replay event before finalizing replay

wait_for_replay_drain waits for replay_events to stabilize, but this counter is only incremented when a new replay assistant message is created (plus user chunks), not for subsequent AgentMessageChunk/tool replay updates on the same message. That lets load_session_inner call finalize_replay and emit acp:replay_complete while replay chunks are still queued, and those late chunks can be dropped once the frontend marks the message completed (shouldTrackStreamingEvent in useAcpStream). Please increment the drain counter for every replay notification that can still mutate buffered replay state, or use a completion signal tied to the notification queue instead of message creation.

Useful? React with 👍 / 👎.

}
new_id
};
Expand Down
51 changes: 51 additions & 0 deletions src-tauri/src/services/acp/manager/session_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,16 @@ pub(super) async fn load_session_inner(
.await
.map_err(|error| format!("Failed to load Goose session: {error:?}"))?;

// The ACP RPC layer resolves responses synchronously but dispatches
// notifications asynchronously via spawned tasks. After load_session
// returns, replay notifications may still be queued. Yield repeatedly
// to let the single-threaded runtime drain them before counting.
wait_for_replay_drain(|| async { dispatcher.get_replay_event_count(goose_session_id).await })
.await;

// Finalize any in-progress replay assistant message
dispatcher.finalize_replay(goose_session_id).await;

dispatcher.emit_replay_complete(local_session_id);

if let Some(models) = &response.models {
Expand Down Expand Up @@ -558,3 +566,46 @@ pub(super) async fn set_model_inner(

Ok(())
}

/// Yield repeatedly until an async counter stabilises for 3 consecutive rounds.
///
/// After `load_session` returns, the ACP RPC layer may still have spawned
/// notification tasks that haven't run yet. This function yields to the
/// runtime between polls so those tasks get a chance to execute, and only
/// returns once the count has been stable for 3 consecutive yields — giving
/// us confidence that all replay events have been dispatched.
///
/// A safety cap of 100 iterations prevents infinite spinning if a bug causes
/// the counter to increment indefinitely.
const MAX_DRAIN_ITERATIONS: u32 = 100;

async fn wait_for_replay_drain<F, Fut>(mut get_count: F) -> u32
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = u32>,
{
let mut prev_total = 0u32;
let mut stable_rounds = 0u8;
let mut iterations = 0u32;
loop {
tokio::task::yield_now().await;
let total = get_count().await;
iterations += 1;
if total == prev_total {
stable_rounds += 1;
if stable_rounds >= 3 {
return total;
}
} else {
stable_rounds = 0;
prev_total = total;
}
if iterations >= MAX_DRAIN_ITERATIONS {
log::warn!(
"wait_for_replay_drain hit iteration cap ({MAX_DRAIN_ITERATIONS}); \
returning partial count {total}"
);
return total;
}
}
}
100 changes: 97 additions & 3 deletions src-tauri/src/services/acp/manager/session_ops/tests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::collections::{HashMap, HashSet};

use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use super::{
needs_provider_update, prepared_session_for_key, register_prepared_session_keys, ManagerState,
PreparedSession,
needs_provider_update, prepared_session_for_key, register_prepared_session_keys,
wait_for_replay_drain, ManagerState, PreparedSession, MAX_DRAIN_ITERATIONS,
};
use crate::services::acp::split_composite_key;

Expand Down Expand Up @@ -81,3 +82,96 @@ fn register_prepared_session_keys_preserves_composite_and_local_entries() {
"goose-1"
);
}

#[tokio::test]
async fn replay_drain_returns_immediately_when_count_is_zero() {
let final_count = wait_for_replay_drain(|| async { 0u32 }).await;
assert_eq!(final_count, 0);
}

#[tokio::test]
async fn replay_drain_returns_stable_count() {
let counter = Arc::new(AtomicU32::new(42));
let c = counter.clone();
let final_count = wait_for_replay_drain(|| {
let c = c.clone();
async move { c.load(Ordering::SeqCst) }
})
.await;
assert_eq!(final_count, 42);
}

#[tokio::test]
async fn replay_drain_waits_for_spawned_notifications() {
let counter = Arc::new(AtomicU32::new(0));
let c = counter.clone();

// Simulate async notifications arriving over multiple yields, like
// the real ACP RPC layer does after load_session returns.
tokio::spawn(async move {
for i in 1..=5 {
tokio::task::yield_now().await;
c.store(i, Ordering::SeqCst);
}
});

let c2 = counter.clone();
let final_count = wait_for_replay_drain(|| {
let c = c2.clone();
async move { c.load(Ordering::SeqCst) }
})
.await;

assert_eq!(final_count, 5);
}

#[tokio::test]
async fn replay_drain_resets_stability_on_late_arrival() {
// Simulate: counter jumps to 3, stabilises for 2 rounds, then a late
// notification bumps it to 4. The drain must NOT stop at 3.
let poll_count = Arc::new(AtomicU32::new(0));

let pc = poll_count.clone();
let final_count = wait_for_replay_drain(|| {
let pc = pc.clone();
async move {
let poll = pc.fetch_add(1, Ordering::SeqCst);
// Polls 0..2 return 3 (2 stable rounds), then poll 3 bumps
// to 4 — simulating a late notification just before the drain
// would have declared stability. The drain must reset and
// wait for 4 to stabilise.
if poll < 3 {
3
} else {
4
}
}
})
.await;

// Must see the late arrival, not stop at 3
assert_eq!(final_count, 4);
// Verify the stability window truly reset: 3 polls to see 3, 1 poll
// to see the bump to 4, then 3 more polls for 4 to stabilise = 7 min.
assert!(
poll_count.load(Ordering::SeqCst) >= 7,
"expected at least 7 polls to confirm stability window reset, got {}",
poll_count.load(Ordering::SeqCst)
);
}

#[tokio::test]
async fn replay_drain_caps_iterations_on_runaway_counter() {
// Simulate a counter that never stabilises — increments every poll.
let poll_count = Arc::new(AtomicU32::new(0));
let pc = poll_count.clone();
let final_count = wait_for_replay_drain(|| {
let pc = pc.clone();
async move { pc.fetch_add(1, Ordering::SeqCst) + 1 }
})
.await;

// Should have stopped at the cap rather than spinning forever.
assert_eq!(final_count, MAX_DRAIN_ITERATIONS);
assert_eq!(poll_count.load(Ordering::SeqCst), MAX_DRAIN_ITERATIONS);
}
3 changes: 0 additions & 3 deletions src/app/AppShell.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ export function AppShell({ children }: { children?: React.ReactNode }) {
} catch (err) {
console.error("Failed to load session messages:", err);
useChatStore.getState().setSessionLoading(sessionId, false);
console.log(
`[perf:load] ${sessionId.slice(0, 8)} failed, loading=false at +${(performance.now() - t0).toFixed(1)}ms`,
);
}
}, []);

Expand Down
16 changes: 13 additions & 3 deletions src/features/chat/hooks/useAcpStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ export function useAcpStream(enabled: boolean): void {
}
const buffer = getAndDeleteReplayBuffer(sid);
if (buffer && buffer.length > 0) {
console.log(
`[perf:stream] ${sid.slice(0, 8)} flushing replay buffer (${buffer.length} messages) at ${performance.now().toFixed(1)}ms`,
);
useChatStore.getState().setMessages(sid, buffer);
}
}
Expand All @@ -149,6 +146,9 @@ export function useAcpStream(enabled: boolean): void {
unlisteners.push(
listen<AcpMessageCreatedPayload>("acp:message_created", (event) => {
if (!active) return;
console.log(
`[perf:stream] ${event.payload.sessionId.slice(0, 8)} message_created mid=${event.payload.messageId.slice(0, 8)} at ${performance.now().toFixed(1)}ms`,
);
const store = useChatStore.getState();
const { sessionId, messageId, personaId, personaName } = event.payload;
const providerId = getAssistantProviderId(sessionId);
Expand Down Expand Up @@ -216,6 +216,9 @@ export function useAcpStream(enabled: boolean): void {
unlisteners.push(
listen<AcpTextPayload>("acp:text", (event) => {
if (!active) return;
console.log(
`[perf:stream] ${event.payload.sessionId.slice(0, 8)} text mid=${event.payload.messageId.slice(0, 8)} len=${event.payload.text.length} at ${performance.now().toFixed(1)}ms`,
);
const store = useChatStore.getState();
const { sessionId, messageId, text } = event.payload;

Expand Down Expand Up @@ -243,6 +246,9 @@ export function useAcpStream(enabled: boolean): void {
unlisteners.push(
listen<AcpDonePayload>("acp:done", (event) => {
if (!active) return;
console.log(
`[perf:stream] ${event.payload.sessionId.slice(0, 8)} done mid=${event.payload.messageId.slice(0, 8)} at ${performance.now().toFixed(1)}ms`,
);
const store = useChatStore.getState();
const { sessionId, messageId } = event.payload;
const isLoading = store.loadingSessionIds.has(sessionId);
Expand Down Expand Up @@ -438,6 +444,9 @@ export function useAcpStream(enabled: boolean): void {
"acp:replay_user_message",
(event) => {
if (!active) return;
console.log(
`[perf:stream] ${event.payload.sessionId.slice(0, 8)} replay_user_message at ${performance.now().toFixed(1)}ms`,
);
const { sessionId, messageId, text } = event.payload;
ensureReplayBuffer(sessionId).push({
id: messageId,
Expand All @@ -453,6 +462,7 @@ export function useAcpStream(enabled: boolean): void {
unlisteners.push(
listen<AcpSessionBoundPayload>("acp:session_bound", (event) => {
if (!active) return;

useChatSessionStore
.getState()
.setSessionAcpId(
Expand Down
4 changes: 3 additions & 1 deletion src/features/chat/ui/ChatView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ export function ChatView({
void acpPrepareSession(activeSessionId, selectedProvider, {
workingDir: activeWorkingContext.path,
personaId: selectedPersonaId ?? undefined,
}).catch(() => undefined);
}).catch((error) => {
console.error("Failed to prepare ACP session:", error);
});
}, [
activeWorkingContext,
activeSessionId,
Expand Down
Loading