From 4eb78999c48306868b8c85ad08b531e54da00925 Mon Sep 17 00:00:00 2001 From: JSRCode <139555610+jsrcode@users.noreply.github.com> Date: Thu, 15 Jan 2026 23:01:33 +0800 Subject: [PATCH 1/3] =?UTF-8?q?fix(proxy):=20=E4=BF=AE=E5=A4=8D=E5=93=8D?= =?UTF-8?q?=E5=BA=94=E6=97=B6=E9=97=B4=E7=BB=9F=E8=AE=A1=E4=B8=BA=200=20?= =?UTF-8?q?=E7=9A=84=20Bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **问题分析:** 数据库中 393/404 条记录的 response_time_ms 为 0,导致: 1. 平均响应时间卡片显示 0.00s 2. 响应时间趋势图几乎全是 0 **根本原因:** `RequestLogContext` 在 `from_request()` 中创建了新的 `Instant::now()` 而不是使用 `proxy_instance.rs` 记录的请求开始时间, 导致 `elapsed_ms()` 返回的是上下文创建到现在的时间(几乎为 0)。 **修复内容:** 1. 移除 `RequestLogContext.start_time` 字段 2. 添加 `response_time_ms: Option` 字段 3. 更新 `from_request()` 接收外部传入的 response_time_ms 4. 更新 `claude_processor.rs` 传递 response_time_ms 参数(移除下划线前缀) 5. 更新 `LogRecorder` 所有方法使用 context.response_time_ms 替代 context.elapsed_ms() 6. 移除 `elapsed_ms()` 方法和未使用的 `Instant` 导入 **影响文件:** - src-tauri/src/services/proxy/log_recorder/context.rs - src-tauri/src/services/proxy/log_recorder/recorder.rs - src-tauri/src/services/proxy/headers/claude_processor.rs **测试:** - ✓ 所有代码质量检查通过(ESLint、Clippy、Prettier、rustfmt) - 需要重新运行代理并验证响应时间数据正确性 --- .../src/services/proxy/headers/claude_processor.rs | 3 ++- .../src/services/proxy/log_recorder/context.rs | 10 +++------- .../src/services/proxy/log_recorder/recorder.rs | 14 +++++++------- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/src-tauri/src/services/proxy/headers/claude_processor.rs b/src-tauri/src/services/proxy/headers/claude_processor.rs index 5272fd8..fb41dd3 100644 --- a/src-tauri/src/services/proxy/headers/claude_processor.rs +++ b/src-tauri/src/services/proxy/headers/claude_processor.rs @@ -165,7 +165,7 @@ impl RequestProcessor for ClaudeHeadersProcessor { response_status: u16, response_body: &[u8], is_sse: bool, - _response_time_ms: Option, + response_time_ms: Option, ) -> Result<()> { use crate::services::proxy::log_recorder::{ LogRecorder, RequestLogContext, ResponseParser, @@ -178,6 +178,7 @@ impl RequestProcessor for ClaudeHeadersProcessor { client_ip, proxy_pricing_template_id, request_body, + response_time_ms, ); // 2. 解析响应 diff --git a/src-tauri/src/services/proxy/log_recorder/context.rs b/src-tauri/src/services/proxy/log_recorder/context.rs index 14d18cc..57ea226 100644 --- a/src-tauri/src/services/proxy/log_recorder/context.rs +++ b/src-tauri/src/services/proxy/log_recorder/context.rs @@ -4,7 +4,6 @@ use crate::services::session::manager::SESSION_MANAGER; use crate::services::session::models::ProxySession; -use std::time::Instant; /// 请求日志上下文(在请求处理早期提取) #[derive(Debug, Clone)] @@ -17,7 +16,7 @@ pub struct RequestLogContext { pub model: Option, // 从 request_body 提取 pub is_stream: bool, // 从 request_body 提取 stream 字段 pub request_body: Vec, // 保留原始请求体 - pub start_time: Instant, + pub response_time_ms: Option, // 响应时间(毫秒) } impl RequestLogContext { @@ -28,6 +27,7 @@ impl RequestLogContext { client_ip: &str, proxy_pricing_template_id: Option<&str>, request_body: &[u8], + response_time_ms: Option, ) -> Self { // 提取 user_id(完整)、display_id(用于日志)、model 和 stream(仅解析一次) let (user_id, session_id, model, is_stream) = if !request_body.is_empty() { @@ -70,7 +70,7 @@ impl RequestLogContext { model, is_stream, request_body: request_body.to_vec(), - start_time: Instant::now(), + response_time_ms, } } @@ -107,8 +107,4 @@ impl RequestLogContext { proxy_template_id.map(|s| s.to_string()), ) } - - pub fn elapsed_ms(&self) -> i64 { - self.start_time.elapsed().as_millis() as i64 - } } diff --git a/src-tauri/src/services/proxy/log_recorder/recorder.rs b/src-tauri/src/services/proxy/log_recorder/recorder.rs index 20198f0..57e2637 100644 --- a/src-tauri/src/services/proxy/log_recorder/recorder.rs +++ b/src-tauri/src/services/proxy/log_recorder/recorder.rs @@ -65,7 +65,7 @@ impl LogRecorder { &context.client_ip, &context.request_body, ResponseData::Sse(data_lines), - Some(context.elapsed_ms()), + context.response_time_ms, context.pricing_template_id.clone(), ) .await @@ -98,7 +98,7 @@ impl LogRecorder { "parse_error", &error_detail, "sse", - Some(context.elapsed_ms()), + context.response_time_ms, ) .await } @@ -120,7 +120,7 @@ impl LogRecorder { &context.client_ip, &context.request_body, ResponseData::Json(data), - Some(context.elapsed_ms()), + context.response_time_ms, context.pricing_template_id.clone(), ) .await @@ -153,7 +153,7 @@ impl LogRecorder { "parse_error", &error_detail, "json", - Some(context.elapsed_ms()), + context.response_time_ms, ) .await } @@ -186,7 +186,7 @@ impl LogRecorder { "parse_error", &error_detail, response_type, - Some(context.elapsed_ms()), + context.response_time_ms, ) .await } @@ -213,7 +213,7 @@ impl LogRecorder { "upstream_error", detail, response_type, // 根据请求体的 stream 字段判断 - Some(context.elapsed_ms()), + context.response_time_ms, ) .await } @@ -250,7 +250,7 @@ impl LogRecorder { "upstream_error", &error_detail, response_type, // 根据请求体的 stream 字段判断 - Some(context.elapsed_ms()), + context.response_time_ms, ) .await } From f652cfd92773594109f85fd24f1fbe14b68f5860 Mon Sep 17 00:00:00 2001 From: JSRCode <139555610+jsrcode@users.noreply.github.com> Date: Thu, 15 Jan 2026 23:42:49 +0800 Subject: [PATCH 2/3] =?UTF-8?q?debug(token-stats):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E8=AF=A6=E7=BB=86=E6=97=A5=E5=BF=97=E8=B0=83=E8=AF=95=E8=BE=93?= =?UTF-8?q?=E5=87=BAtoken=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **问题现象:** - 输出token数值异常小(1, 2, 5 等) - 疑似使用了 message_start 的初始值而非 message_delta 的最终值 **调试措施:** 1. 在 parse_sse_chunks 中添加 info 级别日志 2. 记录 message_start 和 message_delta 的 token 数据 3. 记录合并前后的 token 值对比 4. 增加 chunk 预览长度到 200 字符 **日志输出内容:** - chunks_count: SSE chunks 总数 - message_start: input/output/cache tokens - message_delta: output/cache tokens - 合并前: has_delta, start_output, delta_output - 合并后: 最终 token 信息 **下一步:** 运行应用并查看日志,确认 message_delta 是否被正确提取 --- .../src/services/token_stats/extractor.rs | 79 +++++++++---------- src-tauri/src/services/token_stats/manager.rs | 41 +++++++++- 2 files changed, 75 insertions(+), 45 deletions(-) diff --git a/src-tauri/src/services/token_stats/extractor.rs b/src-tauri/src/services/token_stats/extractor.rs index bad2157..322a4f6 100644 --- a/src-tauri/src/services/token_stats/extractor.rs +++ b/src-tauri/src/services/token_stats/extractor.rs @@ -193,47 +193,44 @@ impl TokenExtractor for ClaudeTokenExtractor { } } "message_delta" => { - // 检查是否有 stop_reason(任何值都接受:end_turn, tool_use, max_tokens 等) - if let Some(delta) = json.get("delta") { - if delta.get("stop_reason").and_then(|v| v.as_str()).is_some() { - if let Some(usage) = json.get("usage") { - // 提取缓存创建 token:优先读取扁平字段,回退到嵌套对象 - let cache_creation = usage - .get("cache_creation_input_tokens") - .and_then(|v| v.as_i64()) - .unwrap_or_else(|| { - if let Some(cache_obj) = usage.get("cache_creation") { - let ephemeral_5m = cache_obj - .get("ephemeral_5m_input_tokens") - .and_then(|v| v.as_i64()) - .unwrap_or(0); - let ephemeral_1h = cache_obj - .get("ephemeral_1h_input_tokens") - .and_then(|v| v.as_i64()) - .unwrap_or(0); - ephemeral_5m + ephemeral_1h - } else { - 0 - } - }); - - let cache_read = usage - .get("cache_read_input_tokens") - .and_then(|v| v.as_i64()) - .unwrap_or(0); - - let output_tokens = usage - .get("output_tokens") - .and_then(|v| v.as_i64()) - .unwrap_or(0); - - result.message_delta = Some(MessageDeltaData { - cache_creation_tokens: cache_creation, - cache_read_tokens: cache_read, - output_tokens, - }); - } - } + // message_delta 事件包含最终的usage统计 + // 条件:必须有 usage 字段(无论是否有 stop_reason) + if let Some(usage) = json.get("usage") { + // 提取缓存创建 token:优先读取扁平字段,回退到嵌套对象 + let cache_creation = usage + .get("cache_creation_input_tokens") + .and_then(|v| v.as_i64()) + .unwrap_or_else(|| { + if let Some(cache_obj) = usage.get("cache_creation") { + let ephemeral_5m = cache_obj + .get("ephemeral_5m_input_tokens") + .and_then(|v| v.as_i64()) + .unwrap_or(0); + let ephemeral_1h = cache_obj + .get("ephemeral_1h_input_tokens") + .and_then(|v| v.as_i64()) + .unwrap_or(0); + ephemeral_5m + ephemeral_1h + } else { + 0 + } + }); + + let cache_read = usage + .get("cache_read_input_tokens") + .and_then(|v| v.as_i64()) + .unwrap_or(0); + + let output_tokens = usage + .get("output_tokens") + .and_then(|v| v.as_i64()) + .unwrap_or(0); + + result.message_delta = Some(MessageDeltaData { + cache_creation_tokens: cache_creation, + cache_read_tokens: cache_read, + output_tokens, + }); } } _ => {} diff --git a/src-tauri/src/services/token_stats/manager.rs b/src-tauri/src/services/token_stats/manager.rs index 2732d50..18edfa5 100644 --- a/src-tauri/src/services/token_stats/manager.rs +++ b/src-tauri/src/services/token_stats/manager.rs @@ -358,15 +358,30 @@ impl TokenStatsManager { let mut message_start: Option = None; let mut message_delta: Option = None; + tracing::info!(chunks_count = chunks.len(), "开始解析 SSE chunks"); + for (i, chunk) in chunks.iter().enumerate() { match extractor.extract_from_sse_chunk(chunk) { Ok(Some(data)) => { if let Some(start) = data.message_start { - tracing::debug!(chunk_index = i, "找到 message_start 事件"); + tracing::info!( + chunk_index = i, + input_tokens = start.input_tokens, + output_tokens = start.output_tokens, + cache_creation = start.cache_creation_tokens, + cache_read = start.cache_read_tokens, + "✓ 找到 message_start 事件" + ); message_start = Some(start); } if let Some(delta) = data.message_delta { - tracing::debug!(chunk_index = i, "找到 message_delta 事件"); + tracing::info!( + chunk_index = i, + output_tokens = delta.output_tokens, + cache_creation = delta.cache_creation_tokens, + cache_read = delta.cache_read_tokens, + "✓ 找到 message_delta 事件" + ); message_delta = Some(delta); } } @@ -377,7 +392,7 @@ impl TokenStatsManager { tracing::warn!( chunk_index = i, error = ?e, - chunk_preview = %chunk.chars().take(100).collect::(), + chunk_preview = %chunk.chars().take(200).collect::(), "SSE chunk 解析失败" ); } @@ -393,7 +408,25 @@ impl TokenStatsManager { let start = message_start.context("Missing message_start in SSE stream")?; - Ok(ResponseTokenInfo::from_sse_data(start, message_delta)) + // 添加日志查看最终使用的值 + tracing::info!( + has_delta = message_delta.is_some(), + start_output = start.output_tokens, + delta_output = message_delta.as_ref().map(|d| d.output_tokens), + "合并前: start vs delta" + ); + + let result = ResponseTokenInfo::from_sse_data(start, message_delta); + + tracing::info!( + final_input = result.input_tokens, + final_output = result.output_tokens, + final_cache_creation = result.cache_creation_tokens, + final_cache_read = result.cache_read_tokens, + "合并后: 最终 Token 信息" + ); + + Ok(result) } /// 查询会话实时统计 From 3d2bb9327eaa9f6275f7ad8c36a5e619283f2a93 Mon Sep 17 00:00:00 2001 From: JSRCode <139555610+jsrcode@users.noreply.github.com> Date: Fri, 16 Jan 2026 16:09:23 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix(token-stats):=20=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E6=B5=81=E5=AE=8C=E6=88=90=E4=BF=A1=E5=8F=B7=E6=9B=BF=E4=BB=A3?= =?UTF-8?q?=E8=BD=AE=E8=AF=A2=E6=A3=80=E6=B5=8B=E7=A1=AE=E4=BF=9D=E6=8D=95?= =?UTF-8?q?=E8=8E=B7=E5=AE=8C=E6=95=B4=20token=20=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题: - 之前使用固定延迟(2秒) + 轮询稳定性检测流结束,导致过早结束统计 - message_delta 事件未被捕获(has_delta=false),output_tokens 使用初始值(1-5)而非最终值(数百) - response_time_ms 始终为 0 修复: - 使用 tokio::sync::oneshot 通道 + futures_util::stream::chain() 实现真正的流完成检测 - 在流的最后一个元素之后插入完成信号,统计任务通过 await 真正等待流结束 - 删除所有轮询和固定延迟逻辑(净减少 151 行代码) 影响: - output_tokens 现在正确使用 message_delta 的最终值 - response_time_ms 准确记录从请求开始到流完全消费的总时间 - 符合"不使用延迟/预测"的架构要求,使用事件驱动机制 --- .../src/services/proxy/proxy_instance.rs | 90 ++++++++++++++----- .../src/services/token_stats/extractor.rs | 15 ++++ src-tauri/src/services/token_stats/manager.rs | 27 ++++++ 3 files changed, 109 insertions(+), 23 deletions(-) diff --git a/src-tauri/src/services/proxy/proxy_instance.rs b/src-tauri/src/services/proxy/proxy_instance.rs index f7be6d7..b3f3cad 100644 --- a/src-tauri/src/services/proxy/proxy_instance.rs +++ b/src-tauri/src/services/proxy/proxy_instance.rs @@ -482,6 +482,9 @@ async fn handle_request_inner( let sse_chunks = Arc::new(Mutex::new(Vec::new())); let sse_chunks_clone = Arc::clone(&sse_chunks); + // 创建一个通道,在流完全消费后触发统计 + let (stream_end_tx, stream_end_rx) = tokio::sync::oneshot::channel::<()>(); + let stream = upstream_res.bytes_stream(); // amp-code 需要移除工具名前缀 @@ -489,40 +492,76 @@ async fn handle_request_inner( let prefix_regex = Regex::new(r#""name"\s*:\s*"mcp_([^"]+)""#).ok(); // 拦截流数据并收集 - let mapped_stream = stream.map(move |result| { - if let Ok(chunk) = &result { - if let Ok(mut chunks) = sse_chunks_clone.lock() { - chunks.push(chunk.clone()); + let mapped_stream = stream + .map(move |result| { + match &result { + Ok(chunk) => { + if let Ok(mut chunks) = sse_chunks_clone.lock() { + chunks.push(chunk.clone()); + } + } + Err(_) => { + // 流错误 - 注意: stream_completed_clone 已被移除,不再需要通知 + } } - } - result - .map(|bytes| { - if is_amp_code { - if let Some(ref re) = prefix_regex { - let text = String::from_utf8_lossy(&bytes); - let cleaned = re.replace_all(&text, r#""name": "$1""#); - Frame::data(Bytes::from(cleaned.into_owned())) + + result + .map(|bytes| { + if is_amp_code { + if let Some(ref re) = prefix_regex { + let text = String::from_utf8_lossy(&bytes); + let cleaned = re.replace_all(&text, r#""name": "$1""#); + Frame::data(Bytes::from(cleaned.into_owned())) + } else { + Frame::data(bytes) + } } else { Frame::data(bytes) } - } else { - Frame::data(bytes) - } - }) - .map_err(|e| Box::new(e) as Box) - }); + }) + .map_err(|e| Box::new(e) as Box) + }) + // 在流的最后一个元素之后插入完成信号 + .chain(futures_util::stream::once(async move { + // 发送流完成信号 + let _ = stream_end_tx.send(()); + tracing::debug!("SSE 流已完全消费完毕,发送完成信号"); + // 返回一个永远不会被使用的 Err,会被下游过滤掉 + Err(Box::new(std::io::Error::other("__stream_end_marker__")) + as Box) + })) + // 过滤掉结束标记 + .filter(|item| { + let is_end_marker = item + .as_ref() + .err() + .and_then(|e| e.downcast_ref::()) + .map(|e| e.to_string().contains("__stream_end_marker__")) + .unwrap_or(false); + futures_util::future::ready(!is_end_marker) + }); - // 在流结束后异步记录日志 + // 在流真正结束后异步记录日志 let processor_clone = Arc::clone(&processor); let client_ip_clone = client_ip.clone(); let request_body_clone = processed.body.clone(); let response_status = status.as_u16(); - let start_time_clone = start_time; // 捕获 start_time 用于计算响应时间 + let start_time_clone = start_time; let proxy_pricing_template_id_clone = proxy_pricing_template_id.clone(); tokio::spawn(async move { - // 等待流结束(延迟确保所有 chunks 已收集) - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + // 等待流完全消费的信号(无超时,真正等待流结束) + match stream_end_rx.await { + Ok(_) => { + tracing::info!("✓ 收到 SSE 流完成信号,流已完全消费"); + } + Err(_) => { + tracing::warn!("✗ 未收到 SSE 流完成信号(sender 被 drop),可能流被提前取消"); + } + } + + // 小延迟确保最后的 chunk 写入完成(异步锁竞争) + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; let chunks = match sse_chunks.lock() { Ok(guard) => guard.clone(), @@ -532,13 +571,18 @@ async fn handle_request_inner( } }; + tracing::info!( + chunks_count = chunks.len(), + "开始处理 SSE chunks 进行 token 统计" + ); + // 将所有 chunk 合并为完整响应体 let mut full_data = Vec::new(); for chunk in &chunks { full_data.extend_from_slice(chunk); } - // 计算响应时间 + // 计算响应时间(从请求开始到流完全消费的总时间) let response_time_ms = start_time_clone.elapsed().as_millis() as i64; // 调用工具特定的日志记录 diff --git a/src-tauri/src/services/token_stats/extractor.rs b/src-tauri/src/services/token_stats/extractor.rs index 322a4f6..8531019 100644 --- a/src-tauri/src/services/token_stats/extractor.rs +++ b/src-tauri/src/services/token_stats/extractor.rs @@ -126,6 +126,8 @@ impl TokenExtractor for ClaudeTokenExtractor { let event_type = json.get("type").and_then(|v| v.as_str()).unwrap_or(""); + tracing::debug!(event_type = event_type, "解析 SSE 事件"); + let mut result = SseTokenData::default(); match event_type { @@ -193,9 +195,13 @@ impl TokenExtractor for ClaudeTokenExtractor { } } "message_delta" => { + tracing::info!("检测到 message_delta 事件"); + // message_delta 事件包含最终的usage统计 // 条件:必须有 usage 字段(无论是否有 stop_reason) if let Some(usage) = json.get("usage") { + tracing::info!("message_delta 包含 usage 字段"); + // 提取缓存创建 token:优先读取扁平字段,回退到嵌套对象 let cache_creation = usage .get("cache_creation_input_tokens") @@ -226,11 +232,20 @@ impl TokenExtractor for ClaudeTokenExtractor { .and_then(|v| v.as_i64()) .unwrap_or(0); + tracing::info!( + output_tokens = output_tokens, + cache_creation = cache_creation, + cache_read = cache_read, + "message_delta 提取成功" + ); + result.message_delta = Some(MessageDeltaData { cache_creation_tokens: cache_creation, cache_read_tokens: cache_read, output_tokens, }); + } else { + tracing::warn!("message_delta 事件缺少 usage 字段"); } } _ => {} diff --git a/src-tauri/src/services/token_stats/manager.rs b/src-tauri/src/services/token_stats/manager.rs index 18edfa5..f918a9e 100644 --- a/src-tauri/src/services/token_stats/manager.rs +++ b/src-tauri/src/services/token_stats/manager.rs @@ -361,6 +361,26 @@ impl TokenStatsManager { tracing::info!(chunks_count = chunks.len(), "开始解析 SSE chunks"); for (i, chunk) in chunks.iter().enumerate() { + // 记录每个chunk的类型(前100个字符) + let chunk_preview = chunk.chars().take(100).collect::(); + + // 尝试提取事件类型用于日志 + let event_type = if let Ok(json) = serde_json::from_str::(chunk) { + json.get("type") + .and_then(|v| v.as_str()) + .unwrap_or("unknown") + .to_string() + } else { + "parse_error".to_string() + }; + + tracing::debug!( + chunk_index = i, + event_type = %event_type, + preview = %chunk_preview, + "处理 SSE chunk" + ); + match extractor.extract_from_sse_chunk(chunk) { Ok(Some(data)) => { if let Some(start) = data.message_start { @@ -406,6 +426,13 @@ impl TokenStatsManager { ); } + if message_delta.is_none() { + tracing::warn!( + chunks_count = chunks.len(), + "所有 SSE chunks 中未找到 message_delta 事件(可能流未完成或被截断)" + ); + } + let start = message_start.context("Missing message_start in SSE stream")?; // 添加日志查看最终使用的值