diff --git a/crates/agentic-core/src/proxy.rs b/crates/agentic-core/src/proxy.rs
index 79329b9..36c5260 100644
--- a/crates/agentic-core/src/proxy.rs
+++ b/crates/agentic-core/src/proxy.rs
@@ -153,6 +153,64 @@ pub fn error_response(status: StatusCode, code: &str, message: &str) -> ProxyRes
     }
 }
 
+/// Proxy a GET request to an arbitrary upstream path.
+///
+/// The URL is `llm_api_base` joined with `path`; a trailing `/` on the base and a
+/// leading `/` on `path` are trimmed, so either spelling yields a single slash.
+/// Applies the same header filtering and auth injection as [`proxy_request`].
+/// Uses the non-streaming client; the response body is buffered and returned
+/// as a [`ProxyBody::Full`] payload.
+///
+/// Transport failures never surface as errors; they become gateway responses:
+/// - timeout while sending or while reading the body: `504 upstream_timeout`
+/// - any other send or body-read failure: `502 upstream_unavailable`
+///
+/// An upstream HTTP status (including 4xx/5xx) is relayed as received.
+pub async fn proxy_get(path: &str, request_headers: &HeaderMap, state: &ProxyState) -> ProxyResponse {
+    let llm_headers = filter_request_headers(request_headers, &state.config);
+    let base = state.config.llm_api_base.trim_end_matches('/');
+    let url = format!("{base}/{}", path.trim_start_matches('/'));
+
+    let llm_resp = match state.non_stream_client.get(&url).headers(llm_headers).send().await {
+        Ok(r) => r,
+        Err(e) if e.is_timeout() => {
+            warn!("upstream GET {path} timed out: {e}");
+            return error_response(StatusCode::GATEWAY_TIMEOUT, "upstream_timeout", "upstream timeout");
+        }
+        Err(e) => {
+            warn!("upstream GET {path} failed: {e}");
+            return error_response(StatusCode::BAD_GATEWAY, "upstream_unavailable", "upstream unavailable");
+        }
+    };
+
+    // Status and headers are taken before the body is read.
+    // A status code that does not convert falls back to 502.
+    let status = StatusCode::from_u16(llm_resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
+    let response_headers = filter_response_headers(llm_resp.headers());
+
+    match llm_resp.bytes().await {
+        Ok(payload) => ProxyResponse {
+            status,
+            headers: response_headers,
+            body: ProxyBody::Full(payload),
+        },
+        // A stalled body download is still a timeout; report it as 504 like the
+        // send path above instead of folding it into the generic 502.
+        Err(e) if e.is_timeout() => {
+            warn!("upstream GET {path} body read timed out: {e}");
+            error_response(StatusCode::GATEWAY_TIMEOUT, "upstream_timeout", "upstream timeout")
+        }
+        Err(e) => {
+            warn!("failed to read upstream GET {path} body: {e}");
+            error_response(
+                StatusCode::BAD_GATEWAY,
+                "upstream_unavailable",
+                "failed to read upstream response",
+            )
+        }
+    }
+}
+
 pub async fn proxy_request(request: ProxyRequest, state: &ProxyState) -> ProxyResponse {
     let is_streaming = serde_json::from_slice::(&request.body)
         .ok()
diff --git
a/crates/agentic-server/src/app.rs b/crates/agentic-server/src/app.rs index 6e4583a..44b43c1 100644 --- a/crates/agentic-server/src/app.rs +++ b/crates/agentic-server/src/app.rs @@ -9,7 +9,7 @@ use tower_http::cors::{AllowOrigin, Any, CorsLayer}; use agentic_core::executor::ExecutionContext; use agentic_core::proxy::ProxyState; -use crate::handler::{conversations, health, ready, responses, responses_ws}; +use crate::handler::{conversations, health, models, ready, responses, responses_ws}; /// Server-level configuration read from environment variables. pub struct ServerConfig { @@ -71,6 +71,7 @@ pub fn build_router(state: AppState, server_config: &ServerConfig) -> Router { .route("/health", get(health)) .route("/ready", get(ready)) .route("/v1/conversations", post(conversations)) + .route("/v1/models", get(models)) .route("/v1/responses", post(responses).get(responses_ws)) .layer(server_config.cors_layer()) .with_state(state) diff --git a/crates/agentic-server/src/handler.rs b/crates/agentic-server/src/handler.rs deleted file mode 100644 index 142d1de..0000000 --- a/crates/agentic-server/src/handler.rs +++ /dev/null @@ -1,499 +0,0 @@ -use std::sync::Arc; - -use axum::body::Body; -use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; -use axum::extract::{Request, State}; -use axum::http::HeaderMap; -use axum::http::request::Parts; -use axum::response::{IntoResponse, Response}; -use bytes::Bytes; -use either::Either; -use futures::stream::{SplitSink, SplitStream}; -use futures::{SinkExt, StreamExt}; -use http::StatusCode; -use serde_json::{Value, json}; -use thiserror::Error; -use tokio_util::sync::CancellationToken; -use tracing::warn; - -use agentic_core::executor::accumulator::ResponseAccumulator; -use agentic_core::executor::{ - BoxStream, ExecutionContext, ExecutorError, RequestContext, call_inference, create_conversation, execute, - persist_response, rehydrate_conversation, -}; -use agentic_core::proxy::{ProxyBody, ProxyRequest, ProxyResponse, 
error_response, proxy_request}; -use agentic_core::types::ResponsePayload; -use agentic_core::types::request_response::RequestPayload; -use agentic_core::utils::common::serialize_to_string; - -use crate::app::AppState; - -const MAX_BODY_SIZE: usize = 10 * 1024 * 1024; - -type WsSender = SplitSink; -type WsReceiver = SplitStream; - -#[derive(Debug, Error)] -enum WsError { - #[error(transparent)] - Executor(#[from] ExecutorError), - - #[error("invalid JSON: {0}")] - InvalidJson(#[source] serde_json::Error), - - #[error("failed to serialize websocket event: {0}")] - SerializeJson(#[source] serde_json::Error), - - #[error("websocket message type must be response.create")] - UnexpectedType, - - #[error("websocket messages must be JSON text frames")] - BinaryFrame, - - #[error("websocket received a new message while response stream is active")] - ConcurrentMessage, - - #[error("websocket send failed")] - SendFailed, - - #[error("websocket client disconnected")] - ClientDisconnected, - - #[error("websocket shutdown requested")] - Shutdown, - - #[error("websocket receive failed: {0}")] - Receive(String), -} - -impl WsError { - fn status(&self) -> StatusCode { - match self { - Self::Executor(err) => err.http_status(), - Self::InvalidJson(_) | Self::UnexpectedType | Self::BinaryFrame | Self::ConcurrentMessage => { - StatusCode::BAD_REQUEST - } - Self::SerializeJson(_) - | Self::SendFailed - | Self::ClientDisconnected - | Self::Shutdown - | Self::Receive(_) => StatusCode::INTERNAL_SERVER_ERROR, - } - } - - fn code(&self) -> &'static str { - match self { - Self::Executor(err) => err.error_code(), - Self::InvalidJson(_) => "invalid_json", - Self::UnexpectedType | Self::BinaryFrame | Self::ConcurrentMessage => "invalid_request_error", - Self::SerializeJson(_) - | Self::SendFailed - | Self::ClientDisconnected - | Self::Shutdown - | Self::Receive(_) => "server_error", - } - } - - fn to_ws_frame(&self) -> Option { - if matches!( - self, - Self::SerializeJson(_) | Self::SendFailed | 
Self::ClientDisconnected | Self::Shutdown | Self::Receive(_) - ) { - return None; - } - - let code = self.code(); - Some(json!({ - "type": "error", - "status": self.status().as_u16(), - "error": { - "message": self.to_string(), - "type": code, - "code": code - } - })) - } -} - -pub async fn health() -> impl IntoResponse { - StatusCode::OK -} - -pub async fn ready(State(state): State) -> impl IntoResponse { - let base = state.llm_api_base.trim_end_matches('/'); - let url = format!("{base}/health"); - - let client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(2)) - .build(); - - let Ok(client) = client else { - return StatusCode::SERVICE_UNAVAILABLE; - }; - - match client.get(&url).send().await { - Ok(resp) if resp.status().is_success() => StatusCode::OK, - Ok(resp) => { - warn!("LLM backend not ready: {}", resp.status()); - StatusCode::SERVICE_UNAVAILABLE - } - Err(e) => { - warn!("LLM backend unreachable: {e}"); - StatusCode::SERVICE_UNAVAILABLE - } - } -} - -async fn read_bytes(body: Body) -> Result { - axum::body::to_bytes(body, MAX_BODY_SIZE).await.map_err(|_| { - convert_response(error_response( - StatusCode::PAYLOAD_TOO_LARGE, - "body_too_large", - "request body too large", - )) - }) -} - -async fn read_and_parse(body: Body) -> Result<(Bytes, RequestPayload), Response> { - let bytes = read_bytes(body).await?; - let payload = serde_json::from_slice::(&bytes) - .map_err(|e| executor_error_response(ExecutorError::from(e)))?; - Ok((bytes, payload)) -} - -fn extract_store(bytes: &[u8]) -> bool { - serde_json::from_slice::(bytes) - .ok() - .and_then(|j| j.get("store").and_then(serde_json::Value::as_bool)) - .unwrap_or(true) -} - -/// # Panics -/// Panics if the response builder produces an invalid response (unreachable in practice). 
-pub fn convert_response(resp: ProxyResponse) -> Response { - let mut builder = Response::builder().status(resp.status); - for (name, value) in &resp.headers { - builder = builder.header(name, value); - } - match resp.body { - ProxyBody::Full(bytes) => builder.body(Body::from(bytes)).expect("valid response"), - ProxyBody::Stream(stream) => builder.body(Body::from_stream(stream)).expect("valid response"), - } -} - -async fn proxy_responses(state: &AppState, parts: Parts, body: Bytes) -> Response { - let proxy_req = ProxyRequest { - headers: parts.headers, - body, - query: parts.uri.query().map(str::to_string), - }; - convert_response(proxy_request(proxy_req, &state.proxy_state).await) -} - -fn resolve_exec_ctx_from_headers(state: &AppState, headers: &HeaderMap) -> Arc { - let request_auth = headers - .get("authorization") - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.strip_prefix("Bearer ")) - .filter(|s| !s.is_empty()) - .map(str::to_string); - - if request_auth.is_some() && request_auth != state.exec_ctx.client_auth { - let mut ctx = (*state.exec_ctx).clone(); - ctx.client_auth = request_auth; - Arc::new(ctx) - } else { - Arc::clone(&state.exec_ctx) - } -} - -fn resolve_exec_ctx(state: &AppState, parts: &Parts) -> Arc { - resolve_exec_ctx_from_headers(state, &parts.headers) -} - -fn sse_response(stream: BoxStream) -> Response { - let byte_stream = stream.map(|line| Ok::(Bytes::from(line))); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "text/event-stream; charset=utf-8") - .header("Cache-Control", "no-cache") - .header("X-Accel-Buffering", "no") - .body(Body::from_stream(byte_stream)) - .expect("valid SSE response") -} - -async fn execute_responses(state: &AppState, parts: Parts, payload: RequestPayload) -> Response { - match execute(payload, resolve_exec_ctx(state, &parts)).await { - Ok(Either::Left(response_payload)) => axum::Json(response_payload).into_response(), - Ok(Either::Right(stream)) => sse_response(stream), - Err(e) => 
executor_error_response(e), - } -} - -/// # Panics -/// Panics if the response builder produces an invalid response (unreachable in practice). -pub fn executor_error_response(err: ExecutorError) -> Response { - let status = err.http_status(); - if !matches!(err, ExecutorError::LLMRequest { .. }) { - warn!("executor error ({status}): {err}"); - } - Response::builder() - .status(status) - .header("Content-Type", "application/json") - .body(Body::from(err.into_response_body())) - .expect("valid error response") -} - -pub async fn conversations(State(state): State, req: Request) -> Response { - let (_, body) = req.into_parts(); - let bytes = match read_bytes(body).await { - Ok(b) => b, - Err(e) => return e, - }; - - if !extract_store(&bytes) { - return executor_error_response(ExecutorError::InvalidRequest("conversations require store=true".into())); - } - - match create_conversation(&state.exec_ctx).await { - Ok(data) => axum::Json(json!({ - "id": data.conversation_id, - "created_at": data.created_at, - "object": "conversation", - "metadata": {} - })) - .into_response(), - Err(e) => executor_error_response(e), - } -} - -pub async fn responses(State(state): State, req: Request) -> Response { - let (parts, body) = req.into_parts(); - let (bytes, payload) = match read_and_parse(body).await { - Ok(v) => v, - Err(e) => return e, - }; - - let should_persist = payload.store || payload.previous_response_id.is_some() || payload.conversation_id.is_some(); - - if should_persist { - execute_responses(&state, parts, payload).await - } else { - proxy_responses(&state, parts, bytes).await - } -} - -pub async fn responses_ws(State(state): State, headers: HeaderMap, ws: WebSocketUpgrade) -> Response { - ws.max_message_size(MAX_BODY_SIZE) - .max_frame_size(MAX_BODY_SIZE) - .on_upgrade(move |socket| responses_ws_loop(socket, state, headers)) -} - -async fn responses_ws_loop(socket: WebSocket, state: AppState, headers: HeaderMap) { - let shutdown_token = state.shutdown_token.clone(); - 
let (mut sender, mut receiver) = socket.split(); - - loop { - let message = tokio::select! { - () = shutdown_token.cancelled() => break, - message = receiver.next() => message, - }; - - let Some(message) = message else { - break; - }; - - match message { - Ok(Message::Text(text)) => { - match handle_ws_text( - &mut sender, - &mut receiver, - &state, - &headers, - text.as_str(), - &shutdown_token, - ) - .await - { - Ok(()) => {} - Err(err) => { - if !handle_ws_error(&mut sender, err).await { - break; - } - } - } - } - Ok(Message::Binary(_)) => { - if !handle_ws_error(&mut sender, WsError::BinaryFrame).await { - break; - } - } - Ok(Message::Close(_)) => break, - Ok(Message::Ping(payload)) => { - if sender.send(Message::Pong(payload)).await.is_err() { - break; - } - } - Ok(Message::Pong(_)) => {} - Err(e) => { - warn!("responses websocket receive error: {e}"); - break; - } - } - } -} - -async fn handle_ws_text( - sender: &mut WsSender, - receiver: &mut WsReceiver, - state: &AppState, - headers: &HeaderMap, - text: &str, - shutdown_token: &CancellationToken, -) -> Result<(), WsError> { - let value = serde_json::from_str::(text).map_err(WsError::InvalidJson)?; - - if value.get("type").and_then(Value::as_str) != Some("response.create") { - return Err(WsError::UnexpectedType); - } - - let mut payload = serde_json::from_value::(value).map_err(ExecutorError::from)?; - payload.stream = true; - - let exec_ctx = resolve_exec_ctx_from_headers(state, headers); - let ctx = rehydrate_conversation(payload, &exec_ctx).await?; - let upstream_json = - serialize_to_string(&ctx.enriched_request.to_upstream_request(true)).map_err(ExecutorError::from)?; - - stream_ws_response(sender, receiver, exec_ctx, ctx, upstream_json, shutdown_token).await -} - -async fn stream_ws_response( - sender: &mut WsSender, - receiver: &mut WsReceiver, - exec_ctx: Arc, - ctx: RequestContext, - upstream_json: String, - shutdown_token: &CancellationToken, -) -> Result<(), WsError> { - let should_persist = 
ctx.original_request.store - || ctx.original_request.previous_response_id.is_some() - || ctx.conversation_id.is_some(); - let mut lines = Vec::new(); - let mut stream = Box::pin(call_inference( - upstream_json, - exec_ctx.responses_url(), - Arc::clone(&exec_ctx.client), - exec_ctx.client_auth.clone(), - exec_ctx.streaming_timeout, - )); - - 'stream: loop { - let next_line = tokio::select! { - () = shutdown_token.cancelled() => return Err(WsError::Shutdown), - message = receiver.next() => { - match message { - None | Some(Ok(Message::Close(_))) => return Err(WsError::ClientDisconnected), - Some(Ok(Message::Ping(payload))) => { - sender.send(Message::Pong(payload)).await.map_err(|_| WsError::SendFailed)?; - continue 'stream; - } - Some(Ok(Message::Pong(_))) => continue 'stream, - Some(Ok(Message::Binary(_))) => return Err(WsError::BinaryFrame), - Some(Ok(Message::Text(_))) => return Err(WsError::ConcurrentMessage), - Some(Err(e)) => return Err(WsError::Receive(e.to_string())), - } - } - line = stream.next() => line, - }; - let Some(line) = next_line else { - break; - }; - let line = match line { - Ok(line) => line, - Err(e) => return Err(WsError::Executor(e)), - }; - let Some(data) = line.strip_prefix("data: ") else { - continue; - }; - let data = data.trim(); - if data == "[DONE]" { - continue; - } - let mut value = match serde_json::from_str::(data) { - Ok(value) => value, - Err(e) => return Err(WsError::Executor(ExecutorError::from(e))), - }; - apply_gateway_response_ids(&mut value, &ctx); - send_ws_json(sender, value).await?; - if should_persist { - lines.push(line); - } - } - - if should_persist && !lines.is_empty() { - let acc = ResponseAccumulator::from_sse_lines(lines, ctx.conversation_id.as_deref()); - let mut payload = acc.finalize( - &ctx.enriched_request.model, - ctx.original_request.previous_response_id.as_deref(), - ctx.original_request.instructions.as_deref(), - ); - apply_gateway_payload_ids(&mut payload, &ctx); - let ch = 
exec_ctx.conv_handler.clone(); - let rh = exec_ctx.resp_handler.clone(); - if let Err(e) = persist_response(payload, ctx, ch, rh).await { - warn!("persist failed: {e}"); - } - } - Ok(()) -} - -fn apply_gateway_response_ids(value: &mut Value, ctx: &RequestContext) { - let Some(response) = value.get_mut("response").and_then(Value::as_object_mut) else { - return; - }; - response.insert("id".to_owned(), Value::String(ctx.response_id.clone())); - if let Some(previous_response_id) = &ctx.original_request.previous_response_id { - response.insert( - "previous_response_id".to_owned(), - Value::String(previous_response_id.clone()), - ); - } - if let Some(conversation_id) = &ctx.conversation_id { - response.insert("conversation_id".to_owned(), Value::String(conversation_id.clone())); - } -} - -fn apply_gateway_payload_ids(payload: &mut ResponsePayload, ctx: &RequestContext) { - payload.id.clone_from(&ctx.response_id); - payload.conversation_id.clone_from(&ctx.conversation_id); - payload - .previous_response_id - .clone_from(&ctx.original_request.previous_response_id); -} - -async fn handle_ws_error(sender: &mut WsSender, err: WsError) -> bool { - match err { - WsError::Shutdown | WsError::ClientDisconnected | WsError::SendFailed => false, - WsError::Receive(message) => { - warn!("responses websocket receive error: {message}"); - false - } - err => send_ws_error(sender, &err).await.is_ok(), - } -} - -async fn send_ws_error(sender: &mut WsSender, err: &WsError) -> Result<(), WsError> { - let Some(frame) = err.to_ws_frame() else { - return Err(WsError::SendFailed); - }; - send_ws_json(sender, frame).await -} - -async fn send_ws_json(sender: &mut WsSender, value: Value) -> Result<(), WsError> { - let text = serde_json::to_string(&value).map_err(WsError::SerializeJson)?; - sender - .send(Message::Text(text.into())) - .await - .map_err(|_| WsError::SendFailed) -} diff --git a/crates/agentic-server/src/handler/common.rs b/crates/agentic-server/src/handler/common.rs new file mode 
100644
index 0000000..7e96c62
--- /dev/null
+++ b/crates/agentic-server/src/handler/common.rs
@@ -0,0 +1,141 @@
+use std::sync::Arc;
+
+use axum::body::Body;
+use axum::http::HeaderMap;
+use axum::http::request::Parts;
+use axum::response::Response;
+use bytes::Bytes;
+use futures::StreamExt;
+use http::StatusCode;
+use tracing::warn;
+
+use agentic_core::executor::{BoxStream, ExecutionContext, ExecutorError};
+use agentic_core::proxy::{ProxyBody, ProxyResponse, error_response};
+use agentic_core::types::request_response::RequestPayload;
+
+use crate::app::AppState;
+
+/// Largest request body, in bytes, that [`read_bytes`] will buffer (10 MiB).
+pub(super) const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
+
+/// Convert a core [`ProxyResponse`] into an axum [`Response`].
+///
+/// Status and headers are copied across; the body is passed through either as
+/// a complete buffer (`Full`) or as a stream (`Stream`).
+///
+/// # Panics
+/// Panics if the response builder produces an invalid response (unreachable in practice).
+pub fn convert_response(resp: ProxyResponse) -> Response {
+    let mut builder = Response::builder().status(resp.status);
+    for (name, value) in &resp.headers {
+        builder = builder.header(name, value);
+    }
+    match resp.body {
+        ProxyBody::Full(bytes) => builder.body(Body::from(bytes)).expect("valid response"),
+        ProxyBody::Stream(stream) => builder.body(Body::from_stream(stream)).expect("valid response"),
+    }
+}
+
+/// Render an [`ExecutorError`] as a JSON error response carrying the error's own
+/// HTTP status.
+///
+/// Every variant except `LLMRequest` is logged at `warn` level first.
+///
+/// # Panics
+/// Panics if the response builder produces an invalid response (unreachable in practice).
+pub fn executor_error_response(err: ExecutorError) -> Response {
+    let status = err.http_status();
+    if !matches!(err, ExecutorError::LLMRequest { .. }) {
+        warn!("executor error ({status}): {err}");
+    }
+    Response::builder()
+        .status(status)
+        .header("Content-Type", "application/json")
+        .body(Body::from(err.into_response_body()))
+        .expect("valid error response")
+}
+
+/// Buffer a request body, reading at most [`MAX_BODY_SIZE`] bytes.
+///
+/// # Errors
+/// Any failure while buffering is returned as a ready-made
+/// `413 body_too_large` response.
+// NOTE(review): this also covers read failures unrelated to size — confirm
+// that reporting those as 413 is intended.
+pub(super) async fn read_bytes(body: Body) -> Result<Bytes, Response> {
+    axum::body::to_bytes(body, MAX_BODY_SIZE).await.map_err(|_| {
+        convert_response(error_response(
+            StatusCode::PAYLOAD_TOO_LARGE,
+            "body_too_large",
+            "request body too large",
+        ))
+    })
+}
+
+/// Buffer a request body and deserialize it as a [`RequestPayload`].
+///
+/// The raw bytes are returned next to the parsed payload.
+///
+/// # Errors
+/// Returns the response from [`read_bytes`] if buffering fails, or an
+/// [`executor_error_response`] if the body does not deserialize.
+pub(super) async fn read_and_parse(body: Body) -> Result<(Bytes, RequestPayload), Response> {
+    let bytes = read_bytes(body).await?;
+    let payload = serde_json::from_slice::<RequestPayload>(&bytes)
+        .map_err(|e| executor_error_response(ExecutorError::from(e)))?;
+    Ok((bytes, payload))
+}
+
+/// Read the top-level boolean `store` field of a JSON body.
+///
+/// Defaults to `true` when the body is not valid JSON, or when `store` is
+/// absent or not a boolean.
+pub(super) fn extract_store(bytes: &[u8]) -> bool {
+    serde_json::from_slice::<serde_json::Value>(bytes)
+        .ok()
+        .and_then(|j| j.get("store").and_then(serde_json::Value::as_bool))
+        .unwrap_or(true)
+}
+
+/// Choose the execution context for a request from its headers.
+///
+/// When `Authorization` carries a non-empty token after the exact prefix
+/// `Bearer ` and that token differs from the shared context's `client_auth`,
+/// a clone of the shared context with the request's token is returned.
+/// Otherwise the shared context itself is reused.
+pub(super) fn resolve_exec_ctx_from_headers(state: &AppState, headers: &HeaderMap) -> Arc<ExecutionContext> {
+    let request_auth = headers
+        .get("authorization")
+        .and_then(|v| v.to_str().ok())
+        .and_then(|v| v.strip_prefix("Bearer "))
+        .filter(|s| !s.is_empty())
+        .map(str::to_string);
+
+    if request_auth.is_some() && request_auth != state.exec_ctx.client_auth {
+        let mut ctx = (*state.exec_ctx).clone();
+        ctx.client_auth = request_auth;
+        Arc::new(ctx)
+    } else {
+        Arc::clone(&state.exec_ctx)
+    }
+}
+
+/// Same as [`resolve_exec_ctx_from_headers`], taking the headers from request `parts`.
+pub(super) fn resolve_exec_ctx(state: &AppState, parts: &Parts) -> Arc<ExecutionContext> {
+    resolve_exec_ctx_from_headers(state, &parts.headers)
+}
+
+/// Wrap a stream of lines in a `200 OK` `text/event-stream` response.
+///
+/// Also sets `Cache-Control: no-cache` and `X-Accel-Buffering: no`.
+pub(super) fn sse_response(stream: BoxStream) -> Response {
+    // The mapped stream never fails, so the error type is `Infallible`.
+    let byte_stream = stream.map(|line| Ok::<Bytes, std::convert::Infallible>(Bytes::from(line)));
+    Response::builder()
+        .status(StatusCode::OK)
+        .header("Content-Type", "text/event-stream; charset=utf-8")
+        .header("Cache-Control", "no-cache")
+        .header("X-Accel-Buffering", "no")
+        .body(Body::from_stream(byte_stream))
+        .expect("valid SSE response")
+}
diff --git
a/crates/agentic-server/src/handler/http/conversations.rs b/crates/agentic-server/src/handler/http/conversations.rs
new file mode 100644
index 0000000..2657131
--- /dev/null
+++ b/crates/agentic-server/src/handler/http/conversations.rs
@@ -0,0 +1,36 @@
+use axum::extract::{Request, State};
+use axum::response::{IntoResponse, Response};
+use serde_json::json;
+
+use agentic_core::executor::{ExecutorError, create_conversation};
+
+use super::super::common::{executor_error_response, extract_store, read_bytes};
+use crate::app::AppState;
+
+/// `POST /v1/conversations` handler: create a new conversation.
+///
+/// Only the top-level `store` flag of the body is inspected; a body carrying
+/// `store: false` is rejected as an invalid request. On success the reply is a
+/// JSON `conversation` object with the new id, its creation time and empty metadata.
+pub async fn conversations(State(state): State<AppState>, req: Request) -> Response {
+    let (_, body) = req.into_parts();
+    let bytes = match read_bytes(body).await {
+        Ok(b) => b,
+        Err(e) => return e,
+    };
+
+    if !extract_store(&bytes) {
+        return executor_error_response(ExecutorError::InvalidRequest("conversations require store=true".into()));
+    }
+
+    match create_conversation(&state.exec_ctx).await {
+        Ok(data) => axum::Json(json!({
+            "id": data.conversation_id,
+            "created_at": data.created_at,
+            "object": "conversation",
+            "metadata": {}
+        }))
+        .into_response(),
+        Err(e) => executor_error_response(e),
+    }
+}
diff --git a/crates/agentic-server/src/handler/http/mod.rs b/crates/agentic-server/src/handler/http/mod.rs
new file mode 100644
index 0000000..1973bba
--- /dev/null
+++ b/crates/agentic-server/src/handler/http/mod.rs
@@ -0,0 +1,7 @@
+mod conversations;
+mod models;
+mod responses;
+
+pub use conversations::conversations;
+pub use models::{health, models, ready};
+pub use responses::responses;
diff --git a/crates/agentic-server/src/handler/http/models.rs b/crates/agentic-server/src/handler/http/models.rs
new file mode 100644
index 0000000..cc9251b
--- /dev/null
+++ b/crates/agentic-server/src/handler/http/models.rs
@@ -0,0 +1,196 @@
+use std::sync::OnceLock;
+
+use axum::extract::{Query, State};
+use axum::http::HeaderMap;
+use axum::response::{IntoResponse, Response};
+use http::StatusCode;
+use serde_json::{Value, json};
+use tracing::warn;
+
+use agentic_core::proxy::{ProxyBody, ProxyResponse, error_response, proxy_get};
+
+use super::super::common::convert_response;
+use crate::app::AppState;
+
+/// Static fields shared by every Codex `ModelInfo` entry.
+///
+/// Built once on first use; cloned per model and patched with the per-model
+/// values (`slug`, `display_name`, `auto_review_model_override`,
+/// `supports_reasoning_summaries`, `input_modalities`, and optionally
+/// `context_window` / `max_context_window`).
+fn codex_model_template() -> &'static Value {
+    static TEMPLATE: OnceLock<Value> = OnceLock::new();
+    TEMPLATE.get_or_init(|| {
+        json!({
+            "supported_in_api": true,
+            "priority": 1,
+            "shell_type": "shell_command",
+            "visibility": "list",
+            "base_instructions": "",
+            "supported_reasoning_levels": [
+                {"effort": "low", "description": "Fast responses with lighter reasoning"},
+                {"effort": "medium", "description": "Balances speed and reasoning depth"},
+                {"effort": "high", "description": "Greater reasoning depth for complex problems"}
+            ],
+            "default_reasoning_summary": "auto",
+            "support_verbosity": false,
+            "default_verbosity": null,
+            "apply_patch_tool_type": "freeform",
+            "web_search_tool_type": "text",
+            "truncation_policy": {"mode": "bytes", "limit": 100_000},
+            "supports_parallel_tool_calls": true,
+            "supports_image_detail_original": false,
+            "effective_context_window_percent": 95,
+            "experimental_supported_tools": [],
+            "supports_search_tool": false,
+            "use_responses_lite": false,
+            "tool_mode": null,
+            "multi_agent_version": null,
+        })
+    })
+}
+
+/// Transform a single upstream model entry into a Codex `ModelInfo` object.
+///
+/// `id` becomes both `slug` and `auto_review_model_override`; `name` (falling
+/// back to `id`) becomes `display_name`. The `capabilities` array drives
+/// `supports_reasoning_summaries` (`"reasoning"`) and whether `"image"` is added
+/// to `input_modalities`. A context size, when present, fills both
+/// `context_window` and `max_context_window`.
+///
+/// Returns `None` when the entry has no string `id` field (malformed upstream data).
+fn upstream_model_to_codex(m: &Value) -> Option<Value> {
+    let id = m["id"].as_str()?.to_owned();
+    let display_name = m.get("name").and_then(Value::as_str).unwrap_or(&id).to_owned();
+    // vLLM uses max_model_len; other providers may use context_length
+    let context_length = m["max_model_len"].as_i64().or_else(|| m["context_length"].as_i64());
+    // Single pass over capabilities for both flags
+    let (supports_reasoning, supports_image) = m["capabilities"].as_array().map_or((false, false), |c| {
+        c.iter().fold((false, false), |(r, i), v| {
+            let s = v.as_str();
+            (r || s == Some("reasoning"), i || s == Some("image"))
+        })
+    });
+    let input_modalities = if supports_image {
+        json!(["text", "image"])
+    } else {
+        json!(["text"])
+    };
+
+    let mut model = codex_model_template().clone();
+    let obj = model.as_object_mut().expect("template is object");
+    obj.insert("slug".into(), json!(id));
+    obj.insert("display_name".into(), json!(display_name));
+    obj.insert("auto_review_model_override".into(), json!(id));
+    obj.insert("supports_reasoning_summaries".into(), json!(supports_reasoning));
+    obj.insert("input_modalities".into(), input_modalities);
+    if let Some(ctx) = context_length {
+        obj.insert("context_window".into(), json!(ctx));
+        obj.insert("max_context_window".into(), json!(ctx));
+    }
+
+    Some(model)
+}
+
+/// Build the Codex `ModelsResponse` from a raw upstream vLLM models payload.
+///
+/// Expects `{ "data": [ ... ] }`. Entries that `upstream_model_to_codex`
+/// rejects are skipped. A body that is not valid JSON, or whose `data` is
+/// missing or not an array, yields an empty model list and a warning in the log.
+fn build_codex_models_response(upstream_bytes: &[u8]) -> Value {
+    let entries = match serde_json::from_slice::<Value>(upstream_bytes) {
+        // `get_mut` returns `None` for non-object JSON (array, string, number, ...),
+        // whereas `payload["data"]` used mutably panics on those.
+        Ok(mut payload) => match payload.get_mut("data").map(Value::take) {
+            Some(Value::Array(entries)) => entries,
+            _ => {
+                warn!("upstream /v1/models payload has no `data` array; returning no models");
+                Vec::new()
+            }
+        },
+        Err(e) => {
+            warn!("upstream /v1/models payload is not valid JSON: {e}");
+            Vec::new()
+        }
+    };
+    let models: Vec<Value> = entries.iter().filter_map(upstream_model_to_codex).collect();
+    json!({ "models": models })
+}
+
+/// Liveness probe: always answers `200 OK` without touching the backend.
+pub async fn health() -> impl IntoResponse {
+    StatusCode::OK
+}
+
+/// Readiness probe: `200 OK` only if the LLM backend's `/health` answers with a
+/// success status within two seconds; otherwise `503 Service Unavailable`.
+pub async fn ready(State(state): State<AppState>) -> impl IntoResponse {
+    let base = state.llm_api_base.trim_end_matches('/');
+    let url = format!("{base}/health");
+
+    // NOTE(review): a fresh client is built on every probe; consider reusing a
+    // shared client if probes are frequent.
+    let client = reqwest::Client::builder()
+        .timeout(std::time::Duration::from_secs(2))
+        .build();
+
+    let Ok(client) = client else {
+        return StatusCode::SERVICE_UNAVAILABLE;
+    };
+
+    match client.get(&url).send().await {
+        Ok(resp) if resp.status().is_success() => StatusCode::OK,
+        Ok(resp) => {
+            warn!("LLM backend not ready: {}", resp.status());
+            StatusCode::SERVICE_UNAVAILABLE
+        }
+        Err(e) => {
+            warn!("LLM backend unreachable: {e}");
+            StatusCode::SERVICE_UNAVAILABLE
+        }
+    }
+}
+
+/// Query parameters for GET /v1/models.
+///
+/// Codex CLI appends `?client_version=<version>` to identify itself; its presence
+/// triggers transformation to the Codex `ModelsResponse` shape.
+#[derive(serde::Deserialize)]
+pub struct ModelsParams {
+    client_version: Option<String>,
+}
+
+/// GET /v1/models — Codex-aware model list.
+///
+/// When `?client_version` is present (Codex CLI), fetches vLLM's model list via
+/// [`proxy_get`] and transforms it into the Codex `ModelsResponse` shape
+/// (`{ "models": [...] }` with rich metadata). Without `client_version`, the
+/// upstream response is returned unchanged via [`proxy_get`].
+///
+/// A non-success upstream status is relayed untransformed in both modes.
+pub async fn models(State(state): State<AppState>, headers: HeaderMap, Query(params): Query<ModelsParams>) -> Response {
+    let upstream = proxy_get("/v1/models", &headers, &state.proxy_state).await;
+
+    if params.client_version.is_none() {
+        return convert_response(upstream);
+    }
+
+    let ProxyBody::Full(upstream_bytes) = upstream.body else {
+        return convert_response(error_response(
+            StatusCode::BAD_GATEWAY,
+            "upstream_unavailable",
+            "unexpected streaming response from /v1/models",
+        ));
+    };
+
+    if !upstream.status.is_success() {
+        // The body was moved out by the pattern above; put it back and relay
+        // the upstream error with its original status and headers.
+        return convert_response(ProxyResponse {
+            body: ProxyBody::Full(upstream_bytes),
+            ..upstream
+        });
+    }
+
+    axum::Json(build_codex_models_response(&upstream_bytes)).into_response()
+}
diff --git a/crates/agentic-server/src/handler/http/responses.rs b/crates/agentic-server/src/handler/http/responses.rs
new file mode 100644
index 0000000..5d22034
--- /dev/null
+++ b/crates/agentic-server/src/handler/http/responses.rs
@@ -0,0 +1,45 @@
+use axum::extract::{Request, State};
+use axum::http::request::Parts;
+use axum::response::{IntoResponse, Response};
+use bytes::Bytes;
+use either::Either;
+
+use agentic_core::executor::execute;
+use agentic_core::proxy::{ProxyRequest, proxy_request};
+use agentic_core::types::request_response::RequestPayload;
+
+use super::super::common::{convert_response, executor_error_response, read_and_parse, resolve_exec_ctx, sse_response};
+use crate::app::AppState;
+
+async fn proxy_responses(state: &AppState, parts: Parts, body: Bytes) -> Response {
+    let proxy_req = ProxyRequest {
+        headers: parts.headers,
+        body,
+        query: parts.uri.query().map(str::to_string),
+    };
+    convert_response(proxy_request(proxy_req, &state.proxy_state).await)
+}
+
+async fn execute_responses(state: &AppState, parts: Parts, payload: RequestPayload) -> Response {
+    match execute(payload, resolve_exec_ctx(state, &parts)).await {
+        Ok(Either::Left(response_payload)) => axum::Json(response_payload).into_response(),
+        Ok(Either::Right(stream)) =>
sse_response(stream), + Err(e) => executor_error_response(e), + } +} + +pub async fn responses(State(state): State, req: Request) -> Response { + let (parts, body) = req.into_parts(); + let (bytes, payload) = match read_and_parse(body).await { + Ok(v) => v, + Err(e) => return e, + }; + + let should_persist = payload.store || payload.previous_response_id.is_some() || payload.conversation_id.is_some(); + + if should_persist { + execute_responses(&state, parts, payload).await + } else { + proxy_responses(&state, parts, bytes).await + } +} diff --git a/crates/agentic-server/src/handler/mod.rs b/crates/agentic-server/src/handler/mod.rs new file mode 100644 index 0000000..7943069 --- /dev/null +++ b/crates/agentic-server/src/handler/mod.rs @@ -0,0 +1,7 @@ +mod common; +pub mod http; +pub mod websocket; + +pub use common::{convert_response, executor_error_response}; +pub use http::{conversations, health, models, ready, responses}; +pub use websocket::responses_ws; diff --git a/crates/agentic-server/src/handler/websocket/error.rs b/crates/agentic-server/src/handler/websocket/error.rs new file mode 100644 index 0000000..16b42f0 --- /dev/null +++ b/crates/agentic-server/src/handler/websocket/error.rs @@ -0,0 +1,82 @@ +use http::StatusCode; +use serde_json::{Value, json}; +use thiserror::Error; + +use agentic_core::executor::ExecutorError; + +#[derive(Debug, Error)] +pub(super) enum WsError { + #[error(transparent)] + Executor(#[from] ExecutorError), + + #[error("invalid JSON: {0}")] + InvalidJson(#[source] serde_json::Error), + + #[error("failed to serialize websocket event: {0}")] + SerializeJson(#[source] serde_json::Error), + + #[error("websocket message type must be response.create")] + UnexpectedType, + + #[error("websocket messages must be JSON text frames")] + BinaryFrame, + + #[error("websocket send failed")] + SendFailed, + + #[error("websocket client disconnected")] + ClientDisconnected, + + #[error("websocket shutdown requested")] + Shutdown, + + 
#[error("websocket receive failed: {0}")] + Receive(String), +} + +impl WsError { + pub(super) fn status(&self) -> StatusCode { + match self { + Self::Executor(err) => err.http_status(), + Self::InvalidJson(_) | Self::UnexpectedType | Self::BinaryFrame => StatusCode::BAD_REQUEST, + Self::SerializeJson(_) + | Self::SendFailed + | Self::ClientDisconnected + | Self::Shutdown + | Self::Receive(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + pub(super) fn code(&self) -> &'static str { + match self { + Self::Executor(err) => err.error_code(), + Self::InvalidJson(_) => "invalid_json", + Self::UnexpectedType | Self::BinaryFrame => "invalid_request_error", + Self::SerializeJson(_) + | Self::SendFailed + | Self::ClientDisconnected + | Self::Shutdown + | Self::Receive(_) => "server_error", + } + } + + pub(super) fn to_ws_frame(&self) -> Option { + if matches!( + self, + Self::SerializeJson(_) | Self::SendFailed | Self::ClientDisconnected | Self::Shutdown | Self::Receive(_) + ) { + return None; + } + + let code = self.code(); + Some(json!({ + "type": "error", + "status": self.status().as_u16(), + "error": { + "message": self.to_string(), + "type": code, + "code": code + } + })) + } +} diff --git a/crates/agentic-server/src/handler/websocket/mod.rs b/crates/agentic-server/src/handler/websocket/mod.rs new file mode 100644 index 0000000..14c7501 --- /dev/null +++ b/crates/agentic-server/src/handler/websocket/mod.rs @@ -0,0 +1,4 @@ +mod error; +mod responses; + +pub use responses::responses_ws; diff --git a/crates/agentic-server/src/handler/websocket/responses.rs b/crates/agentic-server/src/handler/websocket/responses.rs new file mode 100644 index 0000000..6a7c11a --- /dev/null +++ b/crates/agentic-server/src/handler/websocket/responses.rs @@ -0,0 +1,269 @@ +use std::collections::VecDeque; +use std::sync::Arc; + +use axum::extract::State; +use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; +use axum::http::HeaderMap; +use axum::response::Response; +use 
futures::stream::{SplitSink, SplitStream}; +use futures::{SinkExt, StreamExt}; +use serde_json::Value; +use tokio_util::sync::CancellationToken; +use tracing::warn; + +use agentic_core::executor::accumulator::ResponseAccumulator; +use agentic_core::executor::{ + ExecutionContext, ExecutorError, RequestContext, call_inference, persist_response, rehydrate_conversation, +}; +use agentic_core::types::ResponsePayload; +use agentic_core::types::request_response::RequestPayload; +use agentic_core::utils::common::serialize_to_string; + +use super::super::common::{MAX_BODY_SIZE, resolve_exec_ctx_from_headers}; +use super::error::WsError; +use crate::app::AppState; + +type WsSender = SplitSink; +type WsReceiver = SplitStream; + +pub async fn responses_ws(State(state): State, headers: HeaderMap, ws: WebSocketUpgrade) -> Response { + ws.max_message_size(MAX_BODY_SIZE) + .max_frame_size(MAX_BODY_SIZE) + .on_upgrade(move |socket| responses_ws_loop(socket, state, headers)) +} + +async fn responses_ws_loop(socket: WebSocket, state: AppState, headers: HeaderMap) { + let shutdown_token = state.shutdown_token.clone(); + let (mut sender, mut receiver) = socket.split(); + + // Requests received while a stream is active, processed in order after it completes. + let mut queue: VecDeque = VecDeque::new(); + + loop { + let text = if let Some(buffered) = queue.pop_front() { + buffered + } else { + let message = tokio::select! 
{ + () = shutdown_token.cancelled() => break, + message = receiver.next() => message, + }; + + let Some(message) = message else { + break; + }; + + match message { + Ok(Message::Text(text)) => text.to_string(), + Ok(Message::Binary(_)) => { + if !handle_ws_error(&mut sender, WsError::BinaryFrame).await { + break; + } + continue; + } + Ok(Message::Close(_)) => break, + Ok(Message::Ping(payload)) => { + if sender.send(Message::Pong(payload)).await.is_err() { + break; + } + continue; + } + Ok(Message::Pong(_)) => continue, + Err(e) => { + warn!("responses websocket receive error: {e}"); + break; + } + } + }; + + match handle_ws_text( + &mut sender, + &mut receiver, + &state, + &headers, + &text, + &shutdown_token, + &mut queue, + ) + .await + { + Ok(()) => {} + Err(err) => { + if !handle_ws_error(&mut sender, err).await { + break; + } + } + } + } +} + +/// Process one `response.create` message. +/// +/// Any requests received from the client while the stream is active are +/// pushed onto `queue` and processed by the caller in order after this returns. 
+async fn handle_ws_text( + sender: &mut WsSender, + receiver: &mut WsReceiver, + state: &AppState, + headers: &HeaderMap, + text: &str, + shutdown_token: &CancellationToken, + queue: &mut VecDeque, +) -> Result<(), WsError> { + let value = serde_json::from_str::(text).map_err(WsError::InvalidJson)?; + + if value.get("type").and_then(Value::as_str) != Some("response.create") { + return Err(WsError::UnexpectedType); + } + + let mut payload = serde_json::from_value::(value).map_err(ExecutorError::from)?; + payload.stream = true; + payload.store = true; + + let exec_ctx = resolve_exec_ctx_from_headers(state, headers); + let ctx = rehydrate_conversation(payload, &exec_ctx).await?; + let upstream_json = + serialize_to_string(&ctx.enriched_request.to_upstream_request(true)).map_err(ExecutorError::from)?; + + stream_ws_response(sender, receiver, exec_ctx, ctx, upstream_json, shutdown_token, queue).await +} + +/// Stream a response from the upstream LLM to the client. +/// +/// Requests arriving from the client while the stream is active are pushed +/// onto `queue` so the caller can process them in order after this returns. +async fn stream_ws_response( + sender: &mut WsSender, + receiver: &mut WsReceiver, + exec_ctx: Arc, + ctx: RequestContext, + upstream_json: String, + shutdown_token: &CancellationToken, + queue: &mut VecDeque, +) -> Result<(), WsError> { + let should_persist = ctx.original_request.store + || ctx.original_request.previous_response_id.is_some() + || ctx.conversation_id.is_some(); + let mut lines = Vec::new(); + let mut stream = Box::pin(call_inference( + upstream_json, + exec_ctx.responses_url(), + Arc::clone(&exec_ctx.client), + exec_ctx.client_auth.clone(), + exec_ctx.streaming_timeout, + )); + + 'stream: loop { + let next_line = tokio::select! 
{ + () = shutdown_token.cancelled() => return Err(WsError::Shutdown), + message = receiver.next() => { + match message { + None | Some(Ok(Message::Close(_))) => return Err(WsError::ClientDisconnected), + Some(Ok(Message::Ping(payload))) => { + sender.send(Message::Pong(payload)).await.map_err(|_| WsError::SendFailed)?; + continue 'stream; + } + Some(Ok(Message::Pong(_))) => continue 'stream, + Some(Ok(Message::Binary(_))) => return Err(WsError::BinaryFrame), + Some(Ok(Message::Text(text))) => { + // Client pipelined the next request while we are still streaming. + // Enqueue it and keep draining the current stream. + queue.push_back(text.to_string()); + continue 'stream; + } + Some(Err(e)) => return Err(WsError::Receive(e.to_string())), + } + } + line = stream.next() => line, + }; + let Some(line) = next_line else { + break; + }; + let line = match line { + Ok(line) => line, + Err(e) => return Err(WsError::Executor(e)), + }; + let Some(data) = line.strip_prefix("data: ") else { + continue; + }; + let data = data.trim(); + if data == "[DONE]" { + continue; + } + let mut value = match serde_json::from_str::(data) { + Ok(value) => value, + Err(e) => return Err(WsError::Executor(ExecutorError::from(e))), + }; + apply_gateway_response_ids(&mut value, &ctx); + send_ws_json(sender, value).await?; + if should_persist { + lines.push(line); + } + } + + if should_persist && !lines.is_empty() { + let acc = ResponseAccumulator::from_sse_lines(lines, ctx.conversation_id.as_deref()); + let mut payload = acc.finalize( + &ctx.enriched_request.model, + ctx.original_request.previous_response_id.as_deref(), + ctx.original_request.instructions.as_deref(), + ); + apply_gateway_payload_ids(&mut payload, &ctx); + let ch = exec_ctx.conv_handler.clone(); + let rh = exec_ctx.resp_handler.clone(); + if let Err(e) = persist_response(payload, ctx, ch, rh).await { + warn!("persist failed: {e}"); + } + } + + Ok(()) +} + +fn apply_gateway_response_ids(value: &mut Value, ctx: &RequestContext) { + 
let Some(response) = value.get_mut("response").and_then(Value::as_object_mut) else { + return; + }; + response.insert("id".to_owned(), Value::String(ctx.response_id.clone())); + if let Some(previous_response_id) = &ctx.original_request.previous_response_id { + response.insert( + "previous_response_id".to_owned(), + Value::String(previous_response_id.clone()), + ); + } + if let Some(conversation_id) = &ctx.conversation_id { + response.insert("conversation_id".to_owned(), Value::String(conversation_id.clone())); + } +} + +fn apply_gateway_payload_ids(payload: &mut ResponsePayload, ctx: &RequestContext) { + payload.id.clone_from(&ctx.response_id); + payload.conversation_id.clone_from(&ctx.conversation_id); + payload + .previous_response_id + .clone_from(&ctx.original_request.previous_response_id); +} + +async fn handle_ws_error(sender: &mut WsSender, err: WsError) -> bool { + match err { + WsError::Shutdown | WsError::ClientDisconnected | WsError::SendFailed => false, + WsError::Receive(message) => { + warn!("responses websocket receive error: {message}"); + false + } + err => send_ws_error(sender, &err).await.is_ok(), + } +} + +async fn send_ws_error(sender: &mut WsSender, err: &WsError) -> Result<(), WsError> { + let Some(frame) = err.to_ws_frame() else { + return Err(WsError::SendFailed); + }; + send_ws_json(sender, frame).await +} + +async fn send_ws_json(sender: &mut WsSender, value: Value) -> Result<(), WsError> { + let text = serde_json::to_string(&value).map_err(WsError::SerializeJson)?; + sender + .send(Message::Text(text.into())) + .await + .map_err(|_| WsError::SendFailed) +}