From c302c248bfdb4e7298189987f61505c42ebaa28f Mon Sep 17 00:00:00 2001 From: Lewis Tunstall Date: Mon, 8 Jun 2026 12:25:13 +0200 Subject: [PATCH 1/3] Recover chat sends after fetch failures Co-authored-by: OpenAI Codex --- frontend/src/hooks/useAgentChat.ts | 75 +++++++++++++ frontend/src/lib/sse-chat-transport.ts | 146 +++++++++++++++++++++---- 2 files changed, 198 insertions(+), 23 deletions(-) diff --git a/frontend/src/hooks/useAgentChat.ts b/frontend/src/hooks/useAgentChat.ts index ef8cd022..6b213780 100644 --- a/frontend/src/hooks/useAgentChat.ts +++ b/frontend/src/hooks/useAgentChat.ts @@ -32,6 +32,13 @@ interface UseAgentChatOptions { onSessionDead?: (sessionId: string) => void; } +function textFromUIMessage(message: UIMessage): string { + return message.parts + .filter((p): p is Extract => p.type === 'text') + .map(p => p.text) + .join(''); +} + export function useAgentChat({ sessionId, isActive, isProcessing = false, onReady, onError, onSessionDead }: UseAgentChatOptions) { const callbacksRef = useRef({ onReady, onError, onSessionDead }); callbacksRef.current = { onReady, onError, onSessionDead }; @@ -348,6 +355,74 @@ export function useAgentChat({ sessionId, isActive, isProcessing = false, onRead useUsageStore.getState().applyUsageEvent(sessionId, eventType, data); }, onInterrupted: () => { /* no-op — handled by stop() caller */ }, + onRecoverMessages: async ({ submittedText, currentMessageCount, currentUserMessageCount }) => { + try { + const [msgsRes, infoRes] = await Promise.all([ + apiFetch(`/api/session/${sessionId}/messages`), + apiFetch(`/api/session/${sessionId}`), + ]); + + if (infoRes.status === 404 && msgsRes.status === 404) { + callbacksRef.current.onSessionDead?.(sessionId); + return false; + } + if (!msgsRes.ok) return false; + + const data = await msgsRes.json(); + if (!Array.isArray(data) || data.length === 0) return false; + saveBackendMessages(sessionId, data); + + let pendingIds: Set | undefined; + let backendIsProcessing = false; + if (infoRes.ok) { + const info = await infoRes.json(); + backendIsProcessing = !!info.is_processing; + if (info.pending_approval && Array.isArray(info.pending_approval)) { + pendingIds = new Set( + info.pending_approval.map((t: { tool_call_id: string }) => t.tool_call_id) + ); + if (pendingIds.size > 0) setNeedsAttention(sessionId, true); + } + if (info.auto_approval) { + updateSessionYolo(sessionId, info.auto_approval); + } + } + + const uiMsgs = llmMessagesToUIMessages( + data, + pendingIds, + chatActionsRef.current.messages, + ); + const setMsgs = chatActionsRef.current.setMessages; + if (setMsgs && uiMsgs.length > 0) { + setMsgs(uiMsgs); + saveMessages(sessionId, uiMsgs); + } + + if (backendIsProcessing) { + setProcessingState(true, { activityStatus: { type: 'thinking' } }); + return false; + } + if (pendingIds && pendingIds.size > 0) { + setProcessingState(false, { activityStatus: { type: 'waiting-approval' } }); + } else { + setProcessingState(false); + } + + if (uiMsgs.length > currentMessageCount) return true; + if (!submittedText) return false; + + const userMessages = uiMsgs.filter((m) => m.role === 'user'); + const lastUser = userMessages[userMessages.length - 1]; + return ( + userMessages.length >= currentUserMessageCount && + !!lastUser && + textFromUIMessage(lastUser).trim() === submittedText.trim() + ); + } catch { + return false; + } + }, }), // eslint-disable-next-line react-hooks/exhaustive-deps [sessionId, setProcessingState], diff --git a/frontend/src/lib/sse-chat-transport.ts b/frontend/src/lib/sse-chat-transport.ts index 7c45d987..1ee35c3d 100644 --- a/frontend/src/lib/sse-chat-transport.ts +++ b/frontend/src/lib/sse-chat-transport.ts @@ -41,6 +41,13 @@ export interface SideChannelCallbacks { onToolRunning: (toolName: string, description?: string) => void; onUsageEvent: (eventType: 'llm_call' | 'hf_job_complete', data: Record) => void; onInterrupted: () => void; + onRecoverMessages: (context: MessageRecoveryContext) => Promise; +} + +export interface MessageRecoveryContext { + submittedText?: string; + currentMessageCount: number; + currentUserMessageCount: number; } // --------------------------------------------------------------------------- @@ -70,6 +77,28 @@ async function readErrorResponse(response: Response): Promise { } } +function isAbortError(error: unknown, signal?: AbortSignal): boolean { + if (signal?.aborted) return true; + if (!(error instanceof Error)) return false; + return error.name === 'AbortError'; +} + +function isRecoverableFetchError(error: unknown): boolean { + if (!(error instanceof Error)) return false; + if (error.name === 'TypeError') return true; + + const message = error.message.toLowerCase(); + return [ + 'load failed', + 'failed to fetch', + 'networkerror', + 'network error', + 'network request failed', + 'connection', + 'fetch', + ].some((pattern) => message.includes(pattern)); +} + /** Parse an SSE text stream into AgentEvent objects. */ function createSSEParserStream(sessionId: string): TransformStream { let buffer = ''; @@ -130,6 +159,15 @@ function createSSEParserStream(sessionId: string): TransformStream { + return new ReadableStream({ + start(controller) { + controller.enqueue({ type: 'finish', finishReason: 'stop' }); + controller.close(); + }, + }); +} + /** Transform AgentEvent objects into UIMessageChunk objects for the Vercel AI SDK. */ function createEventToChunkStream(sideChannel: SideChannelCallbacks): TransformStream { let textPartId: string | null = null; @@ -369,6 +407,66 @@ export class SSEChatTransport implements ChatTransport { // Nothing to clean up — no persistent connections } + private async connectToEventStream(): Promise | null> { + const lastSeq = localStorage.getItem(lastEventKey(this.sessionId)); + const qs = lastSeq ? `?after=${encodeURIComponent(lastSeq)}` : ''; + const response = await apiFetch(`/api/events/${this.sessionId}${qs}`, { + headers: { 'Accept': 'text/event-stream' }, + }); + if (!response.ok || !response.body) return null; + + this.sideChannel.onProcessing(); + + return response.body + .pipeThrough(new TextDecoderStream()) + .pipeThrough(createSSEParserStream(this.sessionId)) + .pipeThrough(createEventToChunkStream(this.sideChannel)); + } + + private async recoverFailedSend( + context: MessageRecoveryContext, + ): Promise> { + let infoRes: Response; + try { + infoRes = await apiFetch(`/api/session/${this.sessionId}`); + } catch { + throw new Error( + 'Connection to the Space was interrupted before the message was accepted. Please retry.', + ); + } + + if (infoRes.status === 404) { + this.sideChannel.onSessionDead(this.sessionId); + throw new Error('Session not found or inactive'); + } + if (!infoRes.ok) { + throw new Error( + 'Connection to the Space was interrupted before the message was accepted. Please retry.', + ); + } + + const info = await infoRes.json(); + if (info.is_processing) { + try { + const stream = await this.connectToEventStream(); + if (stream) return stream; + } catch { + // Fall through to message hydration; the turn may have completed + // between the status probe and the event-stream reconnect. + } + } + + const recovered = await this.sideChannel.onRecoverMessages(context); + if (recovered) { + this.sideChannel.onProcessingDone(); + return createRecoveredFinishedStream(); + } + + throw new Error( + 'Connection to the Space was interrupted before the message was accepted. Please retry.', + ); + } + // -- ChatTransport interface --------------------------------------------- async sendMessages( @@ -391,6 +489,7 @@ export class SSEChatTransport implements ChatTransport { ) || []; let body: Record; + let submittedText: string | undefined; if (approvedParts.length > 0) { // Approval continuation — extract approval decisions const approvals = approvedParts.map((p) => { @@ -415,19 +514,33 @@ export class SSEChatTransport implements ChatTransport { .map(p => p.text) .join('') : ''; + submittedText = text; body = { text }; } // POST to SSE endpoint - const response = await apiFetch(`/api/chat/${sessionId}`, { - method: 'POST', - body: JSON.stringify(body), - signal: options.abortSignal, - headers: { - 'Content-Type': 'application/json', - 'Accept': 'text/event-stream', - }, - }); + let response: Response; + try { + response = await apiFetch(`/api/chat/${sessionId}`, { + method: 'POST', + body: JSON.stringify(body), + signal: options.abortSignal, + headers: { + 'Content-Type': 'application/json', + 'Accept': 'text/event-stream', + }, + }); + } catch (error) { + if (isAbortError(error, options.abortSignal) || !isRecoverableFetchError(error)) { + throw error; + } + logger.warn('Chat POST failed; attempting session recovery:', error); + return this.recoverFailedSend({ + submittedText, + currentMessageCount: options.messages.length, + currentUserMessageCount: options.messages.filter(m => m.role === 'user').length, + }); + } if (response.status === 404) { // Backend lost this session (e.g. Space restart). Signal the UI so @@ -460,20 +573,7 @@ export class SSEChatTransport implements ChatTransport { const info = await infoRes.json(); if (!info.is_processing) return null; - // Session is mid-turn — subscribe to its event broadcast. - const lastSeq = localStorage.getItem(lastEventKey(this.sessionId)); - const qs = lastSeq ? `?after=${encodeURIComponent(lastSeq)}` : ''; - const response = await apiFetch(`/api/events/${this.sessionId}${qs}`, { - headers: { 'Accept': 'text/event-stream' }, - }); - if (!response.ok || !response.body) return null; - - this.sideChannel.onProcessing(); - - return response.body - .pipeThrough(new TextDecoderStream()) - .pipeThrough(createSSEParserStream(this.sessionId)) - .pipeThrough(createEventToChunkStream(this.sideChannel)); + return this.connectToEventStream(); } catch { return null; } From 6510d69faa0bb165d6011ce625b0b38ea0df83dc Mon Sep 17 00:00:00 2001 From: Lewis Tunstall Date: Mon, 8 Jun 2026 13:47:29 +0200 Subject: [PATCH 2/3] Address chat send recovery review Co-authored-by: OpenAI Codex --- frontend/src/hooks/useAgentChat.ts | 25 ++++++++++++++----------- frontend/src/lib/sse-chat-transport.ts | 16 +++++++++++----- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/frontend/src/hooks/useAgentChat.ts b/frontend/src/hooks/useAgentChat.ts index 6b213780..3a75ec7f 100644 --- a/frontend/src/hooks/useAgentChat.ts +++ b/frontend/src/hooks/useAgentChat.ts @@ -393,8 +393,20 @@ export function useAgentChat({ sessionId, isActive, isProcessing = false, onRead pendingIds, chatActionsRef.current.messages, ); + const backendAdvanced = uiMsgs.length > currentMessageCount; + let submittedTurnAccepted = false; + if (submittedText) { + const userMessages = uiMsgs.filter((m) => m.role === 'user'); + const lastUser = userMessages[userMessages.length - 1]; + submittedTurnAccepted = ( + userMessages.length >= currentUserMessageCount && + !!lastUser && + textFromUIMessage(lastUser).trim() === submittedText.trim() + ); + } + const setMsgs = chatActionsRef.current.setMessages; - if (setMsgs && uiMsgs.length > 0) { + if (setMsgs && uiMsgs.length >= currentMessageCount) { setMsgs(uiMsgs); saveMessages(sessionId, uiMsgs); } @@ -409,16 +421,7 @@ export function useAgentChat({ sessionId, isActive, isProcessing = false, onRead setProcessingState(false); } - if (uiMsgs.length > currentMessageCount) return true; - if (!submittedText) return false; - - const userMessages = uiMsgs.filter((m) => m.role === 'user'); - const lastUser = userMessages[userMessages.length - 1]; - return ( - userMessages.length >= currentUserMessageCount && - !!lastUser && - textFromUIMessage(lastUser).trim() === submittedText.trim() - ); + return backendAdvanced || submittedTurnAccepted; } catch { return false; } diff --git a/frontend/src/lib/sse-chat-transport.ts b/frontend/src/lib/sse-chat-transport.ts index 1ee35c3d..2c1cad83 100644 --- a/frontend/src/lib/sse-chat-transport.ts +++ b/frontend/src/lib/sse-chat-transport.ts @@ -85,18 +85,24 @@ function isAbortError(error: unknown, signal?: AbortSignal): boolean { function isRecoverableFetchError(error: unknown): boolean { if (!(error instanceof Error)) return false; - if (error.name === 'TypeError') return true; + const name = error.name.toLowerCase(); const message = error.message.toLowerCase(); - return [ + const networkFailureMessages = [ 'load failed', 'failed to fetch', + 'fetch failed', 'networkerror', 'network error', 'network request failed', - 'connection', - 'fetch', - ].some((pattern) => message.includes(pattern)); + 'network connection was lost', + 'internet connection appears to be offline', + ]; + + return ( + name === 'networkerror' || + (name === 'typeerror' && networkFailureMessages.some((pattern) => message.includes(pattern))) + ); } /** Parse an SSE text stream into AgentEvent objects. */ From 05d4423ec45f8af61c480860b1c999daee78f642 Mon Sep 17 00:00:00 2001 From: Lewis Tunstall Date: Mon, 8 Jun 2026 15:39:49 +0200 Subject: [PATCH 3/3] Avoid redundant recovery session fetch Co-authored-by: OpenAI Codex --- frontend/src/hooks/useAgentChat.ts | 36 ++++++++++++++++++++------ frontend/src/lib/sse-chat-transport.ts | 19 ++++++++++++-- 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/frontend/src/hooks/useAgentChat.ts b/frontend/src/hooks/useAgentChat.ts index 3a75ec7f..6b3aada8 100644 --- a/frontend/src/hooks/useAgentChat.ts +++ b/frontend/src/hooks/useAgentChat.ts @@ -355,14 +355,35 @@ export function useAgentChat({ sessionId, isActive, isProcessing = false, onRead useUsageStore.getState().applyUsageEvent(sessionId, eventType, data); }, onInterrupted: () => { /* no-op — handled by stop() caller */ }, - onRecoverMessages: async ({ submittedText, currentMessageCount, currentUserMessageCount }) => { + onRecoverMessages: async ({ + submittedText, + currentMessageCount, + currentUserMessageCount, + sessionInfo, + }) => { try { - const [msgsRes, infoRes] = await Promise.all([ - apiFetch(`/api/session/${sessionId}/messages`), - apiFetch(`/api/session/${sessionId}`), - ]); + let msgsRes: Response; + let info = sessionInfo; - if (infoRes.status === 404 && msgsRes.status === 404) { + if (sessionInfo) { + msgsRes = await apiFetch(`/api/session/${sessionId}/messages`); + } else { + const [fetchedMsgsRes, infoRes] = await Promise.all([ + apiFetch(`/api/session/${sessionId}/messages`), + apiFetch(`/api/session/${sessionId}`), + ]); + msgsRes = fetchedMsgsRes; + + if (infoRes.status === 404 && msgsRes.status === 404) { + callbacksRef.current.onSessionDead?.(sessionId); + return false; + } + if (infoRes.ok) { + info = await infoRes.json(); + } + } + + if (sessionInfo && msgsRes.status === 404) { callbacksRef.current.onSessionDead?.(sessionId); return false; } @@ -374,8 +395,7 @@ export function useAgentChat({ sessionId, isActive, isProcessing = false, onRead let pendingIds: Set | undefined; let backendIsProcessing = false; - if (infoRes.ok) { - const info = await infoRes.json(); + if (info) { backendIsProcessing = !!info.is_processing; if (info.pending_approval && Array.isArray(info.pending_approval)) { pendingIds = new Set( diff --git a/frontend/src/lib/sse-chat-transport.ts b/frontend/src/lib/sse-chat-transport.ts index 2c1cad83..35caa8ea 100644 --- a/frontend/src/lib/sse-chat-transport.ts +++ b/frontend/src/lib/sse-chat-transport.ts @@ -48,6 +48,18 @@ export interface MessageRecoveryContext { submittedText?: string; currentMessageCount: number; currentUserMessageCount: number; + sessionInfo?: RecoverySessionInfo; +} + +export interface RecoverySessionInfo { + is_processing?: boolean; + pending_approval?: Array<{ tool_call_id: string }> | null; + auto_approval?: { + enabled: boolean; + cost_cap_usd?: number | null; + estimated_spend_usd?: number; + remaining_usd?: number | null; + } | null; } // --------------------------------------------------------------------------- @@ -451,7 +463,7 @@ export class SSEChatTransport implements ChatTransport { ); } - const info = await infoRes.json(); + const info = await infoRes.json() as RecoverySessionInfo; if (info.is_processing) { try { const stream = await this.connectToEventStream(); @@ -462,7 +474,10 @@ export class SSEChatTransport implements ChatTransport { } } - const recovered = await this.sideChannel.onRecoverMessages(context); + const recovered = await this.sideChannel.onRecoverMessages({ + ...context, + sessionInfo: info.is_processing ? undefined : info, + }); if (recovered) { this.sideChannel.onProcessingDone(); return createRecoveredFinishedStream();