From 5fb8c4c8cdb2d7da3edac6fbd600a4bdabc61ee9 Mon Sep 17 00:00:00 2001 From: LB7666 Date: Tue, 30 Jun 2026 16:38:59 +0800 Subject: [PATCH 1/2] fix(codex): surface streaming fatal errors --- crates/llm-access/src/provider.rs | 1 + .../llm-access/src/provider/codex_dispatch.rs | 38 ++++----- .../src/provider/codex_stream_error.rs | 80 +++++++++++++++++++ .../src/provider/codex_upstream_error.rs | 8 ++ crates/llm-access/src/provider/tests.rs | 10 ++- 5 files changed, 110 insertions(+), 27 deletions(-) create mode 100644 crates/llm-access/src/provider/codex_stream_error.rs diff --git a/crates/llm-access/src/provider.rs b/crates/llm-access/src/provider.rs index c427a7c..d2d42b8 100644 --- a/crates/llm-access/src/provider.rs +++ b/crates/llm-access/src/provider.rs @@ -13,6 +13,7 @@ mod codex_session_affinity; mod codex_session_recovery; mod codex_session_rejection; mod codex_sse; +mod codex_stream_error; mod codex_upstream_error; mod entry; mod errors; diff --git a/crates/llm-access/src/provider/codex_dispatch.rs b/crates/llm-access/src/provider/codex_dispatch.rs index 8f9bbaf..617eee8 100644 --- a/crates/llm-access/src/provider/codex_dispatch.rs +++ b/crates/llm-access/src/provider/codex_dispatch.rs @@ -52,6 +52,7 @@ use super::{ completed_response_from_sse_bytes, missing_codex_usage, record_codex_preflight_failure, record_codex_usage, }, + codex_stream_error::codex_stream_failure_chunks, codex_upstream_error::{ classify_codex_sse_event_failure, classify_codex_success_error_body, classify_codex_upstream_failure, CodexClassifiedUpstreamError, CodexUpstreamErrorClass, @@ -1245,20 +1246,6 @@ fn codex_status_for_error_class( } } -/// OpenAI-style `error.code` to attach when surfacing a classified upstream -/// failure to the client. Codex (and OpenAI-compatible clients) classify some -/// errors by `code` rather than message text, so emitting the canonical code -/// lets the client handle the failure cleanly (e.g. an overload becomes a tidy -/// "high load" notice via `server_is_overloaded`) instead of showing the raw -/// upstream message verbatim. -fn codex_surface_code_for_error_class(class: CodexUpstreamErrorClass) -> Option<&'static str> { - match class { - CodexUpstreamErrorClass::ServerOverloaded => Some("server_is_overloaded"), - CodexUpstreamErrorClass::CyberPolicy => Some("cyber_policy"), - _ => None, - } -} - /// Record the upstream error class on the usage event, and flag it as a /// permanently session-blocking failure when its disposition is the fatal /// `cyber_policy` strict-session block, so the usage log can surface both @@ -1749,7 +1736,7 @@ async fn adapt_codex_upstream_response_from_parts( &prepared.original_path, effective_status, &error.message, - codex_surface_code_for_error_class(error.class), + error.class.surface_error_code(), )); } if !status.is_success() @@ -2094,6 +2081,7 @@ async fn stream_codex_upstream_response( &error.message, &error.body, ); + capture_codex_error_classification(&mut guard.usage_meta, &error); maybe_remember_codex_session_rejection( codex_session_rejection.as_ref(), guard.route.codex_strict_session_rejection_enabled, @@ -2109,12 +2097,16 @@ async fn stream_codex_upstream_response( error_class = %error.class.as_str(), "codex stream upstream failure detected after downstream write started" ); - // Intentionally stop the stream here without emitting `[DONE]` - // and without forwarding the raw upstream failure event: the - // partial content already reached the client and the failure is - // recorded server-side via the guard above. A well-behaved - // client detects the missing terminal sentinel. (See - // codex_dispatch_streaming_mid_failure_stops_without_done_*.) + let failure_chunks = codex_stream_failure_chunks( + response_adapter, + &guard.prepared.original_path, + effective_status, + &error, + ); + for bytes in failure_chunks { + guard.observe_chunk(&bytes, Some("error")); + yield Ok::(bytes); + } return; } guard.usage_collector.observe_event(&event); @@ -2194,7 +2186,7 @@ async fn record_codex_stream_preflight_failure( &prepared.original_path, effective_status, &error.message, - codex_surface_code_for_error_class(error.class), + error.class.surface_error_code(), ), ); } else { @@ -2235,6 +2227,6 @@ async fn record_codex_stream_preflight_failure( &prepared.original_path, effective_status, &error.message, - codex_surface_code_for_error_class(error.class), + error.class.surface_error_code(), ) } diff --git a/crates/llm-access/src/provider/codex_stream_error.rs b/crates/llm-access/src/provider/codex_stream_error.rs new file mode 100644 index 0000000..10b8e9c --- /dev/null +++ b/crates/llm-access/src/provider/codex_stream_error.rs @@ -0,0 +1,80 @@ +use axum::{body::Bytes, http::StatusCode}; +use llm_access_codex::types::GatewayResponseAdapter; +use serde_json::{json, Value}; + +use super::{ + codex_upstream_error::CodexClassifiedUpstreamError, + errors::{codex_error_type_for_status, codex_surface_error_body_with_code}, +}; + +pub(super) fn codex_stream_failure_chunks( + response_adapter: GatewayResponseAdapter, + original_path: &str, + effective_status: StatusCode, + error: &CodexClassifiedUpstreamError, +) -> Vec { + match response_adapter { + GatewayResponseAdapter::Responses => vec![encode_sse_event( + "response.failed", + &responses_failed_payload(effective_status, error), + )], + GatewayResponseAdapter::ChatCompletions => vec![ + encode_sse_data(&codex_surface_error_body_with_code( + original_path, + effective_status, + &error.message, + error.class.surface_error_code(), + )), + Bytes::from_static(b"data: [DONE]\n\n"), + ], + GatewayResponseAdapter::AnthropicMessages => { + vec![encode_sse_event("error", &anthropic_error_payload(effective_status, error))] + }, + } +} + +fn responses_failed_payload( + effective_status: StatusCode, + error: &CodexClassifiedUpstreamError, +) -> Value { + json!({ + "type": "response.failed", + "response": { + "status": "failed", + "error": stream_error_object(effective_status, error), + } + }) +} + +fn anthropic_error_payload( + effective_status: StatusCode, + error: &CodexClassifiedUpstreamError, +) -> Value { + json!({ + "type": "error", + "error": stream_error_object(effective_status, error), + }) +} + +fn stream_error_object( + effective_status: StatusCode, + error: &CodexClassifiedUpstreamError, +) -> Value { + json!({ + "type": codex_error_type_for_status(effective_status), + "message": error.message, + "code": error.class.surface_error_code().map_or(Value::Null, Value::from), + }) +} + +fn encode_sse_event(event: &str, payload: &Value) -> Bytes { + Bytes::from(format!("event: {event}\ndata: {}\n\n", encode_json(payload))) +} + +fn encode_sse_data(data: &str) -> Bytes { + Bytes::from(format!("data: {data}\n\n")) +} + +fn encode_json(value: &Value) -> String { + serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string()) +} diff --git a/crates/llm-access/src/provider/codex_upstream_error.rs b/crates/llm-access/src/provider/codex_upstream_error.rs index 3fce480..948ba26 100644 --- a/crates/llm-access/src/provider/codex_upstream_error.rs +++ b/crates/llm-access/src/provider/codex_upstream_error.rs @@ -39,6 +39,14 @@ impl CodexUpstreamErrorClass { Self::UnexpectedStatus => "unexpected_status", } } + + pub(crate) fn surface_error_code(self) -> Option<&'static str> { + match self { + Self::ServerOverloaded => Some("server_is_overloaded"), + Self::CyberPolicy => Some("cyber_policy"), + _ => None, + } + } } #[derive(Debug, Clone)] diff --git a/crates/llm-access/src/provider/tests.rs b/crates/llm-access/src/provider/tests.rs index aefce88..ee942db 100644 --- a/crates/llm-access/src/provider/tests.rs +++ b/crates/llm-access/src/provider/tests.rs @@ -6546,7 +6546,8 @@ async fn codex_dispatch_stops_preflight_buffering_after_lifecycle_event_cap() { .await .expect("response body"); let body = String::from_utf8(body.to_vec()).expect("utf8 response"); - assert!(!body.contains("server_is_overloaded"), "body: {body}"); + assert!(body.contains("server_is_overloaded"), "body: {body}"); + assert!(body.contains("[DONE]"), "body: {body}"); let requests = captured.requests.lock().expect("captured requests"); let auths = requests @@ -6559,6 +6560,7 @@ async fn codex_dispatch_stops_preflight_buffering_after_lifecycle_event_cap() { wait_for_usage_event_count(store.as_ref(), 1).await; let events = store.usage_events.lock().expect("usage events"); assert_eq!(events.len(), 1); + assert_eq!(events[0].status_code, i64::from(StatusCode::SERVICE_UNAVAILABLE.as_u16())); assert_ne!(events[0].quota_failover_count, 1); } @@ -7542,7 +7544,7 @@ async fn codex_dispatch_streaming_first_failure_returns_error_without_sse_body() } #[tokio::test] -async fn codex_dispatch_streaming_mid_failure_stops_without_done_and_records_failure() { +async fn codex_dispatch_streaming_mid_failure_emits_error_done_and_records_failure() { let _guard = crate::CODEX_UPSTREAM_ENV_LOCK .lock() .expect("codex upstream env lock"); @@ -7589,8 +7591,8 @@ async fn codex_dispatch_streaming_mid_failure_stops_without_done_and_records_fai .expect("response body"); let body = String::from_utf8(body.to_vec()).expect("utf8 response"); assert!(body.contains("hello "), "body: {body}"); - assert!(!body.contains("response.failed"), "body: {body}"); - assert!(!body.contains("[DONE]"), "body: {body}"); + assert!(body.contains("tool_choice references a missing tool"), "body: {body}"); + assert!(body.contains("[DONE]"), "body: {body}"); wait_for_usage_event_count(store.as_ref(), 1).await; let events = store.usage_events.lock().expect("usage events"); From dc7249e59c403ee2898a12253bffa1bf8f143a21 Mon Sep 17 00:00:00 2001 From: LB7666 Date: Tue, 30 Jun 2026 17:15:57 +0800 Subject: [PATCH 2/2] docs(codex): explain stream error rendering --- .../src/provider/codex_stream_error.rs | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/crates/llm-access/src/provider/codex_stream_error.rs b/crates/llm-access/src/provider/codex_stream_error.rs index 10b8e9c..e8fdae4 100644 --- a/crates/llm-access/src/provider/codex_stream_error.rs +++ b/crates/llm-access/src/provider/codex_stream_error.rs @@ -1,3 +1,45 @@ +//! Terminal SSE error rendering for Codex streams that fail after downstream +//! bytes have already been sent. +//! +//! This module deliberately does not classify upstream failures. The dispatcher +//! already owns that job; this file only translates a classified failure into +//! the terminal shape expected by the client-facing protocol. +//! +//! ```text +//! upstream SSE event +//! | +//! v +//! classify_codex_sse_event_failure(...) +//! | +//! v +//! CodexClassifiedUpstreamError + effective HTTP status +//! | +//! v +//! codex_stream_failure_chunks(adapter, path, status, error) +//! | +//! +-- Responses ---------> event: response.failed +//! | data: {"type":"response.failed", ...} +//! | +//! +-- ChatCompletions ---> data: {"error":{...}} +//! | data: [DONE] +//! | +//! `-- AnthropicMessages -> event: error +//! data: {"type":"error","error":{...}} +//! ``` +//! +//! The important boundary is "after downstream write started": at that point we +//! cannot change the HTTP status and we cannot safely fail over to another +//! account without mixing two upstream conversations in one client stream. +//! Returning a protocol-shaped terminal error is the least surprising contract: +//! clients see the real failure, usage still records the upstream failure, and +//! the stream does not look like a normal empty response. +//! +//! Error-body policy: +//! - Surface only the sanitized classified message/type/code to the client. +//! - Keep raw upstream error bodies in usage metadata, not in SSE frames. +//! - Leave preflight failures to the existing non-stream JSON error path, where +//! the HTTP response has not been committed yet. + use axum::{body::Bytes, http::StatusCode}; use llm_access_codex::types::GatewayResponseAdapter; use serde_json::{json, Value}; @@ -7,6 +49,13 @@ use super::{ errors::{codex_error_type_for_status, codex_surface_error_body_with_code}, }; +/// Build the final client-visible chunks for a stream that can no longer +/// return a regular HTTP error response. +/// +/// The output is intentionally tiny: one SSE event for Responses/Anthropic, or +/// an OpenAI-style error data frame plus `[DONE]` for Chat Completions. Keeping +/// this as a `Vec` makes the dispatcher branch explicit without adding +/// another stream abstraction for at most two frames. pub(super) fn codex_stream_failure_chunks( response_adapter: GatewayResponseAdapter, original_path: &str,