diff --git a/crates/frontend/src/pages/admin_kiro_gateway.rs b/crates/frontend/src/pages/admin_kiro_gateway.rs index 0a737c2..76cf59f 100644 --- a/crates/frontend/src/pages/admin_kiro_gateway.rs +++ b/crates/frontend/src/pages/admin_kiro_gateway.rs @@ -233,18 +233,50 @@ fn anthropic_routing_badge(raw: Option<&str>) -> Option<&'static str> { } } +const PREFLIGHT_CHANGE_COUNT_KEYS: [&str; 3] = + ["tool_use_id_rewrite_count", "normalization_event_count", "tool_normalization_event_count"]; + +fn routing_diagnostic_preflight_change_count(raw: Option<&str>) -> Option { + let preflight = raw + .and_then(|value| serde_json::from_str::(value).ok()) + .and_then(|value| value.get("preflight").cloned())?; + if !preflight + .get("normalized") + .and_then(serde_json::Value::as_bool) + .unwrap_or(false) + { + return None; + } + if let Some(count) = preflight + .get("change_count") + .and_then(serde_json::Value::as_u64) + { + return (count > 0).then_some(count); + } + let count = PREFLIGHT_CHANGE_COUNT_KEYS + .into_iter() + .filter_map(|key| preflight.get(key).and_then(serde_json::Value::as_u64)) + .sum::(); + (count > 0).then_some(count) +} + fn anthropic_routing_summary(raw: Option<&str>) -> Option { let badge = anthropic_routing_badge(raw)?; let channel = routing_diagnostic_string(raw, "channel_name"); let probe_kind = routing_diagnostic_string(raw, "probe_kind"); - Some(match (channel, probe_kind) { - (Some(channel), Some(probe_kind)) => { - format!("{badge} · channel {channel} · {probe_kind}") - }, - (Some(channel), None) => format!("{badge} · channel {channel}"), - (None, Some(probe_kind)) => format!("{badge} · {probe_kind}"), - (None, None) => badge.to_string(), - }) + let preflight_changes = routing_diagnostic_preflight_change_count(raw); + let mut parts = vec![badge.to_string()]; + if let Some(channel) = channel { + parts.push(format!("channel {channel}")); + } + if let Some(probe_kind) = probe_kind { + parts.push(probe_kind); + } + if let Some(count) = preflight_changes { + let noun = if count == 1 { "change" } else { "changes" }; + parts.push(format!("preflight {count} {noun}")); + } + Some(parts.join(" · ")) } #[derive(Debug, Clone, PartialEq, Eq, Default)] @@ -5885,7 +5917,8 @@ mod tests { use serde_json::json; use super::{ - admin_kiro_key_total_pages, build_kiro_billable_multiplier_override_json, + admin_kiro_key_total_pages, anthropic_routing_summary, + build_kiro_billable_multiplier_override_json, build_kiro_billable_multiplier_override_patch, build_kiro_cache_policy_override_json, build_kiro_cache_policy_override_patch, format_compact_bytes, format_kiro_cache_policy_summary, format_kiro_key_candidate_credit_summary, @@ -5919,6 +5952,71 @@ mod tests { ); } + #[test] + fn anthropic_routing_summary_includes_preflight_change_count() { + let diagnostics = json!({ + "upstream_pool": "direct_anthropic", + "channel_name": "channel-a", + "preflight": { + "normalized": true, + "change_count": 2, + "tool_use_id_rewrite_count": 1, + "normalization_event_count": 1, + "tool_normalization_event_count": 0, + "tool_schema_keyword_count": 0 + } + }) + .to_string(); + + assert_eq!( + anthropic_routing_summary(Some(&diagnostics)), + Some("Anthropic 直连 · channel channel-a · preflight 2 changes".to_string()) + ); + } + + #[test] + fn anthropic_routing_summary_omits_schema_observation_only_preflight() { + let diagnostics = json!({ + "upstream_pool": "direct_anthropic", + "channel_name": "channel-a", + "preflight": { + "normalized": false, + "change_count": 0, + "tool_use_id_rewrite_count": 0, + "normalization_event_count": 0, + "tool_normalization_event_count": 0, + "tool_schema_keyword_count": 2 + } + }) + .to_string(); + + assert_eq!( + anthropic_routing_summary(Some(&diagnostics)), + Some("Anthropic 直连 · channel channel-a".to_string()) + ); + } + + #[test] + fn anthropic_routing_summary_omits_legacy_schema_only_preflight() { + let diagnostics = json!({ + "upstream_pool": "direct_anthropic", + "channel_name": "channel-a", + "preflight": { + "normalized": true, + "tool_use_id_rewrite_count": 0, + "normalization_event_count": 0, + "tool_normalization_event_count": 0, + "tool_schema_keyword_count": 2 + } + }) + .to_string(); + + assert_eq!( + anthropic_routing_summary(Some(&diagnostics)), + Some("Anthropic 直连 · channel channel-a".to_string()) + ); + } + #[test] fn kiro_key_route_summary_uses_full_pool_text_when_group_is_empty() { let summary = kiro_key_route_summary( diff --git a/crates/llm-access-kiro/src/anthropic/mod.rs b/crates/llm-access-kiro/src/anthropic/mod.rs index b90f4b9..3579f04 100644 --- a/crates/llm-access-kiro/src/anthropic/mod.rs +++ b/crates/llm-access-kiro/src/anthropic/mod.rs @@ -1,6 +1,7 @@ //! Anthropic-compatible Kiro request and stream conversion. pub mod converter; +pub mod preflight; pub mod protected_content; pub mod stream; pub mod types; diff --git a/crates/llm-access-kiro/src/anthropic/preflight.rs b/crates/llm-access-kiro/src/anthropic/preflight.rs new file mode 100644 index 0000000..e099756 --- /dev/null +++ b/crates/llm-access-kiro/src/anthropic/preflight.rs @@ -0,0 +1,108 @@ +//! Shared preflight for Anthropic-compatible requests entering the Kiro +//! surface. +//! +//! This module intentionally stops before Kiro wire conversion. It provides the +//! normalized Anthropic request shape that both Kiro-native dispatch and direct +//! Anthropic upstream dispatch can reuse without duplicating cleanup rules. + +use super::{ + converter::{ + normalize_request, ConversionError, NormalizationEvent, ToolNormalizationEvent, + ToolUseIdRewrite, ToolValidationSummary, + }, + types::MessagesRequest, +}; + +#[derive(Debug)] +pub struct PreprocessedMessagesRequest { + pub request: MessagesRequest, + pub tool_use_id_rewrites: Vec, + pub normalization_events: Vec, + pub tool_normalization_events: Vec, + pub tool_validation_summary: ToolValidationSummary, +} + +pub fn preprocess_messages_request( + request: &MessagesRequest, +) -> Result { + let normalized = normalize_request(request)?; + Ok(PreprocessedMessagesRequest { + request: normalized.request, + tool_use_id_rewrites: normalized.tool_use_id_rewrites, + normalization_events: normalized.normalization_events, + tool_normalization_events: normalized.tool_normalization_events, + tool_validation_summary: normalized.tool_validation_summary, + }) +} + +#[cfg(test)] +mod tests { + use crate::anthropic::{ + preflight::preprocess_messages_request, + types::{Message, MessagesRequest}, + }; + + fn request_with_invalid_history_tool_use_id() -> MessagesRequest { + MessagesRequest { + model: "claude-opus-4-8".to_string(), + _max_tokens: 128, + messages: vec![ + Message { + role: "user".to_string(), + content: serde_json::json!("Run the tool"), + }, + Message { + role: "assistant".to_string(), + content: serde_json::json!([ + { + "type": "tool_use", + "id": "toolu.01:bad", + "name": "read_file", + "input": {"path": "/tmp/test.txt"} + } + ]), + }, + Message { + role: "user".to_string(), + content: serde_json::json!([ + { + "type": "tool_result", + "tool_use_id": "toolu.01:bad", + "content": "file content" + } + ]), + }, + ], + stream: false, + system: None, + tools: None, + _tool_choice: None, + thinking: None, + output_config: None, + metadata: None, + } + } + + #[test] + fn preflight_normalizes_tool_use_ids_for_shared_kiro_anthropic_surface() { + let preflight = preprocess_messages_request(&request_with_invalid_history_tool_use_id()) + .expect("preflight should normalize reusable Kiro Anthropic request shape"); + + assert_eq!(preflight.tool_use_id_rewrites.len(), 1); + let rewrite = &preflight.tool_use_id_rewrites[0]; + assert_eq!(rewrite.original_tool_use_id, "toolu.01:bad"); + assert!(rewrite + .rewritten_tool_use_id + .chars() + .all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-')); + + let assistant_id = preflight.request.messages[1].content[0]["id"] + .as_str() + .expect("assistant tool_use id should remain a string"); + let result_id = preflight.request.messages[2].content[0]["tool_use_id"] + .as_str() + .expect("tool_result id should remain a string"); + assert_eq!(assistant_id, rewrite.rewritten_tool_use_id); + assert_eq!(result_id, rewrite.rewritten_tool_use_id); + } +} diff --git a/crates/llm-access/src/provider.rs b/crates/llm-access/src/provider.rs index 4f1375f..c427a7c 100644 --- a/crates/llm-access/src/provider.rs +++ b/crates/llm-access/src/provider.rs @@ -1,6 +1,8 @@ //! Provider-facing HTTP entrypoints for `llm-access`. +mod anthropic_upstream_diagnostics; mod anthropic_upstream_dispatch; +mod anthropic_upstream_payload; mod cctest; mod client; mod codex_auth; diff --git a/crates/llm-access/src/provider/anthropic_upstream_diagnostics.rs b/crates/llm-access/src/provider/anthropic_upstream_diagnostics.rs new file mode 100644 index 0000000..c4cee6a --- /dev/null +++ b/crates/llm-access/src/provider/anthropic_upstream_diagnostics.rs @@ -0,0 +1,104 @@ +use llm_access_kiro::anthropic::preflight::PreprocessedMessagesRequest; +use serde_json::json; + +const PREFLIGHT_DETAIL_LIMIT: usize = 20; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) struct DirectAnthropicPreflightStats { + pub(super) change_count: usize, + pub(super) tool_use_id_rewrite_count: usize, + pub(super) normalization_event_count: usize, + pub(super) tool_normalization_event_count: usize, + pub(super) tool_schema_keyword_count: usize, +} + +impl DirectAnthropicPreflightStats { + pub(super) fn normalized(&self) -> bool { + self.change_count > 0 + } +} + +pub(super) fn direct_anthropic_preflight_stats( + preflight: &PreprocessedMessagesRequest, +) -> DirectAnthropicPreflightStats { + let tool_schema_keyword_count = preflight + .tool_validation_summary + .schema_keyword_counts + .values() + .sum(); + let change_count = [ + preflight.tool_use_id_rewrites.len(), + preflight.normalization_events.len(), + preflight.tool_normalization_events.len(), + ] + .into_iter() + .sum(); + + DirectAnthropicPreflightStats { + change_count, + tool_use_id_rewrite_count: preflight.tool_use_id_rewrites.len(), + normalization_event_count: preflight.normalization_events.len(), + tool_normalization_event_count: preflight.tool_normalization_events.len(), + tool_schema_keyword_count, + } +} + +pub(super) fn build_direct_anthropic_routing_diagnostics( + channel_name: &str, + pool_mode: &str, + preflight: &PreprocessedMessagesRequest, +) -> String { + let stats = direct_anthropic_preflight_stats(preflight); + json!({ + "upstream_pool": "direct_anthropic", + "channel_name": channel_name, + "pool_mode": pool_mode, + "preflight": { + "normalized": stats.normalized(), + "change_count": stats.change_count, + "tool_use_id_rewrite_count": stats.tool_use_id_rewrite_count, + "normalization_event_count": stats.normalization_event_count, + "tool_normalization_event_count": stats.tool_normalization_event_count, + "tool_schema_keyword_count": stats.tool_schema_keyword_count, + "tool_validation_summary": { + "normalized_tool_description_count": preflight.tool_validation_summary.normalized_tool_description_count, + "empty_tool_name_count": preflight.tool_validation_summary.empty_tool_name_count, + "schema_keyword_counts": &preflight.tool_validation_summary.schema_keyword_counts, + }, + "tool_use_id_rewrites": preflight.tool_use_id_rewrites.iter() + .take(PREFLIGHT_DETAIL_LIMIT) + .map(|rewrite| json!({ + "original_tool_use_id": &rewrite.original_tool_use_id, + "rewritten_tool_use_id": &rewrite.rewritten_tool_use_id, + "assistant_message_index": rewrite.assistant_message_index, + "content_block_index": rewrite.content_block_index, + "rewritten_tool_result_count": rewrite.rewritten_tool_result_count, + })) + .collect::>(), + "tool_use_id_rewrites_truncated": preflight.tool_use_id_rewrites.len() > PREFLIGHT_DETAIL_LIMIT, + "normalization_events": preflight.normalization_events.iter() + .take(PREFLIGHT_DETAIL_LIMIT) + .map(|event| json!({ + "message_index": event.message_index, + "role": &event.role, + "content_block_index": event.content_block_index, + "block_type": event.block_type.as_ref(), + "action": event.action, + "reason": event.reason, + })) + .collect::>(), + "normalization_events_truncated": preflight.normalization_events.len() > PREFLIGHT_DETAIL_LIMIT, + "tool_normalization_events": preflight.tool_normalization_events.iter() + .take(PREFLIGHT_DETAIL_LIMIT) + .map(|event| json!({ + "tool_index": event.tool_index, + "tool_name": &event.tool_name, + "action": event.action, + "reason": event.reason, + })) + .collect::>(), + "tool_normalization_events_truncated": preflight.tool_normalization_events.len() > PREFLIGHT_DETAIL_LIMIT, + } + }) + .to_string() +} diff --git a/crates/llm-access/src/provider/anthropic_upstream_dispatch.rs b/crates/llm-access/src/provider/anthropic_upstream_dispatch.rs index f7c935c..e68f518 100644 --- a/crates/llm-access/src/provider/anthropic_upstream_dispatch.rs +++ b/crates/llm-access/src/provider/anthropic_upstream_dispatch.rs @@ -26,9 +26,15 @@ use llm_access_core::{ }, usage::UsageEvent, }; -use serde_json::Value; +use llm_access_kiro::anthropic::preflight::PreprocessedMessagesRequest; use super::{ + anthropic_upstream_diagnostics::{ + build_direct_anthropic_routing_diagnostics, direct_anthropic_preflight_stats, + }, + anthropic_upstream_payload::{ + build_route_payload, prepare_direct_anthropic_payload, DirectAnthropicPreparedPayload, + }, client::anthropic_upstream_client, kiro_error::kiro_json_error, kiro_protocol::normalized_kiro_messages_path, @@ -193,32 +199,29 @@ pub(super) async fn maybe_dispatch_anthropic_upstream_pool( capture_client_request_body_json(&mut usage_meta, &replay.body); let parse_started = Instant::now(); - let payload = match serde_json::from_slice::(&replay.body) { - Ok(payload) => payload, - Err(_) => { - return AnthropicUpstreamDispatchOutcome::Handled(kiro_json_error( - StatusCode::BAD_REQUEST, - "invalid_request_error", - "request body must be a valid Anthropic messages JSON payload", - )) - }, - }; - let original_model = match payload - .get("model") - .and_then(Value::as_str) - .map(str::trim) - .filter(|value| !value.is_empty()) - { - Some(model) => model.to_string(), - None => { - return AnthropicUpstreamDispatchOutcome::Handled(kiro_json_error( - StatusCode::BAD_REQUEST, - "invalid_request_error", - "model is required", - )) - }, + let prepared = match prepare_direct_anthropic_payload(&replay.body) { + Ok(prepared) => prepared, + Err(err) => return AnthropicUpstreamDispatchOutcome::Handled(err.into_response()), }; usage_meta.mark_pre_handler_done(clamp_duration_ms(parse_started.elapsed())); + let DirectAnthropicPreparedPayload { + original_model, + preflight, + } = prepared; + let preflight_stats = direct_anthropic_preflight_stats(&preflight); + if preflight_stats.normalized() { + tracing::info!( + key_id = %key.key_id, + endpoint = %public_path, + model = %original_model, + preflight_change_count = preflight_stats.change_count, + tool_use_id_rewrite_count = preflight_stats.tool_use_id_rewrite_count, + normalization_event_count = preflight_stats.normalization_event_count, + tool_normalization_event_count = preflight_stats.tool_normalization_event_count, + tool_schema_keyword_count = preflight_stats.tool_schema_keyword_count, + "direct anthropic request preflight normalized" + ); + } let context = DirectAnthropicDispatchContext { key: &key, @@ -230,7 +233,7 @@ pub(super) async fn maybe_dispatch_anthropic_upstream_pool( let route_queue = order_routes_for_request(routes); let mut last_failure: Option = None; for route in route_queue { - let response = dispatch_one_route(&context, &route, &payload, &mut usage_meta).await; + let response = dispatch_one_route(&context, &route, &preflight, &mut usage_meta).await; let retryable = is_retryable_direct_status(response.status()); if !retryable { return AnthropicUpstreamDispatchOutcome::Handled(response); @@ -300,44 +303,10 @@ fn select_weighted_route( Some(routes.remove(index)) } -fn apply_model_mapping_to_json( - model_name_map_json: &str, - payload: &mut Value, -) -> anyhow::Result> { - let trimmed = model_name_map_json.trim(); - if trimmed.is_empty() || trimmed == "{}" { - return Ok(None); - } - let map = serde_json::from_str::>(trimmed)?; - let Some(model) = payload.get("model").and_then(Value::as_str) else { - return Ok(None); - }; - let Some(target) = map.get(model).cloned() else { - return Ok(None); - }; - if target == model { - return Ok(None); - } - let Some(object) = payload.as_object_mut() else { - return Ok(None); - }; - object.insert("model".to_string(), Value::String(target.clone())); - Ok(Some(target)) -} - -fn build_route_payload( - model_name_map_json: &str, - payload: &Value, -) -> anyhow::Result<(Value, Option)> { - let mut route_payload = payload.clone(); - let mapped_model = apply_model_mapping_to_json(model_name_map_json, &mut route_payload)?; - Ok((route_payload, mapped_model)) -} - async fn dispatch_one_route( context: &DirectAnthropicDispatchContext<'_>, route: &ProviderAnthropicUpstreamRoute, - payload: &Value, + preflight: &PreprocessedMessagesRequest, usage_meta: &mut ProviderUsageMetadata, ) -> axum::response::Response { let _key_permit = match try_acquire_key_permit( @@ -388,8 +357,13 @@ async fn dispatch_one_route( ); }, }; + usage_meta.routing_diagnostics_json = Some(build_direct_anthropic_routing_diagnostics( + &route.channel_name, + &route.pool_mode_at_event, + preflight, + )); let (route_payload, mapped_model) = - match build_route_payload(&route.model_name_map_json, payload) { + match build_route_payload(&route.model_name_map_json, &preflight.request) { Ok(output) => output, Err(err) => { tracing::warn!( @@ -714,14 +688,16 @@ async fn record_direct_usage( request_body_bytes: meta.request_body_bytes, quota_failover_count: meta.quota_failover_count, retry: meta.retry.clone(), - routing_diagnostics_json: Some( - serde_json::json!({ + routing_diagnostics_json: meta.routing_diagnostics_json.clone().or_else(|| { + Some( + serde_json::json!({ "upstream_pool": "direct_anthropic", "channel_name": route.channel_name, "pool_mode": route.pool_mode_at_event, - }) - .to_string(), - ), + }) + .to_string(), + ) + }), input_uncached_tokens: usage.input_uncached_tokens.max(0), input_cached_tokens: usage.input_cached_tokens.max(0), output_tokens: usage.output_tokens.max(0), @@ -824,8 +800,9 @@ mod tests { }; use super::{ - build_route_payload, is_anthropic_event_stream, is_retryable_direct_status, - observe_anthropic_sse_chunk, observe_anthropic_sse_tail, parse_anthropic_sse_usage, + build_direct_anthropic_routing_diagnostics, build_route_payload, is_anthropic_event_stream, + is_retryable_direct_status, observe_anthropic_sse_chunk, observe_anthropic_sse_tail, + parse_anthropic_sse_usage, prepare_direct_anthropic_payload, }; #[test] @@ -897,10 +874,18 @@ data: [DONE] #[test] fn builds_route_payload_with_route_specific_model_mapping() { - let payload = serde_json::json!({ - "model": "public-model", - "messages": [] - }); + let payload = llm_access_kiro::anthropic::types::MessagesRequest { + model: "public-model".to_string(), + _max_tokens: 128, + messages: vec![], + stream: false, + system: None, + tools: None, + _tool_choice: None, + thinking: None, + output_config: None, + metadata: None, + }; let (route_a_payload, route_a_model) = build_route_payload(r#"{"public-model":"upstream-a"}"#, &payload) @@ -911,8 +896,148 @@ data: [DONE] assert_eq!(route_a_model.as_deref(), Some("upstream-a")); assert_eq!(route_b_model.as_deref(), Some("upstream-b")); - assert_eq!(route_a_payload["model"], "upstream-a"); - assert_eq!(route_b_payload["model"], "upstream-b"); - assert_eq!(payload["model"], "public-model"); + assert_eq!(route_a_payload.model, "upstream-a"); + assert_eq!(route_b_payload.model, "upstream-b"); + assert_eq!(payload.model, "public-model"); + } + + #[test] + fn prepares_direct_payload_with_shared_kiro_anthropic_preflight() { + let payload = serde_json::json!({ + "model": "public-model", + "max_tokens": 128, + "messages": [ + {"role": "user", "content": "Run the tool"}, + { + "role": "assistant", + "content": [ + { + "type": "tool_use", + "id": "toolu.01:bad", + "name": "read_file", + "input": {"path": "/tmp/test.txt"} + } + ] + }, + { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": "toolu.01:bad", + "content": "file content" + } + ] + } + ] + }); + + let prepared = prepare_direct_anthropic_payload(payload.to_string().as_bytes()) + .expect("direct payload should preprocess through shared Kiro Anthropic preflight"); + assert_eq!(prepared.original_model, "public-model"); + assert_eq!(prepared.preflight.tool_use_id_rewrites.len(), 1); + + let rewritten_id = &prepared.preflight.tool_use_id_rewrites[0].rewritten_tool_use_id; + assert_ne!(rewritten_id, "toolu.01:bad"); + assert!(rewritten_id + .chars() + .all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-')); + + assert_eq!(prepared.preflight.request.messages[1].content[0]["id"], *rewritten_id); + assert_eq!(prepared.preflight.request.messages[2].content[0]["tool_use_id"], *rewritten_id); + } + + #[test] + fn direct_routing_diagnostics_exposes_preflight_summary() { + let payload = serde_json::json!({ + "model": "public-model", + "max_tokens": 128, + "messages": [ + {"role": "user", "content": "Run the tool"}, + { + "role": "assistant", + "content": [ + { + "type": "tool_use", + "id": "toolu.01:bad", + "name": "read_file", + "input": {"path": "/tmp/test.txt"} + } + ] + }, + { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": "toolu.01:bad", + "content": "file content" + } + ] + } + ] + }); + let prepared = prepare_direct_anthropic_payload(payload.to_string().as_bytes()) + .expect("direct payload should preprocess through shared Kiro Anthropic preflight"); + + let diagnostics = build_direct_anthropic_routing_diagnostics( + "channel-a", + "preferred_before_kiro", + &prepared.preflight, + ); + let value: serde_json::Value = + serde_json::from_str(&diagnostics).expect("diagnostics should be JSON"); + + assert_eq!(value["upstream_pool"], "direct_anthropic"); + assert_eq!(value["channel_name"], "channel-a"); + assert_eq!(value["pool_mode"], "preferred_before_kiro"); + assert_eq!(value["preflight"]["normalized"], true); + assert_eq!(value["preflight"]["change_count"], 1); + assert_eq!(value["preflight"]["tool_use_id_rewrite_count"], 1); + assert_eq!(value["preflight"]["tool_use_id_rewrites"][0]["assistant_message_index"], 1); + assert_eq!(value["preflight"]["tool_use_id_rewrites"][0]["rewritten_tool_result_count"], 1); + } + + #[test] + fn direct_routing_diagnostics_does_not_mark_schema_observations_as_changes() { + let payload = serde_json::json!({ + "model": "public-model", + "max_tokens": 128, + "tools": [ + { + "name": "select_value", + "description": "Select a value", + "input_schema": { + "type": "object", + "properties": { + "value": { + "anyOf": [ + {"type": "string"}, + {"type": "number"} + ] + } + } + } + } + ], + "messages": [ + {"role": "user", "content": "Pick one"} + ] + }); + let prepared = prepare_direct_anthropic_payload(payload.to_string().as_bytes()) + .expect("schema keyword observations should not reject direct preflight"); + + let diagnostics = + build_direct_anthropic_routing_diagnostics("channel-a", "only", &prepared.preflight); + let value: serde_json::Value = + serde_json::from_str(&diagnostics).expect("diagnostics should be JSON"); + + assert_eq!(value["preflight"]["normalized"], false); + assert_eq!(value["preflight"]["change_count"], 0); + assert_eq!(value["preflight"]["tool_schema_keyword_count"], 1); + assert_eq!( + value["preflight"]["tool_validation_summary"]["schema_keyword_counts"]["anyOf"], + 1 + ); } } diff --git a/crates/llm-access/src/provider/anthropic_upstream_payload.rs b/crates/llm-access/src/provider/anthropic_upstream_payload.rs new file mode 100644 index 0000000..2e5dc4e --- /dev/null +++ b/crates/llm-access/src/provider/anthropic_upstream_payload.rs @@ -0,0 +1,93 @@ +use std::collections::BTreeMap; + +use axum::{http::StatusCode, response::Response}; +use llm_access_kiro::anthropic::{ + preflight::{preprocess_messages_request, PreprocessedMessagesRequest}, + types::MessagesRequest, +}; +use serde_json::Value; + +use super::kiro_error::kiro_json_error; + +pub(super) struct DirectAnthropicPreparedPayload { + pub(super) original_model: String, + pub(super) preflight: PreprocessedMessagesRequest, +} + +pub(super) fn prepare_direct_anthropic_payload( + body: &[u8], +) -> Result { + let raw = serde_json::from_slice::(body) + .map_err(|_| DirectAnthropicPayloadError::InvalidPayload)?; + let original_model = raw + .get("model") + .and_then(Value::as_str) + .map(str::trim) + .filter(|value| !value.is_empty()) + .ok_or(DirectAnthropicPayloadError::MissingModel)? + .to_string(); + let request = serde_json::from_value::(raw) + .map_err(|_| DirectAnthropicPayloadError::InvalidPayload)?; + let preflight = preprocess_messages_request(&request) + .map_err(|err| DirectAnthropicPayloadError::InvalidRequest(err.to_string()))?; + + Ok(DirectAnthropicPreparedPayload { + original_model, + preflight, + }) +} + +#[derive(Debug)] +pub(super) enum DirectAnthropicPayloadError { + InvalidPayload, + MissingModel, + InvalidRequest(String), +} + +impl DirectAnthropicPayloadError { + pub(super) fn into_response(self) -> Response { + match self { + Self::InvalidPayload => kiro_json_error( + StatusCode::BAD_REQUEST, + "invalid_request_error", + "request body must be a valid Anthropic messages JSON payload", + ), + Self::MissingModel => kiro_json_error( + StatusCode::BAD_REQUEST, + "invalid_request_error", + "model is required", + ), + Self::InvalidRequest(message) => { + kiro_json_error(StatusCode::BAD_REQUEST, "invalid_request_error", &message) + }, + } + } +} + +fn apply_model_mapping_to_request( + model_name_map_json: &str, + payload: &mut MessagesRequest, +) -> anyhow::Result> { + let trimmed = model_name_map_json.trim(); + if trimmed.is_empty() || trimmed == "{}" { + return Ok(None); + } + let map = serde_json::from_str::>(trimmed)?; + let Some(target) = map.get(&payload.model).cloned() else { + return Ok(None); + }; + if target == payload.model { + return Ok(None); + } + payload.model = target.clone(); + Ok(Some(target)) +} + +pub(super) fn build_route_payload( + model_name_map_json: &str, + payload: &MessagesRequest, +) -> anyhow::Result<(MessagesRequest, Option)> { + let mut route_payload = payload.clone(); + let mapped_model = apply_model_mapping_to_request(model_name_map_json, &mut route_payload)?; + Ok((route_payload, mapped_model)) +}