diff --git a/crates/backend/src/routes.rs b/crates/backend/src/routes.rs index 6949baee..b9bf2105 100644 --- a/crates/backend/src/routes.rs +++ b/crates/backend/src/routes.rs @@ -520,8 +520,10 @@ pub fn create_router(state: AppState) -> Router { .route("/static_flow/admin/llm-gateway/monitor", get(seo::seo_spa_shell)) .route("/admin/kiro-gateway", get(seo::seo_spa_shell)) .route("/admin/kiro-gateway/accounts", any(admin_kiro_accounts_entry)) + .route("/admin/kiro-gateway/upstream-channels", get(seo::seo_spa_shell)) .route("/static_flow/admin/kiro-gateway", get(seo::seo_spa_shell)) .route("/static_flow/admin/kiro-gateway/accounts", any(admin_kiro_accounts_entry)) + .route("/static_flow/admin/kiro-gateway/upstream-channels", get(seo::seo_spa_shell)) .route("/admin/llm-gateway/*path", any(crate::llm_access_admin_proxy::proxy_admin_request)) .route( "/static_flow/admin/llm-gateway/*path", diff --git a/crates/frontend/src/api.rs b/crates/frontend/src/api.rs index 70abfb40..c008b628 100644 --- a/crates/frontend/src/api.rs +++ b/crates/frontend/src/api.rs @@ -10887,11 +10887,33 @@ pub struct AdminAnthropicUpstreamChannelView { pub proxy_mode: String, pub proxy_config_id: Option, pub last_error: Option, + pub models: Vec, + pub last_models_status: Option, + pub last_models_latency_ms: Option, + pub last_models_checked_at: Option, + pub last_models_error: Option, + pub last_test_model: Option, + pub last_test_status: Option, + pub last_test_latency_ms: Option, + pub last_test_at: Option, + pub last_test_error: Option, pub usage: AdminAnthropicUpstreamUsageRollupView, pub created_at: i64, pub updated_at: i64, } +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] +#[serde(default)] +pub struct AdminAnthropicUpstreamProbeResponseView { + pub ok: bool, + pub status: String, + pub status_code: Option, + pub latency_ms: u64, + pub error: Option, + pub channel: AdminAnthropicUpstreamChannelView, + pub generated_at: i64, +} + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] #[serde(default)] pub struct AdminAnthropicUpstreamChannelsResponse { @@ -10929,6 +10951,11 @@ pub struct PatchAdminAnthropicUpstreamChannelInput { pub clear_last_error: bool, } +#[derive(Debug, Serialize, Clone, PartialEq, Default)] +pub struct TestAdminAnthropicUpstreamModelInput { + pub model: String, +} + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] #[serde(default)] pub struct KiroAccountView { @@ -11904,6 +11931,71 @@ pub async fn patch_admin_anthropic_upstream_channel( } } +pub async fn refresh_admin_anthropic_upstream_models( + name: &str, +) -> Result { + #[cfg(feature = "mock")] + { + let _ = name; + Ok(AdminAnthropicUpstreamProbeResponseView::default()) + } + + #[cfg(not(feature = "mock"))] + { + let url = format!( + "{}/admin/kiro-gateway/anthropic-upstreams/{}/refresh-models", + llm_access_admin_base(), + urlencoding::encode(name) + ); + let response = api_post(&url) + .send() + .await + .map_err(|e| format!("Network error: {:?}", e))?; + if !response.ok() { + let text = response.text().await.unwrap_or_default(); + return Err(format!("Failed: {text}")); + } + response + .json() + .await + .map_err(|e| format!("Parse error: {:?}", e)) + } +} + +pub async fn test_admin_anthropic_upstream_model( + name: &str, + input: &TestAdminAnthropicUpstreamModelInput, +) -> Result { + #[cfg(feature = "mock")] + { + let _ = (name, input); + Ok(AdminAnthropicUpstreamProbeResponseView::default()) + } + + #[cfg(not(feature = "mock"))] + { + let url = format!( + "{}/admin/kiro-gateway/anthropic-upstreams/{}/test", + llm_access_admin_base(), + urlencoding::encode(name) + ); + let response = api_post(&url) + .json(input) + .map_err(|e| format!("Serialize error: {:?}", e))? + .send() + .await + .map_err(|e| format!("Network error: {:?}", e))?; + if !response.ok() { + let text = response.text().await.unwrap_or_default(); + return Err(format!("Failed: {text}")); + } + response + .json() + .await + .map_err(|e| format!("Parse error: {:?}", e)) + } +} + pub async fn delete_admin_anthropic_upstream_channel(name: &str) -> Result<(), String> { #[cfg(feature = "mock")] { diff --git a/crates/frontend/src/pages/admin_kiro_anthropic_upstreams.rs b/crates/frontend/src/pages/admin_kiro_anthropic_upstreams.rs new file mode 100644 index 00000000..144c88fb --- /dev/null +++ b/crates/frontend/src/pages/admin_kiro_anthropic_upstreams.rs @@ -0,0 +1,614 @@ +use std::collections::BTreeMap; + +use llm_access_core::store as llm_store; +use web_sys::{HtmlInputElement, HtmlSelectElement}; +use yew::prelude::*; +use yew_router::prelude::Link; + +use crate::{ + api::{ + create_admin_anthropic_upstream_channel, delete_admin_anthropic_upstream_channel, + fetch_admin_anthropic_upstream_channels, fetch_admin_llm_gateway_proxy_configs, + patch_admin_anthropic_upstream_channel, refresh_admin_anthropic_upstream_models, + test_admin_anthropic_upstream_model, AdminAnthropicUpstreamChannelView, + AdminUpstreamProxyConfigView, CreateAdminAnthropicUpstreamChannelInput, + PatchAdminAnthropicUpstreamChannelInput, TestAdminAnthropicUpstreamModelInput, + }, + pages::llm_access_shared::{confirm_destructive, format_number_u64, format_timestamp_opt}, + router::Route, +}; + +fn status_classes(status: &str) -> Classes { + if status == "ok" || status == "active" { + classes!( + "rounded-full", + "bg-emerald-500/10", + "px-2", + "py-1", + "font-mono", + "text-xs", + "text-emerald-700", + "dark:text-emerald-200" + ) + } else if status == "unchecked" || status.is_empty() { + classes!( + "rounded-full", + "border", + "border-[var(--border)]", + "px-2", + "py-1", + "font-mono", + "text-xs", + "text-[var(--muted)]" + ) + } else { + classes!( + "rounded-full", + "bg-amber-500/10", + "px-2", + "py-1", + "font-mono", + "text-xs", + "text-amber-700", + "dark:text-amber-200" + ) + } +} + +fn parse_proxy_choice(raw: &str) -> (String, Option) { + let trimmed = raw.trim(); + if trimmed == "direct" { + ("direct".to_string(), None) + } else if let Some(proxy_config_id) = trimmed.strip_prefix("fixed:") { + ("fixed".to_string(), Some(proxy_config_id.to_string())) + } else { + ("inherit".to_string(), None) + } +} + +fn total_input(channel: &AdminAnthropicUpstreamChannelView) -> u64 { + channel + .usage + .input_uncached_tokens + .saturating_add(channel.usage.input_cached_tokens) +} + +#[function_component(AdminKiroAnthropicUpstreamsPage)] +pub fn admin_kiro_anthropic_upstreams_page() -> Html { + let channels = use_state(Vec::::new); + let proxy_configs = use_state(Vec::::new); + let loading = use_state(|| true); + let error = use_state(|| None::); + let flash = use_state(|| None::); + let refresh_tick = use_state(|| 0u64); + + let name = use_state(String::new); + let base_url = use_state(|| llm_store::DEFAULT_ANTHROPIC_UPSTREAM_BASE_URL.to_string()); + let api_key = use_state(String::new); + let weight = use_state(|| llm_store::DEFAULT_ANTHROPIC_UPSTREAM_WEIGHT.to_string()); + let max_concurrency = + use_state(|| llm_store::DEFAULT_ANTHROPIC_UPSTREAM_MAX_CONCURRENCY.to_string()); + let min_start_interval_ms = + use_state(|| llm_store::DEFAULT_ANTHROPIC_UPSTREAM_MIN_START_INTERVAL_MS.to_string()); + let proxy_mode = use_state(|| "inherit".to_string()); + let saving = use_state(|| false); + let refreshing_channel = use_state(|| None::); + let testing_channel = use_state(|| None::); + let selected_models = use_state(BTreeMap::::new); + + let notify = { + let flash = flash.clone(); + let error = error.clone(); + Callback::from(move |(message, is_error): (String, bool)| { + if is_error { + error.set(Some(message)); + flash.set(None); + } else { + flash.set(Some(message)); + error.set(None); + } + }) + }; + + let reload = { + let refresh_tick = refresh_tick.clone(); + Callback::from(move |_| refresh_tick.set((*refresh_tick).saturating_add(1))) + }; + + { + let channels = channels.clone(); + let proxy_configs = proxy_configs.clone(); + let loading = loading.clone(); + let error = error.clone(); + use_effect_with(*refresh_tick, move |_| { + let channels = channels.clone(); + let proxy_configs = proxy_configs.clone(); + let loading = loading.clone(); + let error = error.clone(); + wasm_bindgen_futures::spawn_local(async move { + loading.set(true); + let (channels_result, proxy_configs_result) = futures::join!( + fetch_admin_anthropic_upstream_channels(), + fetch_admin_llm_gateway_proxy_configs() + ); + match (channels_result, proxy_configs_result) { + (Ok(channel_resp), Ok(proxy_resp)) => { + channels.set(channel_resp.channels); + proxy_configs.set(proxy_resp.proxy_configs); + error.set(None); + }, + (Err(err), _) | (_, Err(err)) => error.set(Some(err)), + } + loading.set(false); + }); + || () + }); + } + + let on_create = { + let name = name.clone(); + let base_url = base_url.clone(); + let api_key = api_key.clone(); + let weight = weight.clone(); + let max_concurrency = max_concurrency.clone(); + let min_start_interval_ms = min_start_interval_ms.clone(); + let proxy_mode = proxy_mode.clone(); + let saving = saving.clone(); + let notify = notify.clone(); + let reload = reload.clone(); + Callback::from(move |_| { + if *saving { + return; + } + let name_value = (*name).trim().to_string(); + let base_url_value = (*base_url).trim().to_string(); + let api_key_value = (*api_key).trim().to_string(); + let weight_value = (*weight).trim().parse::(); + let max_value = (*max_concurrency).trim().parse::(); + let min_value = (*min_start_interval_ms).trim().parse::(); + let proxy_choice = (*proxy_mode).clone(); + let name = name.clone(); + let api_key = api_key.clone(); + let saving = saving.clone(); + let notify = notify.clone(); + let reload = reload.clone(); + wasm_bindgen_futures::spawn_local(async move { + let Ok(weight_value) = weight_value else { + notify.emit(("Weight must be an integer.".to_string(), true)); + return; + }; + let Ok(max_value) = max_value else { + notify.emit(("Concurrency must be an integer.".to_string(), true)); + return; + }; + let Ok(min_value) = min_value else { + notify.emit(("Min interval must be an integer.".to_string(), true)); + return; + }; + let (proxy_mode, proxy_config_id) = parse_proxy_choice(&proxy_choice); + saving.set(true); + let input = CreateAdminAnthropicUpstreamChannelInput { + name: name_value, + base_url: base_url_value, + api_key: api_key_value, + status: Some("active".to_string()), + weight: Some(weight_value), + max_concurrency: Some(max_value), + min_start_interval_ms: Some(min_value), + proxy_mode: Some(proxy_mode), + proxy_config_id, + }; + match create_admin_anthropic_upstream_channel(&input).await { + Ok(channel) => { + name.set(String::new()); + api_key.set(String::new()); + notify.emit((format!("Created `{}`.", channel.name), false)); + reload.emit(()); + }, + Err(err) => notify.emit((format!("Create failed.\n{err}"), true)), + } + saving.set(false); + }); + }) + }; + + let total_billable = channels + .iter() + .fold(0u64, |sum, channel| sum.saturating_add(channel.usage.billable_tokens)); + let total_tokens = channels.iter().fold(0u64, |sum, channel| { + sum.saturating_add(total_input(channel)) + .saturating_add(channel.usage.output_tokens) + }); + let active_channels = channels + .iter() + .filter(|channel| channel.status == "active") + .count(); + + html! { +
+
+
+
+
+
{ "Kiro / Anthropic" }
+

{ "Upstream Channels" }

+
+
+ to={Route::AdminKiroGateway} classes={classes!("btn-terminal")}>{ "Kiro Overview" }> + +
+
+
+
+
{ "Active / Total" }
+
{ format!("{active_channels} / {}", channels.len()) }
+
+
+
{ "Tokens" }
+
{ format_number_u64(total_tokens) }
+
+
+
{ "Billable" }
+
{ format_number_u64(total_billable) }
+
+
+ if let Some(message) = (*flash).clone() { +
{ message }
+ } + if let Some(err) = (*error).clone() { +
{ err }
+ } +
+ +
+
+ + + + + + + +
+ +
+
+
+ +
+
+
{ "Channel" }
+
{ "Usage" }
+
{ "Models" }
+
{ "Last Test" }
+
{ "Actions" }
+
+ { for channels.iter().map(|channel| { + let channel_name = channel.name.clone(); + let selected_model = selected_models + .get(&channel_name) + .cloned() + .filter(|value| channel.models.iter().any(|model| model == value)) + .or_else(|| channel.models.first().cloned()) + .unwrap_or_default(); + let models_status = channel.last_models_status.clone().unwrap_or_else(|| "unchecked".to_string()); + let test_status = channel.last_test_status.clone().unwrap_or_else(|| "unchecked".to_string()); + let is_refreshing = (*refreshing_channel).as_ref().is_some_and(|name| name == &channel_name); + let is_testing = (*testing_channel).as_ref().is_some_and(|name| name == &channel_name); + let on_select_model = { + let selected_models = selected_models.clone(); + let channel_name = channel_name.clone(); + Callback::from(move |event: Event| { + let select: HtmlSelectElement = event.target_unchecked_into(); + let mut next = (*selected_models).clone(); + next.insert(channel_name.clone(), select.value()); + selected_models.set(next); + }) + }; + let on_refresh_models = { + let notify = notify.clone(); + let reload = reload.clone(); + let refreshing_channel = refreshing_channel.clone(); + let channel_name = channel_name.clone(); + Callback::from(move |_| { + if (*refreshing_channel).is_some() { + return; + } + refreshing_channel.set(Some(channel_name.clone())); + let notify = notify.clone(); + let reload = reload.clone(); + let refreshing_channel = refreshing_channel.clone(); + let channel_name = channel_name.clone(); + wasm_bindgen_futures::spawn_local(async move { + match refresh_admin_anthropic_upstream_models(&channel_name).await { + Ok(response) => { + notify.emit((format!("Refreshed `{channel_name}`: {}.", response.status), !response.ok)); + reload.emit(()); + }, + Err(err) => notify.emit((format!("Refresh `{channel_name}` failed.\n{err}"), true)), + } + refreshing_channel.set(None); + }); + }) + }; + let on_test_model = { + let notify = notify.clone(); + let reload = reload.clone(); + let testing_channel = testing_channel.clone(); + let channel_name = channel_name.clone(); + let model = selected_model.clone(); + Callback::from(move |_| { + if (*testing_channel).is_some() { + return; + } + let model = model.trim().to_string(); + if model.is_empty() { + notify.emit(("Select a model before testing.".to_string(), true)); + return; + } + testing_channel.set(Some(channel_name.clone())); + let notify = notify.clone(); + let reload = reload.clone(); + let testing_channel = testing_channel.clone(); + let channel_name = channel_name.clone(); + wasm_bindgen_futures::spawn_local(async move { + let input = TestAdminAnthropicUpstreamModelInput { model: model.clone() }; + match test_admin_anthropic_upstream_model(&channel_name, &input).await { + Ok(response) => { + notify.emit((format!("Tested `{channel_name}` / `{model}`: {} ms.", response.latency_ms), !response.ok)); + reload.emit(()); + }, + Err(err) => notify.emit((format!("Test `{channel_name}` / `{model}` failed.\n{err}"), true)), + } + testing_channel.set(None); + }); + }) + }; + let on_toggle = { + let notify = notify.clone(); + let reload = reload.clone(); + let channel_name = channel_name.clone(); + let next_status = if channel.status == "active" { "disabled" } else { "active" }.to_string(); + Callback::from(move |_| { + let notify = notify.clone(); + let reload = reload.clone(); + let channel_name = channel_name.clone(); + let next_status = next_status.clone(); + wasm_bindgen_futures::spawn_local(async move { + let input = PatchAdminAnthropicUpstreamChannelInput { + status: Some(next_status), + ..PatchAdminAnthropicUpstreamChannelInput::default() + }; + match patch_admin_anthropic_upstream_channel(&channel_name, &input).await { + Ok(_) => { + notify.emit((format!("Updated `{channel_name}`."), false)); + reload.emit(()); + }, + Err(err) => notify.emit((format!("Update `{channel_name}` failed.\n{err}"), true)), + } + }); + }) + }; + let on_rotate_key = { + let notify = notify.clone(); + let reload = reload.clone(); + let channel_name = channel_name.clone(); + Callback::from(move |_| { + let Some(window) = web_sys::window() else { + notify.emit(("Browser window is unavailable.".to_string(), true)); + return; + }; + let prompt = format!("New API key for `{channel_name}`"); + let Ok(Some(api_key)) = window.prompt_with_message(&prompt) else { + return; + }; + let api_key = api_key.trim().to_string(); + if api_key.is_empty() { + notify.emit(("API key must not be empty.".to_string(), true)); + return; + } + let notify = notify.clone(); + let reload = reload.clone(); + let channel_name = channel_name.clone(); + wasm_bindgen_futures::spawn_local(async move { + let input = PatchAdminAnthropicUpstreamChannelInput { + api_key: Some(api_key), + ..PatchAdminAnthropicUpstreamChannelInput::default() + }; + match patch_admin_anthropic_upstream_channel(&channel_name, &input).await { + Ok(_) => { + notify.emit((format!("Rotated key for `{channel_name}`."), false)); + reload.emit(()); + }, + Err(err) => notify.emit((format!("Rotate `{channel_name}` failed.\n{err}"), true)), + } + }); + }) + }; + let on_delete = { + let notify = notify.clone(); + let reload = reload.clone(); + let channel_name = channel_name.clone(); + Callback::from(move |_| { + if !confirm_destructive(&format!("Delete `{channel_name}`?")) { + return; + } + let notify = notify.clone(); + let reload = reload.clone(); + let channel_name = channel_name.clone(); + wasm_bindgen_futures::spawn_local(async move { + match delete_admin_anthropic_upstream_channel(&channel_name).await { + Ok(_) => { + notify.emit((format!("Deleted `{channel_name}`."), false)); + reload.emit(()); + }, + Err(err) => notify.emit((format!("Delete `{channel_name}` failed.\n{err}"), true)), + } + }); + }) + }; + html! { +
+
+
+ { channel.name.clone() } + { channel.status.clone() } +
+
{ channel.base_url.clone() }
+
+ { format!("w={} · c={} · min={}ms · proxy={}", channel.weight, channel.max_concurrency, channel.min_start_interval_ms, channel.proxy_mode) } +
+
+
+
{ format!("input {}", format_number_u64(total_input(channel))) }
+
{ format!("cached {}", format_number_u64(channel.usage.input_cached_tokens)) }
+
{ format!("output {}", format_number_u64(channel.usage.output_tokens)) }
+
{ format!("billable {}", format_number_u64(channel.usage.billable_tokens)) }
+
{ format!("missing {} · {}", channel.usage.usage_missing_events, format_timestamp_opt(channel.usage.last_used_at)) }
+
+
+
+ { models_status } + { format!("{} models", channel.models.len()) } +
+
+ { format!("{} · {}", channel.last_models_latency_ms.map(|value| format!("{value}ms")).unwrap_or_else(|| "-".to_string()), format_timestamp_opt(channel.last_models_checked_at)) } +
+ if let Some(error) = channel.last_models_error.as_deref() { +
{ error }
+ } +
+
+
+ { test_status } + { channel.last_test_model.clone().unwrap_or_else(|| "-".to_string()) } +
+
+ { format!("{} · {}", channel.last_test_latency_ms.map(|value| format!("{value}ms")).unwrap_or_else(|| "-".to_string()), format_timestamp_opt(channel.last_test_at)) } +
+ if let Some(error) = channel.last_test_error.as_deref() { +
{ error }
+ } +
+
+
+ + + + +
+
+ + +
+ if let Some(error) = channel.last_error.as_deref() { +
{ error }
+ } +
+
+ } + }) } + if channels.is_empty() && !*loading { +
{ "No Anthropic upstream channels configured." }
+ } +
+
+
+ } +} diff --git a/crates/frontend/src/pages/admin_kiro_gateway.rs b/crates/frontend/src/pages/admin_kiro_gateway.rs index 2fe64f81..0a737c2c 100644 --- a/crates/frontend/src/pages/admin_kiro_gateway.rs +++ b/crates/frontend/src/pages/admin_kiro_gateway.rs @@ -12,27 +12,23 @@ use yew_router::prelude::Link; use crate::{ api::{ - create_admin_anthropic_upstream_channel, create_admin_kiro_account_group, - create_admin_kiro_key, create_admin_kiro_manual_account, - delete_admin_anthropic_upstream_channel, delete_admin_kiro_account, - delete_admin_kiro_account_group, delete_admin_kiro_key, - fetch_admin_anthropic_upstream_channels, fetch_admin_kiro_account_group_options, - fetch_admin_kiro_account_groups_page, fetch_admin_kiro_accounts, - fetch_admin_kiro_accounts_page, fetch_admin_kiro_cache_stats, fetch_admin_kiro_keys_page, - fetch_admin_kiro_usage_event_detail, fetch_admin_kiro_usage_events, - fetch_admin_llm_gateway_config, fetch_admin_llm_gateway_proxy_bindings, - fetch_admin_llm_gateway_proxy_configs, fetch_kiro_models, import_admin_kiro_account, - patch_admin_anthropic_upstream_channel, patch_admin_kiro_account, + create_admin_kiro_account_group, create_admin_kiro_key, create_admin_kiro_manual_account, + delete_admin_kiro_account, delete_admin_kiro_account_group, delete_admin_kiro_key, + fetch_admin_kiro_account_group_options, fetch_admin_kiro_account_groups_page, + fetch_admin_kiro_accounts, fetch_admin_kiro_accounts_page, fetch_admin_kiro_cache_stats, + fetch_admin_kiro_keys_page, fetch_admin_kiro_usage_event_detail, + fetch_admin_kiro_usage_events, fetch_admin_llm_gateway_config, + fetch_admin_llm_gateway_proxy_bindings, fetch_admin_llm_gateway_proxy_configs, + fetch_kiro_models, import_admin_kiro_account, patch_admin_kiro_account, patch_admin_kiro_account_group, patch_admin_kiro_key, refresh_admin_kiro_account_balance, update_admin_llm_gateway_config, AdminAccountGroupOptionView, AdminAccountGroupView, - AdminAccountsSummaryView, AdminAnthropicUpstreamChannelView, AdminKiroCacheStatsResponse, + AdminAccountsSummaryView, AdminKiroCacheStatsResponse, AdminKiroKeyCandidateCreditSummaryView, AdminLlmGatewayKeyView, AdminLlmGatewayKeysSummaryView, AdminLlmGatewayUsageEventDetailView, AdminLlmGatewayUsageEventView, AdminLlmGatewayUsageEventsQuery, AdminUpstreamProxyBindingView, AdminUpstreamProxyConfigView, CreateAdminAccountGroupInput, - CreateAdminAnthropicUpstreamChannelInput, CreateManualKiroAccountInput, KiroAccountView, - KiroBalanceView, KiroModelView, LlmGatewayRuntimeConfig, PatchAdminAccountGroupInput, - PatchAdminAnthropicUpstreamChannelInput, PatchAdminLlmGatewayKeyRequest, + CreateManualKiroAccountInput, KiroAccountView, KiroBalanceView, KiroModelView, + LlmGatewayRuntimeConfig, PatchAdminAccountGroupInput, PatchAdminLlmGatewayKeyRequest, PatchKiroAccountInput, }, components::{ @@ -41,8 +37,8 @@ use crate::{ }, pages::llm_access_shared::{ confirm_destructive, format_float2, format_kiro_disabled_reason, format_ms, - format_number_i64, format_number_u64, format_reset_hint, kiro_credit_ratio, - kiro_key_usage_ratio, usage_error_summary, MaskedSecretCode, + format_number_i64, format_number_u64, format_reset_hint, format_timestamp_opt, + kiro_credit_ratio, kiro_key_usage_ratio, usage_error_summary, MaskedSecretCode, }, router::Route, }; @@ -133,10 +129,6 @@ extern "C" { fn copy_text(text: &str); } -fn format_timestamp_opt(ts: Option) -> String { - ts.map(format_ms).unwrap_or_else(|| "-".to_string()) -} - fn format_float4(value: f64) -> String { format!("{value:.4}") } @@ -221,6 +213,40 @@ fn format_json_for_textarea(raw: &str) -> String { .unwrap_or_else(|| raw.to_string()) } +fn routing_diagnostic_string(raw: Option<&str>, key: &str) -> Option { + raw.and_then(|value| serde_json::from_str::(value).ok()) + .and_then(|value| { + value + .get(key) + .and_then(serde_json::Value::as_str) + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(ToString::to_string) + }) +} + +fn anthropic_routing_badge(raw: Option<&str>) -> Option<&'static str> { + match routing_diagnostic_string(raw, "upstream_pool").as_deref() { + Some("direct_anthropic_test") => Some("Anthropic 测试"), + Some("direct_anthropic") => Some("Anthropic 直连"), + _ => None, + } +} + +fn anthropic_routing_summary(raw: Option<&str>) -> Option { + let badge = anthropic_routing_badge(raw)?; + let channel = routing_diagnostic_string(raw, "channel_name"); + let probe_kind = routing_diagnostic_string(raw, "probe_kind"); + Some(match (channel, probe_kind) { + (Some(channel), Some(probe_kind)) => { + format!("{badge} · channel {channel} · {probe_kind}") + }, + (Some(channel), None) => format!("{badge} · channel {channel}"), + (None, Some(probe_kind)) => format!("{badge} · {probe_kind}"), + (None, None) => badge.to_string(), + }) +} + #[derive(Debug, Clone, PartialEq, Eq, Default)] struct KiroCachePolicyBandForm { credit_start: String, @@ -3274,20 +3300,6 @@ pub fn admin_kiro_gateway_page() -> Html { let runtime_config = use_state(|| None::); let kiro_cache_stats = use_state(|| None::); let kiro_cache_stats_error = use_state(|| None::); - let anthropic_upstream_channels = use_state(Vec::::new); - let anthropic_upstream_error = use_state(|| None::); - let anthropic_upstream_name = use_state(String::new); - let anthropic_upstream_base_url = - use_state(|| llm_store::DEFAULT_ANTHROPIC_UPSTREAM_BASE_URL.to_string()); - let anthropic_upstream_api_key = use_state(String::new); - let anthropic_upstream_weight = - use_state(|| llm_store::DEFAULT_ANTHROPIC_UPSTREAM_WEIGHT.to_string()); - let anthropic_upstream_max_concurrency = - use_state(|| llm_store::DEFAULT_ANTHROPIC_UPSTREAM_MAX_CONCURRENCY.to_string()); - let anthropic_upstream_min_start_interval_ms = - use_state(|| llm_store::DEFAULT_ANTHROPIC_UPSTREAM_MIN_START_INTERVAL_MS.to_string()); - let anthropic_upstream_proxy_mode = use_state(|| "inherit".to_string()); - let saving_anthropic_upstream = use_state(|| false); let kiro_cache_policy_form = use_state(KiroCachePolicyForm::default); let persisted_kiro_cache_policy_form = use_state(KiroCachePolicyForm::default); let kiro_cache_kmodels_json = use_state(String::new); @@ -3448,8 +3460,6 @@ pub fn admin_kiro_gateway_page() -> Html { let keys_summary = keys_summary.clone(); let kiro_cache_stats = kiro_cache_stats.clone(); let kiro_cache_stats_error = kiro_cache_stats_error.clone(); - let anthropic_upstream_channels = anthropic_upstream_channels.clone(); - let anthropic_upstream_error = anthropic_upstream_error.clone(); let kiro_cache_policy_form = kiro_cache_policy_form.clone(); let persisted_kiro_cache_policy_form = persisted_kiro_cache_policy_form.clone(); let kiro_cache_kmodels_json = kiro_cache_kmodels_json.clone(); @@ -3478,8 +3488,6 @@ pub fn admin_kiro_gateway_page() -> Html { let keys_summary = keys_summary.clone(); let kiro_cache_stats = kiro_cache_stats.clone(); let kiro_cache_stats_error = kiro_cache_stats_error.clone(); - let anthropic_upstream_channels = anthropic_upstream_channels.clone(); - let anthropic_upstream_error = anthropic_upstream_error.clone(); let kiro_cache_policy_form = kiro_cache_policy_form.clone(); let persisted_kiro_cache_policy_form = persisted_kiro_cache_policy_form.clone(); let kiro_cache_kmodels_json = kiro_cache_kmodels_json.clone(); @@ -3505,7 +3513,6 @@ pub fn admin_kiro_gateway_page() -> Html { proxy_configs_result, proxy_bindings_result, cache_stats_result, - anthropic_upstream_result, ) = futures::join!( fetch_admin_llm_gateway_config(), fetch_admin_kiro_accounts_page(1, 0), @@ -3513,7 +3520,6 @@ pub fn admin_kiro_gateway_page() -> Html { fetch_admin_llm_gateway_proxy_configs(), fetch_admin_llm_gateway_proxy_bindings(), fetch_admin_kiro_cache_stats(), - fetch_admin_anthropic_upstream_channels(), ); match ( config_result, @@ -3601,16 +3607,6 @@ pub fn admin_kiro_gateway_page() -> Html { kiro_cache_stats_error.set(Some(err)); }, } - match anthropic_upstream_result { - Ok(channels_resp) => { - anthropic_upstream_channels.set(channels_resp.channels); - anthropic_upstream_error.set(None); - }, - Err(err) => { - anthropic_upstream_channels.set(Vec::new()); - anthropic_upstream_error.set(Some(err)); - }, - } }, (Err(err), _, _, _, _) | (_, Err(err), _, _, _) @@ -4127,108 +4123,6 @@ pub fn admin_kiro_gateway_page() -> Html { }) }; - let on_create_anthropic_upstream = { - let name = anthropic_upstream_name.clone(); - let base_url = anthropic_upstream_base_url.clone(); - let api_key = anthropic_upstream_api_key.clone(); - let weight = anthropic_upstream_weight.clone(); - let max_concurrency = anthropic_upstream_max_concurrency.clone(); - let min_start_interval_ms = anthropic_upstream_min_start_interval_ms.clone(); - let proxy_mode = anthropic_upstream_proxy_mode.clone(); - let saving = saving_anthropic_upstream.clone(); - let panel_error = anthropic_upstream_error.clone(); - let notify = notify.clone(); - let on_reload = on_reload.clone(); - Callback::from(move |_| { - if *saving { - return; - } - let name_value = (*name).clone(); - let base_url_value = (*base_url).clone(); - let api_key_value = (*api_key).clone(); - let weight_value = (*weight).clone(); - let max_concurrency_value = (*max_concurrency).clone(); - let min_start_interval_ms_value = (*min_start_interval_ms).clone(); - let selected_proxy_value = (*proxy_mode).clone(); - let name = name.clone(); - let api_key = api_key.clone(); - let saving = saving.clone(); - let panel_error = panel_error.clone(); - let notify = notify.clone(); - let on_reload = on_reload.clone(); - wasm_bindgen_futures::spawn_local(async move { - let parsed_weight = match weight_value.trim().parse::() { - Ok(value) => value, - Err(_) => { - let message = "Anthropic upstream weight must be an integer.".to_string(); - panel_error.set(Some(message.clone())); - notify.emit((message, true)); - return; - }, - }; - let parsed_max = match max_concurrency_value.trim().parse::() { - Ok(value) => value, - Err(_) => { - let message = - "Anthropic upstream max concurrency must be an integer.".to_string(); - panel_error.set(Some(message.clone())); - notify.emit((message, true)); - return; - }, - }; - let parsed_min = match min_start_interval_ms_value.trim().parse::() { - Ok(value) => value, - Err(_) => { - let message = - "Anthropic upstream min interval must be an integer.".to_string(); - panel_error.set(Some(message.clone())); - notify.emit((message, true)); - return; - }, - }; - let (proxy_mode_value, proxy_config_id) = if selected_proxy_value.trim() == "direct" - { - ("direct".to_string(), None) - } else if let Some(proxy_config_id) = - selected_proxy_value.trim().strip_prefix("fixed:") - { - ("fixed".to_string(), Some(proxy_config_id.to_string())) - } else { - ("inherit".to_string(), None) - }; - saving.set(true); - panel_error.set(None); - let input = CreateAdminAnthropicUpstreamChannelInput { - name: name_value.trim().to_string(), - base_url: base_url_value.trim().to_string(), - api_key: api_key_value.trim().to_string(), - status: Some("active".to_string()), - weight: Some(parsed_weight), - max_concurrency: Some(parsed_max), - min_start_interval_ms: Some(parsed_min), - proxy_mode: Some(proxy_mode_value), - proxy_config_id, - }; - match create_admin_anthropic_upstream_channel(&input).await { - Ok(channel) => { - name.set(String::new()); - api_key.set(String::new()); - notify.emit(( - format!("Created Anthropic upstream `{}`.", channel.name), - false, - )); - on_reload.emit(()); - }, - Err(err) => { - panel_error.set(Some(err.clone())); - notify.emit((format!("Failed to create Anthropic upstream.\n{err}"), true)); - }, - } - saving.set(false); - }); - }) - }; - let on_import_local = { let import_name = import_name.clone(); let import_sqlite_path = import_sqlite_path.clone(); @@ -4736,283 +4630,12 @@ pub fn admin_kiro_gateway_page() -> Html { { "Anthropic Upstream Channels" }

- { "这些 channel 只服务显式打开 Anthropic 直连模式的 Kiro key;默认 key 仍完全走 Kiro 账号池。" } + { "直连 channel 管理、模型刷新、模型测试和 channel token rollup 已移到独立页面,Overview 不再首屏加载完整 channel 列表。" }

- - - if let Some(err) = (*anthropic_upstream_error).clone() { -
- { err } -
- } -
- - - - - - - -
- -
-
-
- { for (*anthropic_upstream_channels).iter().map(|channel| { - let channel_name = channel.name.clone(); - let next_status = if channel.status == "active" { "disabled" } else { "active" }.to_string(); - let toggle_label = if channel.status == "active" { "Disable" } else { "Enable" }; - let on_toggle = { - let notify = notify.clone(); - let on_reload = on_reload.clone(); - let channel_name = channel_name.clone(); - let next_status = next_status.clone(); - Callback::from(move |_| { - let notify = notify.clone(); - let on_reload = on_reload.clone(); - let channel_name = channel_name.clone(); - let next_status = next_status.clone(); - wasm_bindgen_futures::spawn_local(async move { - let input = PatchAdminAnthropicUpstreamChannelInput { - status: Some(next_status.clone()), - ..PatchAdminAnthropicUpstreamChannelInput::default() - }; - match patch_admin_anthropic_upstream_channel(&channel_name, &input).await { - Ok(_) => { - notify.emit((format!("Updated Anthropic upstream `{channel_name}`."), false)); - on_reload.emit(()); - }, - Err(err) => notify.emit((format!("Failed to update Anthropic upstream `{channel_name}`.\n{err}"), true)), - } - }); - }) - }; - let on_delete = { - let notify = notify.clone(); - let on_reload = on_reload.clone(); - let channel_name = channel_name.clone(); - Callback::from(move |_| { - if !confirm_destructive(&format!("Delete Anthropic upstream `{channel_name}`?")) { - return; - } - let notify = notify.clone(); - let on_reload = on_reload.clone(); - let channel_name = channel_name.clone(); - wasm_bindgen_futures::spawn_local(async move { - match delete_admin_anthropic_upstream_channel(&channel_name).await { - Ok(_) => { - notify.emit((format!("Deleted Anthropic upstream `{channel_name}`."), false)); - on_reload.emit(()); - }, - Err(err) => notify.emit((format!("Failed to delete Anthropic upstream `{channel_name}`.\n{err}"), true)), - } - }); - }) - }; - let on_rotate_key = { - let notify = notify.clone(); - let on_reload = on_reload.clone(); - let channel_name = channel_name.clone(); - Callback::from(move |_| { - let Some(window) = web_sys::window() else { - notify.emit(("Browser window is unavailable.".to_string(), true)); - return; - }; - let prompt = format!("New API key for Anthropic upstream `{channel_name}`"); - let Ok(Some(api_key)) = window.prompt_with_message(&prompt) else { - return; - }; - let api_key = api_key.trim().to_string(); - if api_key.is_empty() { - notify.emit(("API key must not be empty.".to_string(), true)); - return; - } - let notify = notify.clone(); - let on_reload = on_reload.clone(); - let channel_name = channel_name.clone(); - wasm_bindgen_futures::spawn_local(async move { - let input = PatchAdminAnthropicUpstreamChannelInput { - api_key: Some(api_key), - ..PatchAdminAnthropicUpstreamChannelInput::default() - }; - match patch_admin_anthropic_upstream_channel(&channel_name, &input).await { - Ok(_) => { - notify.emit((format!("Rotated API key for Anthropic upstream `{channel_name}`."), false)); - on_reload.emit(()); - }, - Err(err) => notify.emit((format!("Failed to rotate API key for Anthropic upstream `{channel_name}`.\n{err}"), true)), - } - }); - }) - }; - html! { -
-
-
-
{ channel.name.clone() }
-
- { channel.base_url.clone() } -
-
-
- - { channel.status.clone() } - - - - -
-
-
-
{ "weight " }{ channel.weight }
-
{ "concurrency " }{ channel.max_concurrency }
-
{ "proxy " }{ channel.proxy_mode.clone() }
-
{ "input " }{ format_number_u64(channel.usage.input_uncached_tokens.saturating_add(channel.usage.input_cached_tokens)) }
-
{ "output " }{ format_number_u64(channel.usage.output_tokens) }
-
{ "billable " }{ format_number_u64(channel.usage.billable_tokens) }
-
-
- { format!("cached={} · missing={} · last_used={}", format_number_u64(channel.usage.input_cached_tokens), channel.usage.usage_missing_events, format_timestamp_opt(channel.usage.last_used_at)) } -
- if let Some(error) = channel.last_error.as_deref() { -
{ error }
- } -
- } - }) } - if (*anthropic_upstream_channels).is_empty() { -
- { "No Anthropic upstream channels configured." } -
- } + to={Route::AdminKiroAnthropicUpstreams} classes={classes!("btn-terminal", "btn-terminal-primary")}> + { "Open Channels" } + >
@@ -6001,6 +5624,8 @@ pub fn admin_kiro_gateway_page() -> Html { event.error_class.as_deref(), event.session_blocked, ); + let anthropic_badge = + anthropic_routing_badge(event.routing_diagnostics_json.as_deref()); let event_id = event.id.clone(); let on_detail = { let open_usage_detail = open_usage_detail.clone(); @@ -6030,6 +5655,11 @@ pub fn admin_kiro_gateway_page() -> Html { { class_label } } + if let Some(badge) = anthropic_badge { + + { badge } + + } if let Some(summary) = status_error_summary.clone() { { summary } @@ -6063,6 +5693,8 @@ pub fn admin_kiro_gateway_page() -> Html { } } else if let Some(detail) = (*selected_usage_event).clone() { let close_usage_detail = close_usage_detail.clone(); + let anthropic_summary = + anthropic_routing_summary(detail.routing_diagnostics_json.as_deref()); html! {
@@ -6094,7 +5726,15 @@ pub fn admin_kiro_gateway_page() -> Html { { usage_detail_kv("cached", format_number_u64(detail.input_cached_tokens)) } { usage_detail_kv("output", format_number_u64(detail.output_tokens)) } { usage_detail_kv("billable", format_number_u64(detail.billable_tokens)) } + { + if let Some(summary) = anthropic_summary.clone() { + usage_detail_kv("routing", summary) + } else { + Html::default() + } + }
+ { usage_detail_pre("routing diagnostics", detail.routing_diagnostics_json.clone().unwrap_or_else(|| "-".to_string())) } { usage_detail_pre("request headers", detail.request_headers_json.clone()) } { usage_detail_pre("client request", detail.client_request_body_json.clone().unwrap_or_else(|| "-".to_string())) } { usage_detail_pre("upstream request", detail.upstream_request_body_json.clone().unwrap_or_else(|| "-".to_string())) } diff --git a/crates/frontend/src/pages/llm_access_shared.rs b/crates/frontend/src/pages/llm_access_shared.rs index 50e425e4..8532b268 100644 --- a/crates/frontend/src/pages/llm_access_shared.rs +++ b/crates/frontend/src/pages/llm_access_shared.rs @@ -76,6 +76,10 @@ pub fn format_ms(ts_ms: i64) -> String { ) } +pub fn format_timestamp_opt(ts_ms: Option) -> String { + ts_ms.map(format_ms).unwrap_or_else(|| "-".to_string()) +} + pub fn format_bytes_human(bytes: u64) -> String { const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB", "TiB"]; if bytes < 1024 { diff --git a/crates/frontend/src/pages/mod.rs b/crates/frontend/src/pages/mod.rs index 995ce531..3653c6a2 100644 --- a/crates/frontend/src/pages/mod.rs +++ b/crates/frontend/src/pages/mod.rs @@ -3,6 +3,7 @@ pub mod admin_ai_stream; pub mod admin_article_request_stream; pub mod admin_gpt2api_rs; pub mod admin_kiro_account_status; +pub mod admin_kiro_anthropic_upstreams; pub mod admin_kiro_gateway; pub mod admin_llm_gateway; pub mod admin_llm_gateway_monitor; diff --git a/crates/frontend/src/router.rs b/crates/frontend/src/router.rs index 8081fabd..325036dd 100644 --- a/crates/frontend/src/router.rs +++ b/crates/frontend/src/router.rs @@ -150,6 +150,13 @@ pub enum Route { #[at("/static_flow/admin/kiro-gateway")] AdminKiroGateway, + #[cfg(not(feature = "mock"))] + #[at("/admin/kiro-gateway/upstream-channels")] + AdminKiroAnthropicUpstreams, + #[cfg(feature = "mock")] + #[at("/static_flow/admin/kiro-gateway/upstream-channels")] + AdminKiroAnthropicUpstreams, + #[cfg(not(feature = "mock"))] #[at("/admin/kiro-gateway/accounts")] AdminKiroAccountStatus, @@ -284,6 +291,9 @@ fn switch(route: Route) -> Html { html! { } }, Route::AdminKiroGateway => html! { }, + Route::AdminKiroAnthropicUpstreams => { + html! { } + }, Route::AdminKiroAccountStatus => { html! { } }, diff --git a/crates/frontend/src/seo.rs b/crates/frontend/src/seo.rs index a6d10a2e..5e38ad26 100644 --- a/crates/frontend/src/seo.rs +++ b/crates/frontend/src/seo.rs @@ -294,6 +294,9 @@ fn route_path_for(route: &Route) -> String { Route::AdminLlmGatewayMonitor => config::route_path("/admin/llm-gateway/monitor"), Route::AdminKiroGateway => config::route_path("/admin/kiro-gateway"), Route::AdminKiroAccountStatus => config::route_path("/admin/kiro-gateway/accounts"), + Route::AdminKiroAnthropicUpstreams => { + config::route_path("/admin/kiro-gateway/upstream-channels") + }, Route::AdminGpt2ApiRs => config::route_path("/admin/gpt2api-rs"), Route::AdminCommentRuns { task_id, @@ -510,6 +513,7 @@ pub fn apply_route_seo(route: Option<&Route>) { | Route::AdminLlmGatewayMonitor | Route::AdminKiroGateway | Route::AdminKiroAccountStatus + | Route::AdminKiroAnthropicUpstreams | Route::AdminGpt2ApiRs | Route::AdminCommentRuns { .. diff --git a/crates/llm-access-anthropic-pool/src/lib.rs b/crates/llm-access-anthropic-pool/src/lib.rs index 97975f41..fb1f17ac 100644 --- a/crates/llm-access-anthropic-pool/src/lib.rs +++ b/crates/llm-access-anthropic-pool/src/lib.rs @@ -5,6 +5,20 @@ use std::{ net::IpAddr, }; +/// Default Anthropic API version used when a caller does not supply one. +pub const ANTHROPIC_VERSION_2023_06_01: &str = "2023-06-01"; + +/// Apply the standard direct Anthropic authentication/version headers. +pub fn apply_anthropic_auth_headers( + request: reqwest::RequestBuilder, + api_key: &str, + anthropic_version: &str, +) -> reqwest::RequestBuilder { + request + .header("x-api-key", api_key) + .header("anthropic-version", anthropic_version) +} + /// Candidate channel with an admin-controlled routing weight. #[derive(Debug, Clone, PartialEq, Eq)] pub struct WeightedChannel { @@ -165,13 +179,12 @@ pub fn merge_usage( } } -/// Build the standard Anthropic Messages endpoint from a versioned base URL. -pub fn build_messages_url(base_url: &str) -> anyhow::Result { +fn build_anthropic_endpoint_url(base_url: &str, endpoint: &str) -> anyhow::Result { let trimmed = base_url.trim().trim_end_matches('/'); if trimmed.is_empty() { anyhow::bail!("anthropic upstream base_url is empty"); } - let url = reqwest::Url::parse(&format!("{trimmed}/messages"))?; + let url = reqwest::Url::parse(&format!("{trimmed}/{endpoint}"))?; if url.scheme() != "https" { anyhow::bail!("anthropic upstream base_url must use https"); } @@ -197,7 +210,40 @@ pub fn build_messages_url(base_url: &str) -> anyhow::Result { Ok(url.to_string()) } -fn is_private_or_loopback_ip(ip: IpAddr) -> bool { +/// Build the standard Anthropic Messages endpoint from a versioned base URL. +pub fn build_messages_url(base_url: &str) -> anyhow::Result { + build_anthropic_endpoint_url(base_url, "messages") +} + +/// Build the standard Anthropic Models endpoint from a versioned base URL. +pub fn build_models_url(base_url: &str) -> anyhow::Result { + build_anthropic_endpoint_url(base_url, "models") +} + +/// Parse model ids from the standard Anthropic Models list response. +pub fn parse_model_ids_from_models_response(body: &[u8]) -> anyhow::Result> { + let value: serde_json::Value = serde_json::from_slice(body)?; + let Some(data) = value.get("data").and_then(serde_json::Value::as_array) else { + anyhow::bail!("anthropic models response must contain data array"); + }; + let mut model_ids = Vec::with_capacity(data.len()); + for item in data { + let Some(model_id) = item + .get("id") + .and_then(serde_json::Value::as_str) + .map(str::trim) + .filter(|value| !value.is_empty()) + else { + anyhow::bail!("anthropic models response contains model without id"); + }; + model_ids.push(model_id.to_string()); + } + Ok(model_ids) +} + +/// Return whether an IP target is local/private and must not be used as a +/// direct Anthropic upstream host. +pub fn is_private_or_loopback_ip(ip: IpAddr) -> bool { match ip { IpAddr::V4(v4) => { v4.is_loopback() @@ -292,6 +338,18 @@ mod tests { ); } + #[test] + fn anthropic_upstream_url_appends_models_to_versioned_base_url() { + assert_eq!( + build_models_url("https://api.anthropic.com/v1").expect("url"), + "https://api.anthropic.com/v1/models" + ); + assert_eq!( + build_models_url("https://example.com/root/").expect("url"), + "https://example.com/root/models" + ); + } + #[test] fn upstream_url_rejects_plaintext_and_local_targets() { for base_url in [ @@ -309,4 +367,29 @@ mod tests { ); } } + + #[test] + fn parses_anthropic_upstream_models_response_ids() { + let model_ids = parse_model_ids_from_models_response( + br#"{"type":"list","data":[{"type":"model","id":"claude-sonnet-4-6"},{"type":"model","id":"claude-haiku-4-5"}]}"#, + ) + .expect("models response"); + + assert_eq!(model_ids, vec!["claude-sonnet-4-6", "claude-haiku-4-5"]); + } + + #[test] + fn rejects_malformed_anthropic_upstream_models_response() { + for body in [ + br#"not json"#.as_slice(), + br#"{}"#.as_slice(), + br#"{"data":[{"type":"model"}]}"#.as_slice(), + br#"{"data":[{"id":" "}]} "#.as_slice(), + ] { + assert!( + parse_model_ids_from_models_response(body).is_err(), + "malformed models response should be rejected" + ); + } + } } diff --git a/crates/llm-access-core/src/store/anthropic_upstream.rs b/crates/llm-access-core/src/store/anthropic_upstream.rs index 4fa52002..aeb952d9 100644 --- a/crates/llm-access-core/src/store/anthropic_upstream.rs +++ b/crates/llm-access-core/src/store/anthropic_upstream.rs @@ -86,6 +86,36 @@ pub struct AdminAnthropicUpstreamChannel { pub proxy_config_id: Option, /// Last hot-path error, if any. pub last_error: Option, + /// Latest upstream-visible model ids from an admin `/models` refresh. + #[serde(default)] + pub models: Vec, + /// Latest `/models` refresh status. + #[serde(default)] + pub last_models_status: Option, + /// Latest `/models` refresh latency. + #[serde(default)] + pub last_models_latency_ms: Option, + /// Latest `/models` refresh timestamp. + #[serde(default)] + pub last_models_checked_at: Option, + /// Latest `/models` refresh error summary. + #[serde(default)] + pub last_models_error: Option, + /// Latest admin `/messages` test model. + #[serde(default)] + pub last_test_model: Option, + /// Latest admin `/messages` test status. + #[serde(default)] + pub last_test_status: Option, + /// Latest admin `/messages` test latency. + #[serde(default)] + pub last_test_latency_ms: Option, + /// Latest admin `/messages` test timestamp. + #[serde(default)] + pub last_test_at: Option, + /// Latest admin `/messages` test error summary. + #[serde(default)] + pub last_test_error: Option, /// Token rollup for this channel. pub usage: AdminAnthropicUpstreamUsageRollup, /// Creation timestamp. @@ -109,6 +139,54 @@ pub struct AdminAnthropicUpstreamChannelsPage { pub has_more: bool, } +/// Internal admin probe target, including secret material and resolved proxy. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AdminAnthropicUpstreamProbeTarget { + /// Stable channel name. + pub name: String, + /// Anthropic-compatible base URL. + pub base_url: String, + /// API key sent as `x-api-key`. + pub api_key: String, + /// Resolved proxy settings for this probe request. + pub proxy: Option, + /// Proxy resolution error, when channel config exists but cannot produce a + /// usable proxy. + pub proxy_error: Option, + /// Last model-test probe timestamp, used for admin-side cooldown. + pub last_test_at: Option, +} + +/// Latest `/models` refresh state to persist for one upstream channel. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AdminAnthropicUpstreamModelsStatusUpdate { + /// Model ids returned by the upstream key. + pub model_ids: Vec, + /// Stable status label: `ok`, `http_`, or `error`. + pub status: String, + /// Observed request latency. + pub latency_ms: Option, + /// Probe timestamp. + pub checked_at_ms: i64, + /// Sanitized error summary. + pub error: Option, +} + +/// Latest `/messages` model-test state to persist for one upstream channel. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AdminAnthropicUpstreamTestStatusUpdate { + /// Tested model id. + pub model: String, + /// Stable status label: `ok`, `http_`, or `error`. + pub status: String, + /// Observed request latency. + pub latency_ms: Option, + /// Probe timestamp. + pub checked_at_ms: i64, + /// Sanitized error summary. + pub error: Option, +} + /// New direct Anthropic channel after admin request normalization. #[derive(Debug, Clone, PartialEq, Eq)] pub struct NewAdminAnthropicUpstreamChannel { diff --git a/crates/llm-access-core/src/store/mod.rs b/crates/llm-access-core/src/store/mod.rs index 044173f8..190e781a 100644 --- a/crates/llm-access-core/src/store/mod.rs +++ b/crates/llm-access-core/src/store/mod.rs @@ -24,12 +24,14 @@ pub use anthropic_upstream::{ canonical_anthropic_upstream_pool_mode, default_anthropic_upstream_pool_mode, normalize_anthropic_upstream_pool_mode, AdminAnthropicUpstreamChannel, AdminAnthropicUpstreamChannelPatch, AdminAnthropicUpstreamChannelsPage, - AdminAnthropicUpstreamUsageRollup, AnthropicUpstreamChannelUsageDelta, - NewAdminAnthropicUpstreamChannel, ProviderAnthropicUpstreamResolution, - ProviderAnthropicUpstreamRoute, ANTHROPIC_UPSTREAM_POOL_MODE_DISABLED, - ANTHROPIC_UPSTREAM_POOL_MODE_ONLY, ANTHROPIC_UPSTREAM_POOL_MODE_PREFERRED_BEFORE_KIRO, - DEFAULT_ANTHROPIC_UPSTREAM_BASE_URL, DEFAULT_ANTHROPIC_UPSTREAM_MAX_CONCURRENCY, - DEFAULT_ANTHROPIC_UPSTREAM_MIN_START_INTERVAL_MS, DEFAULT_ANTHROPIC_UPSTREAM_WEIGHT, + AdminAnthropicUpstreamModelsStatusUpdate, AdminAnthropicUpstreamProbeTarget, + AdminAnthropicUpstreamTestStatusUpdate, AdminAnthropicUpstreamUsageRollup, + AnthropicUpstreamChannelUsageDelta, NewAdminAnthropicUpstreamChannel, + ProviderAnthropicUpstreamResolution, ProviderAnthropicUpstreamRoute, + ANTHROPIC_UPSTREAM_POOL_MODE_DISABLED, ANTHROPIC_UPSTREAM_POOL_MODE_ONLY, + ANTHROPIC_UPSTREAM_POOL_MODE_PREFERRED_BEFORE_KIRO, DEFAULT_ANTHROPIC_UPSTREAM_BASE_URL, + DEFAULT_ANTHROPIC_UPSTREAM_MAX_CONCURRENCY, DEFAULT_ANTHROPIC_UPSTREAM_MIN_START_INTERVAL_MS, + DEFAULT_ANTHROPIC_UPSTREAM_WEIGHT, }; pub use codex_account::{ AdminAccountsSummary, AdminCodexAccount, AdminCodexAccountPageQuery, AdminCodexAccountPatch, diff --git a/crates/llm-access-core/src/store/traits.rs b/crates/llm-access-core/src/store/traits.rs index 169c3922..0e7988ff 100644 --- a/crates/llm-access-core/src/store/traits.rs +++ b/crates/llm-access-core/src/store/traits.rs @@ -7,9 +7,10 @@ use async_trait::async_trait; use super::{ anthropic_upstream::{ AdminAnthropicUpstreamChannel, AdminAnthropicUpstreamChannelPatch, - AdminAnthropicUpstreamChannelsPage, AnthropicUpstreamChannelUsageDelta, - NewAdminAnthropicUpstreamChannel, ProviderAnthropicUpstreamResolution, - ProviderAnthropicUpstreamRoute, + AdminAnthropicUpstreamChannelsPage, AdminAnthropicUpstreamModelsStatusUpdate, + AdminAnthropicUpstreamProbeTarget, AdminAnthropicUpstreamTestStatusUpdate, + AnthropicUpstreamChannelUsageDelta, NewAdminAnthropicUpstreamChannel, + ProviderAnthropicUpstreamResolution, ProviderAnthropicUpstreamRoute, }, codex_account::{ apply_admin_codex_account_query, summarize_admin_accounts, AdminCodexAccount, @@ -75,22 +76,18 @@ pub trait ControlStore: Send + Sync { /// Record Codex image usage observed outside the normal usage journal. async fn record_codex_image_key_usage( &self, - _key_id: &str, - _usage_tokens: Option, - _used_at_ms: i64, - ) -> anyhow::Result<()> { - Ok(()) - } + key_id: &str, + usage_tokens: Option, + used_at_ms: i64, + ) -> anyhow::Result<()>; /// Increment direct Anthropic upstream channel counters observed on the /// hot path. async fn record_anthropic_upstream_channel_usage( &self, - _channel_name: &str, - _delta: AnthropicUpstreamChannelUsageDelta, - ) -> anyhow::Result<()> { - Ok(()) - } + channel_name: &str, + delta: AnthropicUpstreamChannelUsageDelta, + ) -> anyhow::Result<()>; } /// Provider route/account resolution used by data-plane dispatch. @@ -252,6 +249,33 @@ pub trait AdminAnthropicUpstreamStore: Send + Sync { &self, name: &str, ) -> anyhow::Result>; + + /// Load a direct Anthropic channel for an admin probe, including secret + /// material. This must never be returned directly to frontend callers. + async fn load_admin_anthropic_upstream_probe_target( + &self, + _name: &str, + ) -> anyhow::Result> { + Ok(None) + } + + /// Persist latest `/models` refresh state. + async fn save_admin_anthropic_upstream_models_status( + &self, + _name: &str, + _update: AdminAnthropicUpstreamModelsStatusUpdate, + ) -> anyhow::Result> { + Ok(None) + } + + /// Persist latest `/messages` test state. + async fn save_admin_anthropic_upstream_test_status( + &self, + _name: &str, + _update: AdminAnthropicUpstreamTestStatusUpdate, + ) -> anyhow::Result> { + Ok(None) + } } /// Public read-only queries used by unauthenticated public endpoints. diff --git a/crates/llm-access-migrations/migrations/postgres/0034_anthropic_upstream_probe_state.sql b/crates/llm-access-migrations/migrations/postgres/0034_anthropic_upstream_probe_state.sql new file mode 100644 index 00000000..2e195e21 --- /dev/null +++ b/crates/llm-access-migrations/migrations/postgres/0034_anthropic_upstream_probe_state.sql @@ -0,0 +1,37 @@ +ALTER TABLE IF EXISTS llm_anthropic_upstream_channels + ADD COLUMN IF NOT EXISTS model_ids JSONB NOT NULL DEFAULT '[]'::jsonb, + ADD COLUMN IF NOT EXISTS last_models_status TEXT, + ADD COLUMN IF NOT EXISTS last_models_latency_ms BIGINT CHECK ( + last_models_latency_ms IS NULL OR last_models_latency_ms >= 0 + ), + ADD COLUMN IF NOT EXISTS last_models_checked_at_ms BIGINT CHECK ( + last_models_checked_at_ms IS NULL OR last_models_checked_at_ms >= 0 + ), + ADD COLUMN IF NOT EXISTS last_models_error TEXT, + ADD COLUMN IF NOT EXISTS last_test_model TEXT, + ADD COLUMN IF NOT EXISTS last_test_status TEXT, + ADD COLUMN IF NOT EXISTS last_test_latency_ms BIGINT CHECK ( + last_test_latency_ms IS NULL OR last_test_latency_ms >= 0 + ), + ADD COLUMN IF NOT EXISTS last_test_at_ms BIGINT CHECK ( + last_test_at_ms IS NULL OR last_test_at_ms >= 0 + ), + ADD COLUMN IF NOT EXISTS last_test_error TEXT; + +UPDATE llm_anthropic_upstream_channels +SET model_ids = '[]'::jsonb +WHERE model_ids IS NULL + OR jsonb_typeof(model_ids) <> 'array'; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM pg_constraint + WHERE conname = 'ck_llm_anthropic_upstream_channels_model_ids_array' + ) THEN + ALTER TABLE llm_anthropic_upstream_channels + ADD CONSTRAINT ck_llm_anthropic_upstream_channels_model_ids_array + CHECK (jsonb_typeof(model_ids) = 'array'); + END IF; +END $$; diff --git a/crates/llm-access-migrations/src/lib.rs b/crates/llm-access-migrations/src/lib.rs index 75ffa5f6..09e4d28b 100644 --- a/crates/llm-access-migrations/src/lib.rs +++ b/crates/llm-access-migrations/src/lib.rs @@ -173,6 +173,11 @@ const POSTGRES_MIGRATIONS: &[SqlMigration] = &[ name: "anthropic_upstream_pool", sql: include_str!("../migrations/postgres/0033_anthropic_upstream_pool.sql"), }, + SqlMigration { + version: 34, + name: "anthropic_upstream_probe_state", + sql: include_str!("../migrations/postgres/0034_anthropic_upstream_probe_state.sql"), + }, ]; /// Return target DuckDB migrations in execution order. @@ -591,4 +596,18 @@ mod tests { .contains("llm_anthropic_upstream_channel_usage_rollups")); assert!(migration.sql.contains("DEFAULT 'disabled'")); } + + #[test] + fn postgres_migrations_include_anthropic_upstream_probe_state() { + let migrations = super::postgres_migrations(); + let migration = migrations + .iter() + .find(|migration| migration.name == "anthropic_upstream_probe_state") + .expect("anthropic upstream probe state migration exists"); + + assert_eq!(migration.version, 34); + assert!(migration.sql.contains("model_ids JSONB")); + assert!(migration.sql.contains("last_models_checked_at_ms")); + assert!(migration.sql.contains("last_test_model")); + } } diff --git a/crates/llm-access-store/src/duckdb/sql.rs b/crates/llm-access-store/src/duckdb/sql.rs index cb0bb11c..2b9ae7f3 100644 --- a/crates/llm-access-store/src/duckdb/sql.rs +++ b/crates/llm-access-store/src/duckdb/sql.rs @@ -406,7 +406,7 @@ fn usage_event_base_select_exprs( "CAST(NULL AS VARCHAR)", ) } else { - "CAST(NULL AS VARCHAR) AS routing_diagnostics_json".to_string() + usage_event_column_expr(columns, "routing_diagnostics_json", "CAST(NULL AS VARCHAR)") }, usage_event_required_expr("input_uncached_tokens"), usage_event_required_expr("input_cached_tokens"), diff --git a/crates/llm-access-store/src/duckdb/tests.rs b/crates/llm-access-store/src/duckdb/tests.rs index 3ebf2c70..e0b21d34 100644 --- a/crates/llm-access-store/src/duckdb/tests.rs +++ b/crates/llm-access-store/src/duckdb/tests.rs @@ -96,7 +96,6 @@ fn assert_usage_event_round_trips(actual: &UsageEvent, expected: &UsageEvent) { fn assert_usage_event_summary_round_trips(actual: &UsageEvent, expected: &UsageEvent) { let mut expected_summary = expected.clone(); expected_summary.request_headers_json = "{}".to_string(); - expected_summary.routing_diagnostics_json = None; expected_summary.last_message_content = None; expected_summary.client_request_body_json = None; expected_summary.upstream_request_body_json = None; @@ -340,7 +339,10 @@ async fn duckdb_repository_persists_usage_events_with_default_feature() { assert_eq!(page.events.len(), 1); assert_usage_event_summary_round_trips(&page.events[0], &event); assert_eq!(page.events[0].request_headers_json, "{}"); - assert_eq!(page.events[0].routing_diagnostics_json, None); + assert_eq!( + page.events[0].routing_diagnostics_json, + Some(r#"{"route":"diagnostic"}"#.to_string()) + ); assert_eq!(page.events[0].last_message_content, None); assert_eq!(page.events[0].client_request_body_json, None); assert_eq!(page.events[0].upstream_request_body_json, None); diff --git a/crates/llm-access-store/src/postgres.rs b/crates/llm-access-store/src/postgres.rs index e30757c4..aaacbc7f 100644 --- a/crates/llm-access-store/src/postgres.rs +++ b/crates/llm-access-store/src/postgres.rs @@ -143,6 +143,16 @@ pub(crate) struct AnthropicUpstreamChannelRow { proxy_mode: String, proxy_config_id: Option, last_error: Option, + model_ids: Vec, + last_models_status: Option, + last_models_latency_ms: Option, + last_models_checked_at_ms: Option, + last_models_error: Option, + last_test_model: Option, + last_test_status: Option, + last_test_latency_ms: Option, + last_test_at_ms: Option, + last_test_error: Option, created_at_ms: i64, updated_at_ms: i64, input_uncached_tokens: i64, @@ -771,13 +781,15 @@ mod tests { use llm_access_core::{ provider::{ProtocolFamily, ProviderType, RouteStrategy}, store::{ - AdminAnthropicUpstreamStore, AdminCodexAccountPageQuery, AdminCodexAccountSortMode, - AdminCodexAccountStore, AdminConfigStore, AdminKeyStore, AdminKiroAccountStore, - AdminPageRequest, AdminProxyConfigPatch, AdminProxyStore, AdminProxyTrafficSnapshot, - AdminReviewQueueStore, AnthropicUpstreamChannelUsageDelta, ControlStore, - KeyUsageRollupDelta, NewAdminAnthropicUpstreamChannel, NewAdminProxyConfig, - NewPublicAccountContributionRequest, ProxyTrafficTotals, PublicSubmissionStore, - PublicUsageStore, UsageEventSink, UsageRollupBatch, UsageRollupBatchSink, + AdminAnthropicUpstreamModelsStatusUpdate, AdminAnthropicUpstreamStore, + AdminAnthropicUpstreamTestStatusUpdate, AdminCodexAccountPageQuery, + AdminCodexAccountSortMode, AdminCodexAccountStore, AdminConfigStore, AdminKeyStore, + AdminKiroAccountStore, AdminPageRequest, AdminProxyConfigPatch, AdminProxyStore, + AdminProxyTrafficSnapshot, AdminReviewQueueStore, AnthropicUpstreamChannelUsageDelta, + ControlStore, KeyUsageRollupDelta, NewAdminAnthropicUpstreamChannel, + NewAdminProxyConfig, NewPublicAccountContributionRequest, ProxyTrafficTotals, + PublicSubmissionStore, PublicUsageStore, UsageEventSink, UsageRollupBatch, + UsageRollupBatchSink, }, }; use serde::Serialize; @@ -1170,6 +1182,61 @@ mod tests { .expect("create anthropic upstream channel"); assert!(created.has_api_key); assert_eq!(created.weight, 7); + assert!(created.models.is_empty()); + assert_eq!(created.last_models_status, None); + assert_eq!(created.last_test_status, None); + + let probe_target = repo + .load_admin_anthropic_upstream_probe_target("anthropic-a") + .await + .expect("load probe target") + .expect("probe target exists"); + assert_eq!(probe_target.name, "anthropic-a"); + assert_eq!(probe_target.base_url, "https://api.anthropic.com/v1"); + assert_eq!(probe_target.api_key, "sk-ant-test"); + assert_eq!(probe_target.proxy, None); + assert_eq!(probe_target.proxy_error, None); + assert_eq!(probe_target.last_test_at, None); + + let updated = repo + .save_admin_anthropic_upstream_models_status( + "anthropic-a", + AdminAnthropicUpstreamModelsStatusUpdate { + model_ids: vec!["claude-sonnet-4-6".to_string()], + status: "ok".to_string(), + latency_ms: Some(25), + checked_at_ms: 1_700_000_000_030, + error: None, + }, + ) + .await + .expect("save models status") + .expect("updated channel exists"); + assert_eq!(updated.models, vec!["claude-sonnet-4-6"]); + assert_eq!(updated.last_models_status.as_deref(), Some("ok")); + assert_eq!(updated.last_models_latency_ms, Some(25)); + assert_eq!(updated.last_models_checked_at, Some(1_700_000_000_030)); + assert_eq!(updated.last_models_error, None); + + let updated = repo + .save_admin_anthropic_upstream_test_status( + "anthropic-a", + AdminAnthropicUpstreamTestStatusUpdate { + model: "claude-sonnet-4-6".to_string(), + status: "http_401".to_string(), + latency_ms: Some(31), + checked_at_ms: 1_700_000_000_040, + error: Some("upstream returned HTTP 401".to_string()), + }, + ) + .await + .expect("save test status") + .expect("updated channel exists"); + assert_eq!(updated.last_test_model.as_deref(), Some("claude-sonnet-4-6")); + assert_eq!(updated.last_test_status.as_deref(), Some("http_401")); + assert_eq!(updated.last_test_latency_ms, Some(31)); + assert_eq!(updated.last_test_at, Some(1_700_000_000_040)); + assert_eq!(updated.last_test_error.as_deref(), Some("upstream returned HTTP 401")); repo.record_anthropic_upstream_channel_usage( "anthropic-a", @@ -1214,6 +1281,18 @@ mod tests { assert_eq!(channel.usage.billable_tokens, 44); assert_eq!(channel.usage.usage_missing_events, 1); assert_eq!(channel.usage.last_used_at, Some(1_700_000_000_020)); + assert_eq!(channel.models, vec!["claude-sonnet-4-6"]); + assert_eq!(channel.last_test_status.as_deref(), Some("http_401")); + + let probe_target = repo + .load_admin_anthropic_upstream_probe_target("anthropic-a") + .await + .expect("reload probe target") + .expect("probe target exists"); + assert_eq!(probe_target.api_key, "sk-ant-test"); + assert_eq!(probe_target.proxy, None); + assert_eq!(probe_target.proxy_error, None); + assert_eq!(probe_target.last_test_at, Some(1_700_000_000_040)); } #[tokio::test] diff --git a/crates/llm-access-store/src/postgres/anthropic_upstream.rs b/crates/llm-access-store/src/postgres/anthropic_upstream.rs index be6d37e3..59642ea1 100644 --- a/crates/llm-access-store/src/postgres/anthropic_upstream.rs +++ b/crates/llm-access-store/src/postgres/anthropic_upstream.rs @@ -4,13 +4,17 @@ use anyhow::Context; use async_trait::async_trait; use llm_access_core::store::{ self as core_store, AdminAnthropicUpstreamChannel, AdminAnthropicUpstreamChannelPatch, - AdminAnthropicUpstreamChannelsPage, AdminAnthropicUpstreamStore, - AdminAnthropicUpstreamUsageRollup, AdminPageRequest, AnthropicUpstreamChannelUsageDelta, - NewAdminAnthropicUpstreamChannel, + AdminAnthropicUpstreamChannelsPage, AdminAnthropicUpstreamModelsStatusUpdate, + AdminAnthropicUpstreamProbeTarget, AdminAnthropicUpstreamStore, + AdminAnthropicUpstreamTestStatusUpdate, AdminAnthropicUpstreamUsageRollup, AdminPageRequest, + AnthropicUpstreamChannelUsageDelta, NewAdminAnthropicUpstreamChannel, }; use serde::{Deserialize, Serialize}; -use super::{now_ms, AnthropicUpstreamChannelRow, PostgresControlRepository}; +use super::{ + now_ms, proxy_support::resolve_provider_proxy_config_from_context, AnthropicUpstreamChannelRow, + PostgresControlRepository, +}; #[derive(Debug, Clone, Serialize, Deserialize)] struct CachedAnthropicUpstreamChannelsLookup { @@ -22,6 +26,24 @@ fn non_negative_i64_to_u64(value: i64) -> u64 { value.max(0) as u64 } +fn optional_non_negative_i64_to_u64(value: Option) -> Option { + value.map(non_negative_i64_to_u64) +} + +fn model_ids_from_json_text(channel_name: &str, raw: &str) -> Vec { + match serde_json::from_str::>(raw) { + Ok(model_ids) => model_ids, + Err(err) => { + tracing::warn!( + channel = %channel_name, + error = %err, + "stored Anthropic upstream model_ids JSON is not a string array" + ); + Vec::new() + }, + } +} + fn auth_json_for_api_key(api_key: &str) -> anyhow::Result { serde_json::to_string(&serde_json::json!({ "api_key": api_key })) .context("serialize anthropic upstream auth json") @@ -42,6 +64,16 @@ fn admin_channel_from_row(row: AnthropicUpstreamChannelRow) -> AdminAnthropicUps proxy_mode: row.proxy_mode, proxy_config_id: row.proxy_config_id, last_error: row.last_error, + models: row.model_ids, + last_models_status: row.last_models_status, + last_models_latency_ms: optional_non_negative_i64_to_u64(row.last_models_latency_ms), + last_models_checked_at: row.last_models_checked_at_ms, + last_models_error: row.last_models_error, + last_test_model: row.last_test_model, + last_test_status: row.last_test_status, + last_test_latency_ms: optional_non_negative_i64_to_u64(row.last_test_latency_ms), + last_test_at: row.last_test_at_ms, + last_test_error: row.last_test_error, usage: AdminAnthropicUpstreamUsageRollup { input_uncached_tokens: non_negative_i64_to_u64(row.input_uncached_tokens), input_cached_tokens: non_negative_i64_to_u64(row.input_cached_tokens), @@ -68,14 +100,27 @@ impl PostgresControlRepository { proxy_mode: row.get(7), proxy_config_id: row.get(8), last_error: row.get(9), - created_at_ms: row.get(10), - updated_at_ms: row.get(11), - input_uncached_tokens: row.get(12), - input_cached_tokens: row.get(13), - output_tokens: row.get(14), - billable_tokens: row.get(15), - usage_missing_events: row.get(16), - last_used_at_ms: row.get(17), + model_ids: model_ids_from_json_text( + row.get::<_, String>(0).as_str(), + row.get::<_, String>(10).as_str(), + ), + last_models_status: row.get(11), + last_models_latency_ms: row.get(12), + last_models_checked_at_ms: row.get(13), + last_models_error: row.get(14), + last_test_model: row.get(15), + last_test_status: row.get(16), + last_test_latency_ms: row.get(17), + last_test_at_ms: row.get(18), + last_test_error: row.get(19), + created_at_ms: row.get(20), + updated_at_ms: row.get(21), + input_uncached_tokens: row.get(22), + input_cached_tokens: row.get(23), + output_tokens: row.get(24), + billable_tokens: row.get(25), + usage_missing_events: row.get(26), + last_used_at_ms: row.get(27), } } @@ -97,6 +142,16 @@ impl PostgresControlRepository { c.proxy_mode, c.proxy_config_id, c.last_error, + c.model_ids::text, + c.last_models_status, + c.last_models_latency_ms, + c.last_models_checked_at_ms, + c.last_models_error, + c.last_test_model, + c.last_test_status, + c.last_test_latency_ms, + c.last_test_at_ms, + c.last_test_error, c.created_at_ms, c.updated_at_ms, COALESCE(u.input_uncached_tokens, 0), @@ -184,6 +239,16 @@ impl PostgresControlRepository { c.proxy_mode, c.proxy_config_id, c.last_error, + c.model_ids::text, + c.last_models_status, + c.last_models_latency_ms, + c.last_models_checked_at_ms, + c.last_models_error, + c.last_test_model, + c.last_test_status, + c.last_test_latency_ms, + c.last_test_at_ms, + c.last_test_error, c.created_at_ms, c.updated_at_ms, COALESCE(u.input_uncached_tokens, 0), @@ -438,4 +503,137 @@ impl AdminAnthropicUpstreamStore for PostgresControlRepository { .await; Ok(Some(admin_channel_from_row(row))) } + + async fn load_admin_anthropic_upstream_probe_target( + &self, + name: &str, + ) -> anyhow::Result> { + let Some(row) = self.load_anthropic_upstream_channel_row(name).await? else { + return Ok(None); + }; + let Some(api_key) = row + .api_key + .clone() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + else { + anyhow::bail!("anthropic upstream channel api key is missing"); + }; + let proxy_context = self + .load_provider_proxy_resolution_context(core_store::PROVIDER_KIRO) + .await?; + let (proxy, proxy_error) = match resolve_provider_proxy_config_from_context( + &row.proxy_mode, + row.proxy_config_id.as_deref(), + &proxy_context, + ) { + Ok(proxy) => (proxy, None), + Err(err) => (None, Some(err.to_string())), + }; + Ok(Some(AdminAnthropicUpstreamProbeTarget { + name: row.channel_name, + base_url: row.base_url, + api_key, + proxy, + proxy_error, + last_test_at: row.last_test_at_ms, + })) + } + + async fn save_admin_anthropic_upstream_models_status( + &self, + name: &str, + update: AdminAnthropicUpstreamModelsStatusUpdate, + ) -> anyhow::Result> { + self.ensure_connection_alive()?; + let model_ids_json = serde_json::to_string(&update.model_ids) + .context("serialize anthropic upstream model ids")?; + let latency_ms = update + .latency_ms + .map(|value| value.min(i64::MAX as u64) as i64); + let updated_at_ms = now_ms().max(update.checked_at_ms); + let updated = self + .client + .execute( + "UPDATE llm_anthropic_upstream_channels + SET model_ids = $2::jsonb, + last_models_status = $3, + last_models_latency_ms = $4, + last_models_checked_at_ms = $5, + last_models_error = $6, + updated_at_ms = $7 + WHERE channel_name = $1", + &[ + &name, + &model_ids_json, + &update.status, + &latency_ms, + &update.checked_at_ms, + &update.error, + &updated_at_ms, + ], + ) + .await + .context("save postgres anthropic upstream models status")?; + if updated == 0 { + return Ok(None); + } + self.load_anthropic_upstream_channel_row(name) + .await + .map(|row| row.map(admin_channel_from_row)) + } + + async fn save_admin_anthropic_upstream_test_status( + &self, + name: &str, + update: AdminAnthropicUpstreamTestStatusUpdate, + ) -> anyhow::Result> { + self.ensure_connection_alive()?; + let latency_ms = update + .latency_ms + .map(|value| value.min(i64::MAX as u64) as i64); + let updated_at_ms = now_ms().max(update.checked_at_ms); + let updated = self + .client + .execute( + "UPDATE llm_anthropic_upstream_channels + SET last_test_model = $2, + last_test_status = $3, + last_test_latency_ms = $4, + last_test_at_ms = $5, + last_test_error = $6, + updated_at_ms = $7 + WHERE channel_name = $1", + &[ + &name, + &update.model, + &update.status, + &latency_ms, + &update.checked_at_ms, + &update.error, + &updated_at_ms, + ], + ) + .await + .context("save postgres anthropic upstream test status")?; + if updated == 0 { + return Ok(None); + } + self.load_anthropic_upstream_channel_row(name) + .await + .map(|row| row.map(admin_channel_from_row)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn model_ids_from_json_text_falls_back_for_malformed_model_array() { + assert_eq!(model_ids_from_json_text("anthropic-a", r#"["claude-haiku-4-5"]"#), vec![ + "claude-haiku-4-5".to_string() + ]); + assert!(model_ids_from_json_text("anthropic-a", r#"[{"id":"claude"}]"#).is_empty()); + } } diff --git a/crates/llm-access/src/admin.rs b/crates/llm-access/src/admin.rs index 03146526..1d96c9fc 100644 --- a/crates/llm-access/src/admin.rs +++ b/crates/llm-access/src/admin.rs @@ -17,6 +17,9 @@ use axum::{ response::{IntoResponse, Response}, Json, }; +use llm_access_anthropic_pool::is_private_or_loopback_ip; +#[cfg(any(feature = "duckdb-runtime", feature = "duckdb-bundled"))] +use llm_access_core::store::UsageEventSink; use llm_access_core::{ provider::{ProtocolFamily, ProviderType, RouteStrategy}, store::{ @@ -49,7 +52,7 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use crate::{ activity::RequestActivitySnapshot, - codex_refresh, codex_status, kiro_refresh, kiro_status, + anthropic_upstream_probe, codex_refresh, codex_status, kiro_refresh, kiro_status, process_memory::{read_current_process_memory_stats, ProcessMemoryStats}, provider, HttpState, }; @@ -73,6 +76,7 @@ const USAGE_WORKER_QUERY_TIMEOUT: Duration = Duration::from_secs(60); const PROXY_TRAFFIC_REFRESH_MAX_WINDOW_DAYS: u64 = 30; const PROXY_TRAFFIC_REFRESH_BUCKET_MS: i64 = 24 * 60 * 60 * 1000; const HOUR_MS: i64 = 60 * 60 * 1000; +const ADMIN_ANTHROPIC_UPSTREAM_TEST_COOLDOWN_MS: i64 = 30_000; const MIN_RUNTIME_USAGE_EVENT_FLUSH_BATCH_SIZE: u64 = 1; const MAX_RUNTIME_USAGE_EVENT_FLUSH_BATCH_SIZE: u64 = 16_384; const MIN_RUNTIME_USAGE_EVENT_FLUSH_INTERVAL_SECONDS: u64 = 1; @@ -245,6 +249,17 @@ struct AdminAnthropicUpstreamChannelsResponse { generated_at: i64, } +#[derive(Debug, Serialize)] +struct AdminAnthropicUpstreamProbeResponse { + ok: bool, + status: String, + status_code: Option, + latency_ms: u64, + error: Option, + channel: core_store::AdminAnthropicUpstreamChannel, + generated_at: i64, +} + #[derive(Debug, Serialize)] struct AdminKiroAccountStatusesResponse { accounts: Vec, @@ -649,6 +664,11 @@ pub(crate) struct PatchAdminAnthropicUpstreamChannelRequest { clear_last_error: bool, } +#[derive(Debug, Deserialize)] +pub(crate) struct TestAdminAnthropicUpstreamChannelRequest { + model: String, +} + #[derive(Debug, Deserialize)] pub(crate) struct CreateLlmGatewayAccountGroupRequest { name: String, @@ -2901,6 +2921,152 @@ pub(crate) async fn delete_admin_anthropic_upstream_channel( } } +pub(crate) async fn refresh_admin_anthropic_upstream_models( + State(state): State, + headers: HeaderMap, + Path(name): Path, +) -> Response { + if let Err(response) = ensure_admin_access(&headers) { + return response.into_response(); + } + let name = match normalize_name(&name) { + Ok(name) => name, + Err(response) => return response.into_response(), + }; + let target = match state + .admin_anthropic_upstream_store + .load_admin_anthropic_upstream_probe_target(&name) + .await + { + Ok(Some(target)) => target, + Ok(None) => return not_found("Anthropic upstream channel not found").into_response(), + Err(err) => { + tracing::warn!( + channel = %name, + error = %err, + "failed to load Anthropic upstream probe target" + ); + return internal_error("Failed to load Anthropic upstream channel").into_response(); + }, + }; + let output = anthropic_upstream_probe::refresh_models(&target).await; + let update = core_store::AdminAnthropicUpstreamModelsStatusUpdate { + model_ids: output.model_ids.clone(), + status: output.status.clone(), + latency_ms: Some(output.latency_ms), + checked_at_ms: output.checked_at_ms, + error: output.error.clone(), + }; + match state + .admin_anthropic_upstream_store + .save_admin_anthropic_upstream_models_status(&name, update) + .await + { + Ok(Some(channel)) => Json(AdminAnthropicUpstreamProbeResponse { + ok: output.status == "ok" && output.error.is_none(), + status: output.status, + status_code: output.status_code, + latency_ms: output.latency_ms, + error: output.error, + channel, + generated_at: now_ms(), + }) + .into_response(), + Ok(None) => not_found("Anthropic upstream channel not found").into_response(), + Err(err) => { + tracing::warn!( + channel = %name, + error = %err, + "failed to save Anthropic upstream models status" + ); + internal_error("Failed to save Anthropic upstream models status").into_response() + }, + } +} + +pub(crate) async fn test_admin_anthropic_upstream_model( + State(state): State, + headers: HeaderMap, + Path(name): Path, + Json(request): Json, +) -> Response { + if let Err(response) = ensure_admin_access(&headers) { + return response.into_response(); + } + let name = match normalize_name(&name) { + Ok(name) => name, + Err(response) => return response.into_response(), + }; + let model = match normalize_anthropic_upstream_test_request(request) { + Ok(model) => model, + Err(response) => return response.into_response(), + }; + let target = match state + .admin_anthropic_upstream_store + .load_admin_anthropic_upstream_probe_target(&name) + .await + { + Ok(Some(target)) => target, + Ok(None) => return not_found("Anthropic upstream channel not found").into_response(), + Err(err) => { + tracing::warn!( + channel = %name, + error = %err, + "failed to load Anthropic upstream probe target" + ); + return internal_error("Failed to load Anthropic upstream channel").into_response(); + }, + }; + if let Err(response) = enforce_anthropic_upstream_test_cooldown(target.last_test_at, now_ms()) { + return response.into_response(); + } + let output = anthropic_upstream_probe::test_messages_model(&target, &model).await; + let usage_event = + anthropic_upstream_probe::usage_event_for_messages_test(&name, &model, &output); + let usage_event_id = usage_event.event_id.clone(); + append_admin_anthropic_probe_usage_event(&state, usage_event).await; + let update = core_store::AdminAnthropicUpstreamTestStatusUpdate { + model: model.clone(), + status: output.status.clone(), + latency_ms: Some(output.latency_ms), + checked_at_ms: output.checked_at_ms, + error: output.error.clone(), + }; + match state + .admin_anthropic_upstream_store + .save_admin_anthropic_upstream_test_status(&name, update) + .await + { + Ok(Some(channel)) => Json(AdminAnthropicUpstreamProbeResponse { + ok: output.status == "ok" && output.error.is_none(), + status: output.status, + status_code: output.status_code, + latency_ms: output.latency_ms, + error: output.error, + channel, + generated_at: now_ms(), + }) + .into_response(), + Ok(None) => { + tracing::warn!( + channel = %name, + model = %model, + event_id = %usage_event_id, + "admin Anthropic upstream model test completed but channel was deleted before status save" + ); + not_found("Anthropic upstream channel not found").into_response() + }, + Err(err) => { + tracing::warn!( + channel = %name, + error = %err, + "failed to save Anthropic upstream test status" + ); + internal_error("Failed to save Anthropic upstream test status").into_response() + }, + } +} + pub(crate) fn kiro_cache_simulation_config_from_admin_config( config: &AdminRuntimeConfig, ) -> KiroCacheSimulationConfig { @@ -4655,6 +4821,33 @@ async fn acquire_admin_usage_query_permit_from_gate( .map_err(|_| internal_error("Admin usage query gate is closed")) } +async fn append_admin_anthropic_probe_usage_event( + state: &HttpState, + event: llm_access_core::usage::UsageEvent, +) { + #[cfg(any(feature = "duckdb-runtime", feature = "duckdb-bundled"))] + { + if let Some(sink) = &state.usage_journal_sink { + if let Err(err) = sink.append_usage_event(&event).await { + tracing::warn!( + event_id = %event.event_id, + error = %err, + "failed to write admin Anthropic upstream test usage event" + ); + } + return; + } + tracing::warn!( + event_id = %event.event_id, + "usage journal sink is not configured; admin Anthropic upstream test usage event was not recorded" + ); + } + #[cfg(not(any(feature = "duckdb-runtime", feature = "duckdb-bundled")))] + { + let _ = (state, event); + } +} + fn producer_journal_status(state: &HttpState) -> anyhow::Result { #[cfg(any(feature = "duckdb-runtime", feature = "duckdb-bundled"))] if let Some(sink) = &state.usage_journal_sink { @@ -6628,6 +6821,35 @@ fn normalize_anthropic_upstream_channel_patch( }) } +fn normalize_anthropic_upstream_test_request( + request: TestAdminAnthropicUpstreamChannelRequest, +) -> Result { + let model = request.model.trim().to_string(); + if model.is_empty() { + return Err(bad_request("model is required")); + } + Ok(model) +} + +fn enforce_anthropic_upstream_test_cooldown( + last_test_at_ms: Option, + now_ms: i64, +) -> Result<(), AdminHttpError> { + let Some(last_test_at_ms) = last_test_at_ms else { + return Ok(()); + }; + let remaining_ms = last_test_at_ms + .saturating_add(ADMIN_ANTHROPIC_UPSTREAM_TEST_COOLDOWN_MS) + .saturating_sub(now_ms); + if remaining_ms <= 0 { + return Ok(()); + } + let retry_after_seconds = ((remaining_ms as u64).saturating_add(999)) / 1_000; + Err(too_many_requests(&format!( + "Anthropic upstream model test is cooling down; retry after {retry_after_seconds}s" + ))) +} + fn normalize_route_strategy_input(raw: &str) -> Result, AdminHttpError> { let trimmed = raw.trim(); if trimmed.is_empty() { @@ -7545,18 +7767,6 @@ fn normalize_ip_token(token: &str) -> Option { None } -fn is_private_or_loopback_ip(ip: IpAddr) -> bool { - match ip { - IpAddr::V4(v4) => { - v4.is_loopback() - || v4.is_private() - || v4.is_link_local() - || v4.octets()[0] == 169 && v4.octets()[1] == 254 - }, - IpAddr::V6(v6) => v6.is_loopback() || v6.is_unique_local() || v6.is_unicast_link_local(), - } -} - fn is_local_host_header(headers: &HeaderMap) -> bool { let Some(raw_host) = headers .get(header::HOST) @@ -8937,6 +9147,48 @@ mod tests { assert_eq!(error.status, StatusCode::BAD_REQUEST); } + #[test] + fn normalize_anthropic_upstream_test_request_trims_model() { + let request = TestAdminAnthropicUpstreamChannelRequest { + model: " claude-sonnet-4-6 ".to_string(), + }; + + let model = normalize_anthropic_upstream_test_request(request).expect("model"); + + assert_eq!(model, "claude-sonnet-4-6"); + } + + #[test] + fn normalize_anthropic_upstream_test_request_rejects_empty_model() { + let error = + normalize_anthropic_upstream_test_request(TestAdminAnthropicUpstreamChannelRequest { + model: " ".to_string(), + }) + .expect_err("empty model should fail"); + + assert_eq!(error.status, StatusCode::BAD_REQUEST); + } + + #[test] + fn enforce_anthropic_upstream_test_cooldown_rejects_recent_probe() { + let error = + enforce_anthropic_upstream_test_cooldown(Some(1_700_000_000_000), 1_700_000_010_000) + .expect_err("recent probe should be rate limited"); + + assert_eq!(error.status, StatusCode::TOO_MANY_REQUESTS); + } + + #[test] + fn enforce_anthropic_upstream_test_cooldown_allows_expired_or_missing_probe() { + enforce_anthropic_upstream_test_cooldown(None, 1_700_000_000_000) + .expect("missing last test timestamp should pass"); + enforce_anthropic_upstream_test_cooldown( + Some(1_700_000_000_000), + 1_700_000_000_000 + ADMIN_ANTHROPIC_UPSTREAM_TEST_COOLDOWN_MS, + ) + .expect("expired cooldown should pass"); + } + #[test] fn normalize_kiro_account_patch_accepts_pool_strategy() { let patch = normalize_kiro_account_patch(PatchKiroAccountRequest { diff --git a/crates/llm-access/src/anthropic_upstream_probe.rs b/crates/llm-access/src/anthropic_upstream_probe.rs new file mode 100644 index 00000000..96e32cf0 --- /dev/null +++ b/crates/llm-access/src/anthropic_upstream_probe.rs @@ -0,0 +1,577 @@ +//! Admin probes for direct Anthropic upstream channels. + +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +use axum::body::Bytes; +use futures_util::StreamExt; +use llm_access_anthropic_pool::{ + apply_anthropic_auth_headers, build_messages_url, build_models_url, + parse_model_ids_from_models_response, parse_usage_from_value, AnthropicUsageSummary, + ANTHROPIC_VERSION_2023_06_01, +}; +use llm_access_core::{ + provider::{ProtocolFamily, ProviderType}, + store::{self as core_store, AdminAnthropicUpstreamProbeTarget}, + usage::{UsageEvent, UsageTiming}, +}; +use reqwest::StatusCode; + +use crate::provider; + +const PROBE_TIMEOUT: Duration = Duration::from_secs(30); +const MAX_PROBE_RESPONSE_BYTES: usize = 1024 * 1024; +const ADMIN_TEST_KEY_ID: &str = "admin-direct-anthropic-test"; +const ADMIN_TEST_KEY_NAME: &str = "Admin direct Anthropic test"; + +#[derive(Debug, Clone)] +pub(crate) struct ModelsProbeOutput { + pub model_ids: Vec, + pub status: String, + pub status_code: Option, + pub latency_ms: u64, + pub checked_at_ms: i64, + pub error: Option, +} + +impl ModelsProbeOutput { + fn ok(model_ids: Vec, started: Instant, checked_at_ms: i64, status_code: u16) -> Self { + Self { + model_ids, + status: "ok".to_string(), + status_code: Some(status_code), + latency_ms: elapsed_ms(started), + checked_at_ms, + error: None, + } + } + + fn failure( + started: Instant, + checked_at_ms: i64, + status: impl Into, + status_code: Option, + error: impl Into, + ) -> Self { + Self { + model_ids: Vec::new(), + status: status.into(), + status_code, + latency_ms: elapsed_ms(started), + checked_at_ms, + error: Some(error.into()), + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct MessagesProbeOutput { + pub status: String, + pub status_code: Option, + pub latency_ms: u64, + pub checked_at_ms: i64, + pub error: Option, + pub error_class: Option, + pub usage: AnthropicUsageSummary, + pub upstream_request_body_json: String, +} + +impl MessagesProbeOutput { + fn ok( + started: Instant, + checked_at_ms: i64, + status_code: u16, + usage: AnthropicUsageSummary, + upstream_request_body_json: String, + ) -> Self { + Self { + status: "ok".to_string(), + status_code: Some(status_code), + latency_ms: elapsed_ms(started), + checked_at_ms, + error: None, + error_class: None, + usage, + upstream_request_body_json, + } + } + + fn failure( + started: Instant, + checked_at_ms: i64, + status: impl Into, + status_code: Option, + error: impl Into, + error_class: impl Into, + upstream_request_body_json: String, + ) -> Self { + Self { + status: status.into(), + status_code, + latency_ms: elapsed_ms(started), + checked_at_ms, + error: Some(error.into()), + error_class: Some(error_class.into()), + usage: AnthropicUsageSummary::missing(), + upstream_request_body_json, + } + } +} + +pub(crate) async fn refresh_models( + target: &AdminAnthropicUpstreamProbeTarget, +) -> ModelsProbeOutput { + let checked_at_ms = now_ms(); + let started = Instant::now(); + if let Some(error) = target.proxy_error.as_deref() { + return ModelsProbeOutput::failure( + started, + checked_at_ms, + "error", + None, + sanitize_error(error), + ); + } + let url = match build_models_url(&target.base_url) { + Ok(url) => url, + Err(err) => { + return ModelsProbeOutput::failure( + started, + checked_at_ms, + "error", + None, + sanitize_error(&err.to_string()), + ); + }, + }; + let client = match provider::anthropic_upstream_client(target.proxy.as_ref()) { + Ok(client) => client, + Err(err) => { + return ModelsProbeOutput::failure( + started, + checked_at_ms, + "error", + None, + sanitize_error(&err.to_string()), + ); + }, + }; + let request = apply_anthropic_auth_headers( + client.get(url), + &target.api_key, + ANTHROPIC_VERSION_2023_06_01, + ) + .header(reqwest::header::ACCEPT, "application/json") + .timeout(PROBE_TIMEOUT); + let response = match request.send().await { + Ok(response) => response, + Err(err) => { + return ModelsProbeOutput::failure( + started, + checked_at_ms, + "error", + None, + sanitize_error(&err.to_string()), + ); + }, + }; + let status = response.status(); + let status_code = status.as_u16(); + let body = match read_limited_response_body(response).await { + Ok(body) => body, + Err(err) => { + return ModelsProbeOutput::failure( + started, + checked_at_ms, + "error", + Some(status_code), + err, + ); + }, + }; + if !status.is_success() { + return ModelsProbeOutput::failure( + started, + checked_at_ms, + http_status_label(status), + Some(status_code), + upstream_error_summary(status, &body), + ); + } + match parse_model_ids_from_models_response(&body) { + Ok(model_ids) => ModelsProbeOutput::ok(model_ids, started, checked_at_ms, status_code), + Err(err) => ModelsProbeOutput::failure( + started, + checked_at_ms, + "error", + Some(status_code), + sanitize_error(&format!("failed to parse models response: {err}")), + ), + } +} + +pub(crate) async fn test_messages_model( + target: &AdminAnthropicUpstreamProbeTarget, + model: &str, +) -> MessagesProbeOutput { + let checked_at_ms = now_ms(); + let started = Instant::now(); + let payload = serde_json::json!({ + "model": model, + "max_tokens": 8, + "messages": [ + { "role": "user", "content": "hi" } + ] + }); + let upstream_request_body_json = payload.to_string(); + if let Some(error) = target.proxy_error.as_deref() { + return MessagesProbeOutput::failure( + started, + checked_at_ms, + "error", + None, + sanitize_error(error), + "probe_proxy_error", + upstream_request_body_json, + ); + } + let url = match build_messages_url(&target.base_url) { + Ok(url) => url, + Err(err) => { + return MessagesProbeOutput::failure( + started, + checked_at_ms, + "error", + None, + sanitize_error(&err.to_string()), + "probe_config_error", + upstream_request_body_json, + ); + }, + }; + let client = match provider::anthropic_upstream_client(target.proxy.as_ref()) { + Ok(client) => client, + Err(err) => { + return MessagesProbeOutput::failure( + started, + checked_at_ms, + "error", + None, + sanitize_error(&err.to_string()), + "probe_client_error", + upstream_request_body_json, + ); + }, + }; + let request = apply_anthropic_auth_headers( + client.post(url), + &target.api_key, + ANTHROPIC_VERSION_2023_06_01, + ) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .header(reqwest::header::ACCEPT, "application/json") + .timeout(PROBE_TIMEOUT) + .body(upstream_request_body_json.clone()); + let response = match request.send().await { + Ok(response) => response, + Err(err) => { + return MessagesProbeOutput::failure( + started, + checked_at_ms, + "error", + None, + sanitize_error(&err.to_string()), + "upstream_transport_error", + upstream_request_body_json, + ); + }, + }; + let status = response.status(); + let status_code = status.as_u16(); + let body = match read_limited_response_body(response).await { + Ok(body) => body, + Err(err) => { + return MessagesProbeOutput::failure( + started, + checked_at_ms, + "error", + Some(status_code), + err, + "upstream_body_error", + upstream_request_body_json, + ); + }, + }; + if !status.is_success() { + return MessagesProbeOutput::failure( + started, + checked_at_ms, + http_status_label(status), + Some(status_code), + upstream_error_summary(status, &body), + "upstream_error", + upstream_request_body_json, + ); + } + let usage = serde_json::from_slice::(&body) + .map(|value| parse_usage_from_value(&value)) + .unwrap_or_else(|_| AnthropicUsageSummary::missing()); + MessagesProbeOutput::ok(started, checked_at_ms, status_code, usage, upstream_request_body_json) +} + +pub(crate) fn usage_event_for_messages_test( + channel_name: &str, + model: &str, + output: &MessagesProbeOutput, +) -> UsageEvent { + UsageEvent { + event_id: format!("llm-usage-{}", uuid::Uuid::new_v4()), + created_at_ms: output.checked_at_ms, + provider_type: ProviderType::Kiro, + protocol_family: ProtocolFamily::Anthropic, + key_id: ADMIN_TEST_KEY_ID.to_string(), + key_name: ADMIN_TEST_KEY_NAME.to_string(), + account_name: Some(channel_name.to_string()), + account_group_id_at_event: None, + route_strategy_at_event: None, + request_method: "POST".to_string(), + request_url: format!("/admin/kiro-gateway/anthropic-upstreams/{channel_name}/test"), + endpoint: "/v1/messages".to_string(), + model: Some(model.to_string()), + mapped_model: None, + status_code: i64::from(output.status_code.unwrap_or(502)), + request_body_bytes: Some(output.upstream_request_body_json.len() as i64), + quota_failover_count: 0, + retry: Default::default(), + routing_diagnostics_json: Some( + serde_json::json!({ + "upstream_pool": "direct_anthropic_test", + "channel_name": channel_name, + "admin_probe": true, + "probe_kind": "messages_model_test", + "admin_probe_billable_tokens": admin_probe_billable_tokens(output.usage), + }) + .to_string(), + ), + input_uncached_tokens: output.usage.input_uncached_tokens.max(0), + input_cached_tokens: output.usage.input_cached_tokens.max(0), + output_tokens: output.usage.output_tokens.max(0), + billable_tokens: 0, + credit_usage: None, + usage_missing: output.usage.usage_missing, + credit_usage_missing: true, + client_ip: "admin".to_string(), + ip_region: "admin".to_string(), + request_headers_json: serde_json::json!({ + "anthropic-version": ANTHROPIC_VERSION_2023_06_01, + }) + .to_string(), + last_message_content: Some("hi".to_string()), + client_request_body_json: None, + upstream_request_body_json: Some(output.upstream_request_body_json.clone()), + full_request_json: None, + error_message: output.error.clone(), + error_class: output.error_class.clone(), + session_blocked: false, + response_image_count: None, + error_body: None, + response_body: None, + timing: UsageTiming { + latency_ms: Some(output.latency_ms.min(i64::MAX as u64) as i64), + upstream_headers_ms: Some(output.latency_ms.min(i64::MAX as u64) as i64), + ..UsageTiming::default() + }, + stream: Default::default(), + } +} + +fn admin_probe_billable_tokens(usage: AnthropicUsageSummary) -> u64 { + if usage.usage_missing { + return 0; + } + core_store::compute_billable_tokens( + usage.input_uncached_tokens.max(0) as u64, + usage.input_cached_tokens.max(0) as u64, + usage.output_tokens.max(0) as u64, + ) +} + +async fn read_limited_response_body(response: reqwest::Response) -> Result { + if response + .content_length() + .is_some_and(|len| len > MAX_PROBE_RESPONSE_BYTES as u64) + { + return Err("upstream probe response is too large".to_string()); + } + let mut body = Vec::new(); + let mut stream = response.bytes_stream(); + while let Some(chunk_result) = stream.next().await { + let chunk = chunk_result + .map_err(|err| sanitize_error(&format!("failed to read response: {err}")))?; + if body.len().saturating_add(chunk.len()) > MAX_PROBE_RESPONSE_BYTES { + return Err("upstream probe response is too large".to_string()); + } + body.extend_from_slice(&chunk); + } + Ok(Bytes::from(body)) +} + +fn upstream_error_summary(status: StatusCode, body: &Bytes) -> String { + let message = serde_json::from_slice::(body) + .ok() + .and_then(|value| { + value + .get("error") + .and_then(|error| error.get("message")) + .or_else(|| value.get("message")) + .and_then(serde_json::Value::as_str) + .map(str::to_string) + }) + .unwrap_or_else(|| String::from_utf8_lossy(body).trim().to_string()); + if message.is_empty() { + format!("upstream returned HTTP {}", status.as_u16()) + } else { + sanitize_error(&format!("upstream returned HTTP {}: {message}", status.as_u16())) + } +} + +fn sanitize_error(message: &str) -> String { + const MAX_ERROR_CHARS: usize = 500; + let mut sanitized = String::new(); + for part in message.split_whitespace() { + if !sanitized.is_empty() { + sanitized.push(' '); + } + sanitized.push_str(part); + } + let mut chars = sanitized.chars(); + let mut truncated = chars.by_ref().take(MAX_ERROR_CHARS).collect::(); + if chars.next().is_some() { + truncated.push_str("..."); + sanitized = truncated; + } + if sanitized.is_empty() { + "upstream probe failed".to_string() + } else { + sanitized + } +} + +fn http_status_label(status: StatusCode) -> String { + format!("http_{}", status.as_u16()) +} + +fn elapsed_ms(started: Instant) -> u64 { + started.elapsed().as_millis().min(u128::from(u64::MAX)) as u64 +} + +fn now_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_millis().min(i64::MAX as u128) as i64) + .unwrap_or_default() +} + +#[cfg(test)] +mod tests { + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpListener, + }; + + use super::*; + + #[test] + fn messages_test_usage_event_keeps_quota_zero_but_exposes_probe_cost() { + let output = MessagesProbeOutput::ok( + Instant::now(), + 1_700_000_000_000, + 200, + AnthropicUsageSummary { + input_uncached_tokens: 100, + input_cached_tokens: 20, + output_tokens: 3, + usage_missing: false, + }, + "{}".to_string(), + ); + + let event = usage_event_for_messages_test("yl", "claude-haiku-4-5", &output); + let diagnostics: serde_json::Value = serde_json::from_str( + event + .routing_diagnostics_json + .as_deref() + .expect("diagnostics"), + ) + .expect("diagnostics json"); + + assert_eq!(event.billable_tokens, 0); + assert_eq!(diagnostics["upstream_pool"], "direct_anthropic_test"); + assert_eq!(diagnostics["admin_probe_billable_tokens"], 117); + } + + #[test] + fn upstream_error_summary_prefers_anthropic_error_message() { + let summary = upstream_error_summary( + StatusCode::UNAUTHORIZED, + &Bytes::from_static(br#"{"error":{"message":"bad api key"}}"#), + ); + + assert_eq!(summary, "upstream returned HTTP 401: bad api key"); + } + + #[tokio::test] + async fn read_limited_response_body_rejects_large_content_length() { + let url = serve_one_http_response( + format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + MAX_PROBE_RESPONSE_BYTES + 1 + ) + .into_bytes(), + ) + .await; + let response = reqwest::Client::new() + .get(url) + .send() + .await + .expect("response"); + + let err = read_limited_response_body(response) + .await + .expect_err("body cap"); + + assert_eq!(err, "upstream probe response is too large"); + } + + #[tokio::test] + async fn read_limited_response_body_rejects_stream_past_cap() { + let mut raw = b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n".to_vec(); + raw.extend(vec![b'a'; MAX_PROBE_RESPONSE_BYTES + 1]); + let url = serve_one_http_response(raw).await; + let response = reqwest::Client::new() + .get(url) + .send() + .await + .expect("response"); + + let err = read_limited_response_body(response) + .await + .expect_err("body cap"); + + assert_eq!(err, "upstream probe response is too large"); + } + + async fn serve_one_http_response(raw_response: Vec) -> String { + let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind"); + let addr = listener.local_addr().expect("addr"); + tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.expect("accept"); + let mut request_buffer = [0u8; 1024]; + let _ = stream.read(&mut request_buffer).await; + stream + .write_all(&raw_response) + .await + .expect("write response"); + }); + format!("http://{addr}/probe") + } +} diff --git a/crates/llm-access/src/lib.rs b/crates/llm-access/src/lib.rs index 99f43b2e..f1509150 100644 --- a/crates/llm-access/src/lib.rs +++ b/crates/llm-access/src/lib.rs @@ -4,6 +4,7 @@ mod activity; mod admin; /// Process allocator tuning. pub mod allocator; +mod anthropic_upstream_probe; pub mod cluster; mod codex_refresh; mod codex_status; @@ -424,6 +425,14 @@ pub fn router_with_simulator( axum::routing::patch(admin::patch_admin_anthropic_upstream_channel) .delete(admin::delete_admin_anthropic_upstream_channel), ) + .route( + "/admin/kiro-gateway/anthropic-upstreams/:name/refresh-models", + post(admin::refresh_admin_anthropic_upstream_models), + ) + .route( + "/admin/kiro-gateway/anthropic-upstreams/:name/test", + post(admin::test_admin_anthropic_upstream_model), + ) .route( "/admin/kiro-gateway/accounts", get(admin::list_admin_kiro_accounts).post(admin::create_admin_kiro_manual_account), @@ -784,6 +793,23 @@ mod tests { ) -> anyhow::Result<()> { Ok(()) } + + async fn record_codex_image_key_usage( + &self, + _key_id: &str, + _usage_tokens: Option, + _used_at_ms: i64, + ) -> anyhow::Result<()> { + Ok(()) + } + + async fn record_anthropic_upstream_channel_usage( + &self, + _channel_name: &str, + _delta: llm_access_core::store::AnthropicUpstreamChannelUsageDelta, + ) -> anyhow::Result<()> { + Ok(()) + } } fn test_router() -> axum::Router { diff --git a/crates/llm-access/src/provider.rs b/crates/llm-access/src/provider.rs index badcc33a..4f1375f9 100644 --- a/crates/llm-access/src/provider.rs +++ b/crates/llm-access/src/provider.rs @@ -43,9 +43,10 @@ use axum::{ http::{Request, StatusCode}, response::Response, }; +pub(crate) use client::anthropic_upstream_client; use client::{ - build_provider_client, provider_client_cache_capacity, provider_client_pool_idle_timeout, - provider_client_pool_max_idle_per_host, + build_anthropic_upstream_client, build_provider_client, provider_client_cache_capacity, + provider_client_pool_idle_timeout, provider_client_pool_max_idle_per_host, }; pub(crate) use codex_auth::{ codex_upstream_base_url, compute_codex_upstream_url, resolve_codex_client_version, @@ -540,6 +541,14 @@ static DEFAULT_PROVIDER_CLIENT: std::sync::LazyLock = static PROVIDER_CLIENT_CACHE: std::sync::LazyLock< Mutex>, > = std::sync::LazyLock::new(|| Mutex::new(LruCache::new(provider_client_cache_capacity()))); +static DEFAULT_ANTHROPIC_UPSTREAM_CLIENT: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + build_anthropic_upstream_client(None) + .expect("default anthropic upstream client should build") + }); +static ANTHROPIC_UPSTREAM_CLIENT_CACHE: std::sync::LazyLock< + Mutex>, +> = std::sync::LazyLock::new(|| Mutex::new(LruCache::new(provider_client_cache_capacity()))); static KIRO_REMOTE_MEDIA_CLIENT: std::sync::LazyLock = std::sync::LazyLock::new(|| { reqwest::Client::builder() diff --git a/crates/llm-access/src/provider/anthropic_upstream_dispatch.rs b/crates/llm-access/src/provider/anthropic_upstream_dispatch.rs index 67c2227c..f7c935c1 100644 --- a/crates/llm-access/src/provider/anthropic_upstream_dispatch.rs +++ b/crates/llm-access/src/provider/anthropic_upstream_dispatch.rs @@ -15,8 +15,8 @@ use axum::{ }; use futures_util::StreamExt; use llm_access_anthropic_pool::{ - build_messages_url, merge_usage, parse_usage_from_value, AnthropicUsageSummary, - SmoothWeightedRoundRobin, WeightedChannel, + apply_anthropic_auth_headers, build_messages_url, merge_usage, parse_usage_from_value, + AnthropicUsageSummary, SmoothWeightedRoundRobin, WeightedChannel, ANTHROPIC_VERSION_2023_06_01, }; use llm_access_core::{ provider::{ProtocolFamily, ProviderType}, @@ -29,7 +29,7 @@ use llm_access_core::{ use serde_json::Value; use super::{ - client::provider_client, + client::anthropic_upstream_client, kiro_error::kiro_json_error, kiro_protocol::normalized_kiro_messages_path, limiter::{kiro_key_limit_response, try_acquire_key_permit}, @@ -373,7 +373,7 @@ async fn dispatch_one_route( ); }, }; - let client = match provider_client(route.proxy.as_ref()) { + let client = match anthropic_upstream_client(route.proxy.as_ref()) { Ok(client) => client, Err(err) => { tracing::warn!( @@ -422,20 +422,20 @@ async fn dispatch_one_route( }, }; capture_upstream_request_body_json(usage_meta, &upstream_body); - let mut request = client - .post(upstream_url) - .header(header::CONTENT_TYPE, "application/json") - .header("x-api-key", &route.api_key) - .body(upstream_body.clone()); + let mut request = apply_anthropic_auth_headers( + client.post(upstream_url), + &route.api_key, + context + .request_headers + .get("anthropic-version") + .and_then(|value| value.to_str().ok()) + .map(str::trim) + .filter(|value| !value.is_empty()) + .unwrap_or(ANTHROPIC_VERSION_2023_06_01), + ) + .header(header::CONTENT_TYPE, "application/json") + .body(upstream_body.clone()); let usage_context = DirectAnthropicUsageContext::from_dispatch(context, mapped_model.clone()); - let anthropic_version = context - .request_headers - .get("anthropic-version") - .and_then(|value| value.to_str().ok()) - .map(str::trim) - .filter(|value| !value.is_empty()) - .unwrap_or("2023-06-01"); - request = request.header("anthropic-version", anthropic_version); for header_name in ["anthropic-beta", "accept", "user-agent"] { if let Some(value) = context.request_headers.get(header_name) { request = request.header(header_name, value.clone()); diff --git a/crates/llm-access/src/provider/client.rs b/crates/llm-access/src/provider/client.rs index d0eae09a..202ceb17 100644 --- a/crates/llm-access/src/provider/client.rs +++ b/crates/llm-access/src/provider/client.rs @@ -1,11 +1,12 @@ //! Provider HTTP client pool/cache construction and tuning. -use std::{num::NonZeroUsize, time::Duration}; +use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use llm_access_core::store::ProviderProxyConfig; use super::{ - ProviderClientCacheKey, CCTEST_PROXY_CLIENT, DEFAULT_PROVIDER_CLIENT, + kiro_media, ProviderClientCacheKey, ANTHROPIC_UPSTREAM_CLIENT_CACHE, CCTEST_PROXY_CLIENT, + DEFAULT_ANTHROPIC_UPSTREAM_CLIENT, DEFAULT_PROVIDER_CLIENT, DEFAULT_PROVIDER_CLIENT_CACHE_CAPACITY, DEFAULT_PROVIDER_CLIENT_POOL_IDLE_TIMEOUT_SECONDS, DEFAULT_PROVIDER_CLIENT_POOL_MAX_IDLE_PER_HOST, MAX_PROVIDER_CLIENT_CACHE_CAPACITY, MAX_PROVIDER_CLIENT_POOL_IDLE_TIMEOUT_SECONDS, MAX_PROVIDER_CLIENT_POOL_MAX_IDLE_PER_HOST, @@ -15,10 +16,30 @@ use super::{ pub fn build_provider_client( proxy: Option<&ProviderProxyConfig>, ) -> anyhow::Result { - let mut builder = reqwest::Client::builder() + Ok(apply_provider_proxy(provider_client_builder(), proxy)?.build()?) +} + +pub fn build_anthropic_upstream_client( + proxy: Option<&ProviderProxyConfig>, +) -> anyhow::Result { + let mut builder = provider_client_builder().redirect(reqwest::redirect::Policy::none()); + if proxy.is_none() { + builder = builder.dns_resolver(Arc::new(kiro_media::PrivateFilteringDnsResolver)); + } + Ok(apply_provider_proxy(builder, proxy)?.build()?) +} + +fn provider_client_builder() -> reqwest::ClientBuilder { + reqwest::Client::builder() .pool_idle_timeout(provider_client_pool_idle_timeout()) .pool_max_idle_per_host(provider_client_pool_max_idle_per_host()) - .tcp_keepalive(Duration::from_secs(30)); + .tcp_keepalive(Duration::from_secs(30)) +} + +fn apply_provider_proxy( + mut builder: reqwest::ClientBuilder, + proxy: Option<&ProviderProxyConfig>, +) -> anyhow::Result { if let Some(proxy_config) = proxy { let mut proxy = reqwest::Proxy::all(&proxy_config.proxy_url)?; if let Some(username) = proxy_config.proxy_username.as_deref() { @@ -27,17 +48,14 @@ pub fn build_provider_client( } builder = builder.proxy(proxy); } - Ok(builder.build()?) + Ok(builder) } + pub fn provider_client(proxy: Option<&ProviderProxyConfig>) -> anyhow::Result { let Some(proxy_config) = proxy else { return Ok(DEFAULT_PROVIDER_CLIENT.clone()); }; - let cache_key = ProviderClientCacheKey { - proxy_url: proxy_config.proxy_url.clone(), - proxy_username: proxy_config.proxy_username.clone(), - proxy_password: proxy_config.proxy_password.clone(), - }; + let cache_key = provider_client_cache_key(proxy_config); { let mut cache = PROVIDER_CLIENT_CACHE .lock() @@ -54,6 +72,37 @@ pub fn provider_client(proxy: Option<&ProviderProxyConfig>) -> anyhow::Result, +) -> anyhow::Result { + let Some(proxy_config) = proxy else { + return Ok(DEFAULT_ANTHROPIC_UPSTREAM_CLIENT.clone()); + }; + let cache_key = provider_client_cache_key(proxy_config); + { + let mut cache = ANTHROPIC_UPSTREAM_CLIENT_CACHE + .lock() + .expect("anthropic upstream client cache lock"); + if let Some(client) = cache.get(&cache_key).cloned() { + return Ok(client); + } + } + let client = build_anthropic_upstream_client(Some(proxy_config))?; + ANTHROPIC_UPSTREAM_CLIENT_CACHE + .lock() + .expect("anthropic upstream client cache lock") + .put(cache_key, client.clone()); + Ok(client) +} + +fn provider_client_cache_key(proxy_config: &ProviderProxyConfig) -> ProviderClientCacheKey { + ProviderClientCacheKey { + proxy_url: proxy_config.proxy_url.clone(), + proxy_username: proxy_config.proxy_username.clone(), + proxy_password: proxy_config.proxy_password.clone(), + } +} + pub fn cctest_proxy_client() -> reqwest::Client { CCTEST_PROXY_CLIENT.clone() } @@ -86,3 +135,100 @@ pub fn provider_client_pool_max_idle_per_host() -> usize { .map(|value| value.min(MAX_PROVIDER_CLIENT_POOL_MAX_IDLE_PER_HOST)) .unwrap_or(DEFAULT_PROVIDER_CLIENT_POOL_MAX_IDLE_PER_HOST) } + +#[cfg(test)] +mod tests { + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; + + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpListener, + }; + + use super::*; + + #[tokio::test] + async fn anthropic_upstream_client_does_not_follow_redirects() { + let (url, target_hits) = spawn_redirect_server().await; + let client = build_anthropic_upstream_client(None).expect("client should build"); + + let response = client + .get(url) + .send() + .await + .expect("request should complete"); + + assert_eq!(response.status(), reqwest::StatusCode::FOUND); + assert_eq!(target_hits.load(Ordering::SeqCst), 0, "redirect target must not be requested"); + } + + #[tokio::test] + async fn anthropic_upstream_client_rejects_localhost_dns() { + let (url, _) = spawn_redirect_server_with_host("localhost").await; + let client = build_anthropic_upstream_client(None).expect("client should build"); + + let error = client + .get(url) + .send() + .await + .expect_err("localhost DNS must be rejected before connect"); + + assert!(error_contains(&error, "private or local"), "unexpected error: {error:?}"); + } + + async fn spawn_redirect_server() -> (String, Arc) { + spawn_redirect_server_with_host("127.0.0.1").await + } + + async fn spawn_redirect_server_with_host(host: &str) -> (String, Arc) { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("redirect test server should bind"); + let port = listener.local_addr().expect("local addr").port(); + let target_hits = Arc::new(AtomicUsize::new(0)); + let server_hits = Arc::clone(&target_hits); + tokio::spawn(async move { + while let Ok((mut stream, _peer)) = listener.accept().await { + let request_hits = Arc::clone(&server_hits); + tokio::spawn(async move { + let mut buffer = [0u8; 1024]; + let Ok(read) = stream.read(&mut buffer).await else { + return; + }; + let request = String::from_utf8_lossy(&buffer[..read]); + let response = if request.starts_with("GET /target ") { + request_hits.fetch_add(1, Ordering::SeqCst); + "HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: close\r\n\r\nok" + } else { + concat!( + "HTTP/1.1 302 Found\r\n", + "Location: /target\r\n", + "Content-Length: 0\r\n", + "Connection: close\r\n", + "\r\n" + ) + }; + let _ = stream.write_all(response.as_bytes()).await; + }); + } + }); + (format!("http://{host}:{port}/start"), target_hits) + } + + fn error_contains(error: &(dyn std::error::Error + 'static), needle: &str) -> bool { + if error.to_string().contains(needle) { + return true; + } + let mut source = error.source(); + while let Some(error) = source { + if error.to_string().contains(needle) { + return true; + } + source = error.source(); + } + false + } +} diff --git a/crates/llm-access/src/provider/tests.rs b/crates/llm-access/src/provider/tests.rs index f5bf3241..aefce886 100644 --- a/crates/llm-access/src/provider/tests.rs +++ b/crates/llm-access/src/provider/tests.rs @@ -916,6 +916,23 @@ impl ControlStore for TestStore { ) -> anyhow::Result<()> { Ok(()) } + + async fn record_codex_image_key_usage( + &self, + _key_id: &str, + _usage_tokens: Option, + _used_at_ms: i64, + ) -> anyhow::Result<()> { + Ok(()) + } + + async fn record_anthropic_upstream_channel_usage( + &self, + _channel_name: &str, + _delta: llm_access_core::store::AnthropicUpstreamChannelUsageDelta, + ) -> anyhow::Result<()> { + Ok(()) + } } #[derive(Default)] @@ -936,6 +953,23 @@ impl ControlStore for FailingStore { ) -> anyhow::Result<()> { Ok(()) } + + async fn record_codex_image_key_usage( + &self, + _key_id: &str, + _usage_tokens: Option, + _used_at_ms: i64, + ) -> anyhow::Result<()> { + Ok(()) + } + + async fn record_anthropic_upstream_channel_usage( + &self, + _channel_name: &str, + _delta: llm_access_core::store::AnthropicUpstreamChannelUsageDelta, + ) -> anyhow::Result<()> { + Ok(()) + } } struct StaticAdminConfigStore { @@ -1341,6 +1375,23 @@ impl ControlStore for RecordingControlStore { .push(event.clone()); Ok(()) } + + async fn record_codex_image_key_usage( + &self, + _key_id: &str, + _usage_tokens: Option, + _used_at_ms: i64, + ) -> anyhow::Result<()> { + Ok(()) + } + + async fn record_anthropic_upstream_channel_usage( + &self, + _channel_name: &str, + _delta: llm_access_core::store::AnthropicUpstreamChannelUsageDelta, + ) -> anyhow::Result<()> { + Ok(()) + } } async fn wait_for_usage_event_count(store: &RecordingControlStore, expected: usize) { diff --git a/crates/llm-access/src/runtime.rs b/crates/llm-access/src/runtime.rs index 28350996..b49a28db 100644 --- a/crates/llm-access/src/runtime.rs +++ b/crates/llm-access/src/runtime.rs @@ -1970,6 +1970,16 @@ impl ControlStore for UsageAccountingControlStore { .record_codex_image_key_usage(key_id, usage_tokens, used_at_ms) .await } + + async fn record_anthropic_upstream_channel_usage( + &self, + channel_name: &str, + delta: llm_access_core::store::AnthropicUpstreamChannelUsageDelta, + ) -> anyhow::Result<()> { + self.control_store + .record_anthropic_upstream_channel_usage(channel_name, delta) + .await + } } #[cfg(any(feature = "duckdb-runtime", feature = "duckdb-bundled"))] @@ -2157,9 +2167,9 @@ mod tests { use llm_access_core::{ provider::{ProtocolFamily, ProviderType}, store::{ - AdminRuntimeConfig, AuthenticatedKey, ControlStore, KeyUsageRollupDelta, - UsageEventSink, UsageRollupApplyReport, UsageRollupBatch, UsageRollupBatchSink, - UsageRollupDigestMismatch, + AdminRuntimeConfig, AnthropicUpstreamChannelUsageDelta, AuthenticatedKey, ControlStore, + KeyUsageRollupDelta, UsageEventSink, UsageRollupApplyReport, UsageRollupBatch, + UsageRollupBatchSink, UsageRollupDigestMismatch, }, usage::UsageEvent, }; @@ -2403,6 +2413,7 @@ mod tests { #[cfg(any(feature = "duckdb-runtime", feature = "duckdb-bundled"))] struct StaticControlStore { key: AuthenticatedKey, + anthropic_channel_usage: Arc>>, } #[cfg(any(feature = "duckdb-runtime", feature = "duckdb-bundled"))] @@ -2418,6 +2429,27 @@ mod tests { async fn apply_usage_rollup(&self, _event: &UsageEvent) -> anyhow::Result<()> { anyhow::bail!("usage rollups must be persisted by the accounting flusher") } + + async fn record_codex_image_key_usage( + &self, + _key_id: &str, + _usage_tokens: Option, + _used_at_ms: i64, + ) -> anyhow::Result<()> { + Ok(()) + } + + async fn record_anthropic_upstream_channel_usage( + &self, + channel_name: &str, + delta: AnthropicUpstreamChannelUsageDelta, + ) -> anyhow::Result<()> { + self.anthropic_channel_usage + .lock() + .await + .push((channel_name.to_string(), delta)); + Ok(()) + } } #[cfg(any(feature = "duckdb-runtime", feature = "duckdb-bundled"))] @@ -3033,6 +3065,7 @@ mod tests { let control_store = super::UsageAccountingControlStore::new( Arc::new(StaticControlStore { key: sample_authenticated_key(), + anthropic_channel_usage: Arc::new(Mutex::new(Vec::new())), }), accounting.clone(), ); @@ -3066,6 +3099,49 @@ mod tests { ]]); } + #[cfg(any(feature = "duckdb-runtime", feature = "duckdb-bundled"))] + #[tokio::test] + async fn usage_accounting_control_store_forwards_anthropic_channel_usage() { + let rollup_sink = Arc::new(RecordingUsageRollupSink::default()); + let analytics_sink = Arc::new(RecordingUsageEventSink::default()); + let (_journal_root, journal_sink) = test_journal_sink(); + let (_backlog_root, rollup_backlog) = test_rollup_backlog(); + let runtime_config = Arc::new(RwLock::new(AdminRuntimeConfig::default())); + let (accounting, _handle) = super::UsageAccounting::new( + rollup_sink, + journal_sink, + analytics_sink, + runtime_config, + rollup_backlog, + super::PendingUsageRollups::default(), + Some("node-test".to_string()), + ) + .expect("usage accounting"); + let recorded_usage = Arc::new(Mutex::new(Vec::new())); + let control_store = super::UsageAccountingControlStore::new( + Arc::new(StaticControlStore { + key: sample_authenticated_key(), + anthropic_channel_usage: recorded_usage.clone(), + }), + accounting, + ); + let delta = AnthropicUpstreamChannelUsageDelta { + input_uncached_tokens: 11, + input_cached_tokens: 3, + output_tokens: 5, + billable_tokens: 36, + usage_missing: false, + used_at_ms: 1_700_000_000_123, + }; + + control_store + .record_anthropic_upstream_channel_usage("yl", delta) + .await + .expect("forward channel usage"); + + assert_eq!(recorded_usage.lock().await.as_slice(), &[("yl".to_string(), delta)]); + } + #[cfg(any(feature = "duckdb-runtime", feature = "duckdb-bundled"))] #[tokio::test] async fn usage_accounting_loads_initial_rollup_backlog_into_overlay() { @@ -3108,6 +3184,7 @@ mod tests { let control_store = super::UsageAccountingControlStore::new( Arc::new(StaticControlStore { key: sample_authenticated_key(), + anthropic_channel_usage: Arc::new(Mutex::new(Vec::new())), }), accounting, );