diff --git a/application/core/services/novel_service.py b/application/core/services/novel_service.py index 13c499164..788b76394 100644 --- a/application/core/services/novel_service.py +++ b/application/core/services/novel_service.py @@ -1,4 +1,5 @@ """Novel 应用服务""" +import time as _time import json from datetime import datetime, timezone from typing import List, Optional, Dict, Any @@ -143,6 +144,7 @@ def create_novel( ) self.novel_repository.save(novel) + self._invalidate_novels_cache() return NovelDTO.from_domain(novel) @@ -186,19 +188,56 @@ def _check_has_outline(self, novel_id: str) -> bool: except Exception: return False + # ── 小说列表 TTL 缓存(模块级,跨请求共享)── + _NOVELS_LIST_CACHE_TTL = 10.0 # 秒 + + _novels_list_cache: Dict[str, Dict[str, Any]] = {} + + @staticmethod + def _invalidate_novels_cache() -> None: + """创建/更新/删除小说时清除列表缓存。""" + NovelService._novels_list_cache.pop("_all_novels_dtos", None) + def list_novels(self) -> List[NovelDTO]: - """列出所有小说 + """列出所有小说(批量查询 + TTL 缓存)。 + + 优化:使用 LEFT JOIN 一次性加载 novels + chapters + bible + outline, + 查询数从 1+3N 降至 3 次。模块级 TTL 缓存避免重复查询。 Returns: NovelDTO 列表 """ - novels = self.novel_repository.list_all() + cache_key = "_all_novels_dtos" + cached = self._novels_list_cache.get(cache_key) + if cached is not None and _time.time() - cached["ts"] < self._NOVELS_LIST_CACHE_TTL: + return cached["data"] + + # 尝试批量优化查询 + try: + from infrastructure.persistence.database.query_optimizations import ( + list_all_novels_optimized, + ) + db = getattr(self.novel_repository, "db", None) + if db is not None: + novels = list_all_novels_optimized(db) + else: + novels = self.novel_repository.list_all() + for novel in novels: + self._hydrate_chapters(novel) + except Exception: + novels = self.novel_repository.list_all() + for novel in novels: + self._hydrate_chapters(novel) + dtos = [] for novel in novels: - dto = NovelDTO.from_domain(self._hydrate_chapters(novel)) - dto.has_bible = self._check_has_bible(novel.novel_id.value) - dto.has_outline = self._check_has_outline(novel.novel_id.value) + dto = NovelDTO.from_domain(novel) + # 批量查询已预计算 has_bible / has_outline + dto.has_bible = getattr(novel, "_has_bible", self._check_has_bible(novel.novel_id.value)) + dto.has_outline = getattr(novel, "_has_outline", self._check_has_outline(novel.novel_id.value)) dtos.append(dto) + + self._novels_list_cache[cache_key] = {"data": dtos, "ts": _time.time()} return dtos def delete_novel(self, novel_id: str) -> None: @@ -208,6 +247,7 @@ def delete_novel(self, novel_id: str) -> None: novel_id: 小说 ID """ self.novel_repository.delete(NovelId(novel_id)) + self._invalidate_novels_cache() def add_chapter( self, @@ -260,6 +300,7 @@ def add_chapter( if not any(getattr(c, "number", None) == chapter.number for c in novel.chapters): novel.chapters.append(chapter) self.novel_repository.save(novel) + self._invalidate_novels_cache() # 同步创建 StoryNode 章节节点,并关联到当前活跃的幕 if self.story_node_repository: @@ -358,6 +399,8 @@ def update_novel( novel.generation_prefs, generation_prefs ) + self.novel_repository.save(novel) + self._invalidate_novels_cache() # 增量 patch:避免全量 save 把 autopilot_status 等未改字段写回 stopped patch_fields: Dict[str, Any] = {} if title is not None: @@ -398,6 +441,7 @@ def update_novel_stage(self, novel_id: str, stage: str) -> NovelDTO: novel.stage = NovelStage(stage) self.novel_repository.save(novel) + self._invalidate_novels_cache() return NovelDTO.from_domain(self._hydrate_chapters(novel)) @@ -420,6 +464,7 @@ def update_auto_approve_mode(self, novel_id: str, auto_approve_mode: bool) -> No novel.auto_approve_mode = auto_approve_mode self.novel_repository.save(novel) + self._invalidate_novels_cache() return NovelDTO.from_domain(self._hydrate_chapters(novel)) diff --git a/application/engine/services/beat_middleware.py b/application/engine/services/beat_middleware.py index 65ca1a5ea..9f801e1bd 100644 --- a/application/engine/services/beat_middleware.py +++ b/application/engine/services/beat_middleware.py @@ -33,7 +33,7 @@ import logging from dataclasses import dataclass, field -from typing import List, Optional, Protocol, Tuple +from typing import Any, List, Optional, Protocol, Tuple from application.engine.services.context_builder import Beat diff --git a/application/engine/services/shared_state_repository.py b/application/engine/services/shared_state_repository.py index fb904fb80..32889cf25 100644 --- a/application/engine/services/shared_state_repository.py +++ b/application/engine/services/shared_state_repository.py @@ -10,6 +10,8 @@ - 写入先更新内存,再异步持久化 - 数据结构轻量化,避免内存过大 """ +from __future__ import annotations + import logging import time from dataclasses import dataclass, field diff --git a/application/engine/services/state_publisher.py b/application/engine/services/state_publisher.py index fcce4371c..e27ba253e 100644 --- a/application/engine/services/state_publisher.py +++ b/application/engine/services/state_publisher.py @@ -10,6 +10,8 @@ - 用户立即可见更新 - DB 作为持久化快照 """ +from __future__ import annotations + import logging import time from typing import Any, Dict, List, Optional diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 7db62e42a..a72023255 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -1,12 +1,12 @@ { "name": "plotpilot", - "version": "1.0.2", + "version": "1.1.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "plotpilot", - "version": "1.0.2", + "version": "1.1.0", "dependencies": { "@types/dompurify": "^3.0.5", "@vicons/ionicons5": "^0.13.0", diff --git a/frontend/src/components/autopilot/AutopilotDAGView.vue b/frontend/src/components/autopilot/AutopilotDAGView.vue index 2f1671a37..2080a641d 100644 --- a/frontend/src/components/autopilot/AutopilotDAGView.vue +++ b/frontend/src/components/autopilot/AutopilotDAGView.vue @@ -108,8 +108,53 @@ const gapSummary = computed(() => dagStore.registryGaps.map(g => `${g.node_id} (${g.node_type})`).join('、'), ) -/** 周期性拉权威 /status ,避免仅用 DAG Run SSE 把「人工审阅」误标成「运行中」 */ -let autopilotStatusPollTimer: ReturnType | null = null +/** 周期性拉权威 /status ,避免仅用 DAG Run SSE 把「人工审阅」误标成「运行中」。 + * 非 running 时自动降速:paused → 15s,stopped/completed/error/idle → 60s。 + * 页面切后台时暂停,切回时立即拉取一次再恢复。 */ +let autopilotStatusPollTimer: ReturnType | null = null +/** 组件卸载后禁止再创建新 timer,防止僵尸轮询累积 */ +let dagViewUnmounted = false +let autopilotFetchInFlight = false +let autopilotFetchAbort: AbortController | null = null +const AUTOPILOT_FETCH_TIMEOUT_MS = 10000 + +function autopilotPollInterval(): number { + switch (autopilotStatus.value) { + case 'running': return 7000 + case 'paused': return 15000 + default: return 60000 + } +} + +function clearAutopilotPoll() { + if (autopilotStatusPollTimer != null) { + clearTimeout(autopilotStatusPollTimer) + autopilotStatusPollTimer = null + } + if (autopilotFetchAbort) { + autopilotFetchAbort.abort() + autopilotFetchAbort = null + } + autopilotFetchInFlight = false +} + +function scheduleAutopilotPoll() { + if (dagViewUnmounted) return + clearAutopilotPoll() + if (document.hidden) return + autopilotStatusPollTimer = window.setTimeout(() => { + void fetchAutopilotStatus() + }, autopilotPollInterval()) +} + +function handleDAGVisibilityChange() { + if (dagViewUnmounted) return + if (document.hidden) { + clearAutopilotPoll() + } else { + void fetchAutopilotStatus() + } +} async function retryHydrate() { await dagStore.hydrateDagForNovel(props.novelId) @@ -121,22 +166,20 @@ onMounted(async () => { await dagStore.hydrateDagForNovel(props.novelId) await runStore.fetchStatus(props.novelId) await fetchAutopilotStatus() - autopilotStatusPollTimer = window.setInterval(() => { - void fetchAutopilotStatus() - }, 7000) + document.addEventListener('visibilitychange', handleDAGVisibilityChange) }) onUnmounted(() => { - if (autopilotStatusPollTimer != null) { - clearInterval(autopilotStatusPollTimer) - autopilotStatusPollTimer = null - } + dagViewUnmounted = true + clearAutopilotPoll() + document.removeEventListener('visibilitychange', handleDAGVisibilityChange) }) // ★ 监听托管模式 SSE 日志:以 /status 为准合并「人工审阅」态 watch( () => runStore.runStatus, () => { + if (dagViewUnmounted) return void fetchAutopilotStatus() }, ) @@ -186,6 +229,17 @@ function handleSwitchToCard() { // ─── 获取托管模式状态 ─── async function fetchAutopilotStatus() { + if (dagViewUnmounted) return + if (autopilotFetchInFlight) return + autopilotFetchInFlight = true + + if (autopilotFetchAbort) { + autopilotFetchAbort.abort() + } + const ac = new AbortController() + autopilotFetchAbort = ac + const timeoutId = window.setTimeout(() => ac.abort(), AUTOPILOT_FETCH_TIMEOUT_MS) + try { const { apiClient } = await import('@/api/config') const result = await apiClient.get(`/autopilot/${props.novelId}/status`) as Record @@ -197,23 +251,21 @@ async function fetchAutopilotStatus() { if (ap === 'completed') { autopilotStatus.value = 'completed' - return - } - if (ap === 'error') { + } else if (ap === 'error') { autopilotStatus.value = 'error' - return - } - if (ap === 'running' && humanGate) { + } else if (ap === 'running' && humanGate) { autopilotStatus.value = 'paused' - return - } - if (ap === 'running') { + } else if (ap === 'running') { autopilotStatus.value = 'running' - return + } else { + autopilotStatus.value = 'idle' } - autopilotStatus.value = 'idle' } catch { autopilotStatus.value = 'idle' + } finally { + window.clearTimeout(timeoutId) + autopilotFetchInFlight = false + scheduleAutopilotPoll() } } diff --git a/frontend/src/components/autopilot/AutopilotPanel.vue b/frontend/src/components/autopilot/AutopilotPanel.vue index 58716ecd4..e46e43b98 100644 --- a/frontend/src/components/autopilot/AutopilotPanel.vue +++ b/frontend/src/components/autopilot/AutopilotPanel.vue @@ -367,6 +367,8 @@ let statusLastAbort = null /** 连续无法拉取 /status(网络拒绝/超时)时倍增轮询间隔 */ const statusConnectivityFailures = ref(0) let lastStatusPollIntervalMs = -1 +/** 组件卸载后禁止 maybeRestartStatusPollTimer 再创建 setInterval,防止僵尸轮询累积 */ +let panelUnmounted = false // 计算属性 const isRunning = computed(() => status.value?.autopilot_status === 'running') @@ -680,6 +682,7 @@ function clearStatusPoll() { /** 轮询间隔变化时(如后端断连退避)重置 timer,避免固定 3~5s 刷满 Vite 代理日志 */ function maybeRestartStatusPollTimer() { + if (panelUnmounted) return if (statusPollDisabled.value) return const ms = getAdaptivePollInterval() if (statusPollTimer != null && ms === lastStatusPollIntervalMs) { @@ -876,14 +879,28 @@ function stopChapterStream() { // 策略: // - SSE 已连接时:轮询降到 15s 兜底(SSE 已实时驱动刷新,轮询仅防断连漏检) // - SSE 未连接但运行中:5s(需要轮询补偿 SSE 的缺失) -// - 非运行中:3s(用户可能刚操作,需要快速看到状态变化) // - 审阅等待中:10s(用户在看大纲,不需要高频刷新) +// - stopped / completed:30s(无需高频,启动时 start() 会立即拉取恢复) +// - error:15s +// - 其他非运行中:3s(用户可能刚操作,需要快速看到状态变化) function getAdaptivePollInterval() { let base - if (needsReview.value) base = 10000 - else if (!isRunning.value) base = 3000 - else if (sseConnected.value) base = 15000 - else base = 5000 + if (needsReview.value) { + base = 10000 + } else if (!isRunning.value) { + const ap = status.value?.autopilot_status + if (ap === 'stopped' || ap === 'completed') { + base = 30000 + } else if (ap === 'error') { + base = 15000 + } else { + base = 3000 + } + } else if (sseConnected.value) { + base = 15000 + } else { + base = 5000 + } const mult = Math.min(2 ** Math.min(statusConnectivityFailures.value, 8), 128) return Math.min(base * mult, 120_000) } @@ -1183,6 +1200,7 @@ async function forceStopFromError() { onMounted(() => { fetchStatus() }) onUnmounted(() => { + panelUnmounted = true statusFetchSeq += 1 statusFetchInFlight = false // 🔥 重置请求去重标志 if (statusLastAbort) { diff --git a/frontend/src/components/autopilot/NodeDetailPanel.vue b/frontend/src/components/autopilot/NodeDetailPanel.vue index de4b01c8b..1fd73060e 100644 --- a/frontend/src/components/autopilot/NodeDetailPanel.vue +++ b/frontend/src/components/autopilot/NodeDetailPanel.vue @@ -178,6 +178,11 @@ const promptLoading = ref(false) const writingStatus = ref | null>(null) const writingPollError = ref('') let writingPollTimer: ReturnType | null = null +let writingFetchInFlight = false +let writingFetchAbort: AbortController | null = null +let writingFetchSeq = 0 +const WRITING_POLL_INTERVAL_MS = 2500 +const WRITING_FETCH_TIMEOUT_MS = 8000 const WRITING_TELEMETRY_TYPES = new Set(['exec_writer', 'exec_beat']) @@ -232,9 +237,25 @@ const lastTruncateLine = computed(() => { async function fetchWritingTelemetry() { if (!props.novelId || !showWritingTelemetry.value) return + if (writingFetchInFlight) return writingPollError.value = '' + writingFetchInFlight = true + writingFetchSeq += 1 + const seq = writingFetchSeq + + if (writingFetchAbort) { + writingFetchAbort.abort() + } + const ac = new AbortController() + writingFetchAbort = ac + const timeoutId = window.setTimeout(() => ac.abort(), WRITING_FETCH_TIMEOUT_MS) + try { - const res = await fetch(resolveHttpUrl(`/api/v1/autopilot/${props.novelId}/status`)) + const res = await fetch( + resolveHttpUrl(`/api/v1/autopilot/${props.novelId}/status`), + { signal: ac.signal }, + ) + if (seq !== writingFetchSeq) return if (res.status === 404) { writingStatus.value = null writingPollError.value = '该书暂无托管状态' @@ -246,7 +267,17 @@ async function fetchWritingTelemetry() { } writingStatus.value = (await res.json()) as Record } catch (e) { - writingPollError.value = e instanceof Error ? e.message : '网络错误' + if (seq !== writingFetchSeq) return + if (e instanceof Error && e.name === 'AbortError') { + console.warn('[NodeDetailPanel] fetchWritingTelemetry 超时') + } else { + writingPollError.value = e instanceof Error ? e.message : '网络错误' + } + } finally { + window.clearTimeout(timeoutId) + if (seq === writingFetchSeq) { + writingFetchInFlight = false + } } } @@ -255,6 +286,11 @@ function clearWritingPoll() { clearInterval(writingPollTimer) writingPollTimer = null } + if (writingFetchAbort) { + writingFetchAbort.abort() + writingFetchAbort = null + } + writingFetchInFlight = false } watch( @@ -263,12 +299,16 @@ watch( clearWritingPoll() writingStatus.value = null writingPollError.value = '' + writingFetchSeq = 0 const telemetry = Boolean(nodeType && WRITING_TELEMETRY_TYPES.has(nodeType)) if (!open || !nid || !telemetry) return void fetchWritingTelemetry() - writingPollTimer = setInterval(() => void fetchWritingTelemetry(), 2500) + writingPollTimer = setInterval( + () => void fetchWritingTelemetry(), + WRITING_POLL_INTERVAL_MS, + ) }, - { immediate: true } + { immediate: true }, ) onUnmounted(() => clearWritingPoll()) diff --git a/frontend/src/components/workbench/WorkArea.vue b/frontend/src/components/workbench/WorkArea.vue index 930f5ce3a..41ec0e047 100644 --- a/frontend/src/components/workbench/WorkArea.vue +++ b/frontend/src/components/workbench/WorkArea.vue @@ -1055,6 +1055,9 @@ function maybeEmitDeskRefresh(status: Record | null | undefined } const handleAutopilotStatusChange = (status: any) => { + if (status?.autopilot_status != null) { + lastAssistedAutopilotStatus = String(status.autopilot_status) + } applyAutopilotStatusPayload(status) } @@ -1156,9 +1159,16 @@ let assistedAutopilotPollTimer: ReturnType | null = null /** 该书在库中不存在(404)时不再轮询 /autopilot/.../status */ let assistedAutopilot404 = false let assistAutopilotPollFailures = 0 +/** 最近一次 /status 返回的 autopilot_status,用于停止状态下延长轮询间隔 */ +let lastAssistedAutopilotStatus = '' +/** 组件卸载后禁止再调度 assisted 轮询,防止僵尸轮询累积 */ +let assistedUnmounted = false function assistedAutopilotPollDelayMs(): number { - const base = 4000 + let base = 4000 + if (lastAssistedAutopilotStatus === 'stopped' || lastAssistedAutopilotStatus === 'completed' || lastAssistedAutopilotStatus === 'error') { + base = 30000 + } const mult = Math.min(2 ** Math.min(assistAutopilotPollFailures, 8), 128) return Math.min(base * mult, 60_000) } @@ -1171,6 +1181,7 @@ function clearAssistedAutopilotPoll() { } function scheduleAssistedAutopilotPoll() { + if (assistedUnmounted) return clearAssistedAutopilotPoll() if (assistedAutopilot404 || workMode.value !== 'assisted' || document.hidden) { return @@ -1187,6 +1198,7 @@ function handleVisibilityChange() { clearAssistedAutopilotPoll() } else if (workMode.value === 'assisted') { assistAutopilotPollFailures = 0 + lastAssistedAutopilotStatus = '' void pollAutopilotStatusWhileAssisted().finally(() => scheduleAssistedAutopilotPoll()) } } @@ -1204,6 +1216,7 @@ async function pollAutopilotStatusWhileAssisted() { if (res.ok) { assistAutopilotPollFailures = 0 const json = await res.json() + lastAssistedAutopilotStatus = String(json.autopilot_status ?? '') applyAutopilotStatusPayload(json as Record) } else { assistAutopilotPollFailures += 1 @@ -1229,6 +1242,7 @@ watch( lastAutopilotReactiveFp.value = '' assistedAutopilot404 = false assistAutopilotPollFailures = 0 + lastAssistedAutopilotStatus = '' assistStreamBeatSession.value = null assistStreamFailedChapter.value = null assistStreamPlanFailedChapter.value = null @@ -1248,6 +1262,7 @@ watch( (mode) => { clearAssistedAutopilotPoll() assistAutopilotPollFailures = 0 + lastAssistedAutopilotStatus = '' if (mode === 'assisted') { void pollAutopilotStatusWhileAssisted().finally(() => scheduleAssistedAutopilotPoll()) } @@ -1260,6 +1275,7 @@ onMounted(() => { }) onUnmounted(() => { + assistedUnmounted = true clearAssistedAutopilotPoll() document.removeEventListener('visibilitychange', handleVisibilityChange) if (deskRefreshDebounce) { diff --git a/frontend/src/composables/useDAGSSE.ts b/frontend/src/composables/useDAGSSE.ts index dc2fd17ee..b563efb82 100644 --- a/frontend/src/composables/useDAGSSE.ts +++ b/frontend/src/composables/useDAGSSE.ts @@ -39,6 +39,8 @@ export function useDAGSSE(novelId: Ref) { const runStore = useDAGRunStore() const isDev = import.meta.env.DEV + let isMounted = false + /** DAG 版本变化时重建 type→id,避免每条日志 O(n) 扫描 nodes */ let typeToIdCacheVersion = -1 let typeToIdCache: Map | null = null @@ -156,9 +158,9 @@ export function useDAGSSE(novelId: Ref) { * 智能重连(指数退避) */ function smartReconnect() { + if (!isMounted) return reconnectAttempts++ - // 指数退避 const delay = Math.min( RECONNECT_BASE_DELAY_MS * Math.pow(2, reconnectAttempts - 1), RECONNECT_MAX_DELAY_MS @@ -169,6 +171,7 @@ export function useDAGSSE(novelId: Ref) { } setTimeout(() => { + if (!isMounted) return if (novelId.value) { runStore.connectSSE(novelId.value) runStore.connectAutopilotLog(novelId.value, handleAutopilotLogEvent) @@ -176,25 +179,17 @@ export function useDAGSSE(novelId: Ref) { }, delay) } - // ─── 注册回调(使用优化的批量处理)─── - - runStore.onNodeStatusChange((event) => { - enqueueEvent(event) - }) - - runStore.onNodeOutput((event) => { - enqueueEvent(event) - }) + // ─── 注册回调(存引用以便卸载时注销)─── - runStore.onEdgeFlow((event) => { - enqueueEvent(event) - }) + const _onStatus = (event: NodeEvent) => enqueueEvent(event) + const _onOutput = (event: NodeEvent) => enqueueEvent(event) + const _onEdge = (event: NodeEvent) => enqueueEvent(event) + const _onComplete = () => { flushQueue(); dagStore.resetNodeStates() } - runStore.onRunComplete(() => { - // 立即刷新队列 - flushQueue() - dagStore.resetNodeStates() - }) + runStore.onNodeStatusChange(_onStatus) + runStore.onNodeOutput(_onOutput) + runStore.onEdgeFlow(_onEdge) + runStore.onRunComplete(_onComplete) // SSE 连接状态监控 watch(() => runStore.sseConnected, (connected) => { @@ -218,6 +213,7 @@ export function useDAGSSE(novelId: Ref) { // ─── 生命周期 ─── onMounted(() => { + isMounted = true if (novelId.value) { runStore.connectSSE(novelId.value) runStore.connectAutopilotLog(novelId.value, handleAutopilotLogEvent) @@ -226,15 +222,20 @@ export function useDAGSSE(novelId: Ref) { }) onUnmounted(() => { - // 清理定时器 + isMounted = false + if (throttleTimer) { clearTimeout(throttleTimer) throttleTimer = null } - // 刷新剩余消息 flushQueue() + runStore.offNodeStatusChange(_onStatus) + runStore.offNodeOutput(_onOutput) + runStore.offEdgeFlow(_onEdge) + runStore.offRunComplete(_onComplete) + runStore.disconnectSSE() runStore.disconnectAutopilotLog() diff --git a/frontend/src/stores/dagRunStore.ts b/frontend/src/stores/dagRunStore.ts index 98e079ca1..5dd34e994 100644 --- a/frontend/src/stores/dagRunStore.ts +++ b/frontend/src/stores/dagRunStore.ts @@ -141,12 +141,10 @@ export const useDAGRunStore = defineStore('dagRun', () => { _eventSource.onerror = () => { sseConnected.value = false - // 自动重连 - scheduleReconnect(novelId) + // 重连由 useDAGSSE composable 的 sseConnected watcher 统一管理 } } catch (e: unknown) { sseError.value = e instanceof Error ? e.message : 'SSE 连接失败' - scheduleReconnect(novelId) } } @@ -188,6 +186,11 @@ export const useDAGRunStore = defineStore('dagRun', () => { function onEdgeFlow(cb: SSECallback) { _edgeFlowCallbacks.push(cb) } function onRunComplete(cb: RunCompleteCallback) { _runCompleteCallbacks.push(cb) } + function offNodeStatusChange(cb: SSECallback) { const i = _nodeStatusCallbacks.indexOf(cb); if (i >= 0) _nodeStatusCallbacks.splice(i, 1) } + function offNodeOutput(cb: SSECallback) { const i = _nodeOutputCallbacks.indexOf(cb); if (i >= 0) _nodeOutputCallbacks.splice(i, 1) } + function offEdgeFlow(cb: SSECallback) { const i = _edgeFlowCallbacks.indexOf(cb); if (i >= 0) _edgeFlowCallbacks.splice(i, 1) } + function offRunComplete(cb: RunCompleteCallback) { const i = _runCompleteCallbacks.indexOf(cb); if (i >= 0) _runCompleteCallbacks.splice(i, 1) } + function handleSSEMessage(event: NodeEvent) { // 通用消息分发 switch (event.type) { @@ -331,6 +334,10 @@ export const useDAGRunStore = defineStore('dagRun', () => { onNodeOutput, onEdgeFlow, onRunComplete, + offNodeStatusChange, + offNodeOutput, + offEdgeFlow, + offRunComplete, // Autopilot log bridge connectAutopilotLog, diff --git a/infrastructure/persistence/database/connection.py b/infrastructure/persistence/database/connection.py index 5339209e0..f8cea96df 100644 --- a/infrastructure/persistence/database/connection.py +++ b/infrastructure/persistence/database/connection.py @@ -463,8 +463,11 @@ class DatabaseConnection: - 应用关闭时 close_all() 清理所有线程连接 """ - # WAL checkpoint 阈值:每 N 次写操作触发一次 PRAGMA wal_checkpoint(TRUNCATE) - _WAL_CHECKPOINT_INTERVAL = 20 + # WAL checkpoint 策略(防止 WAL 文件无限增长): + # - 每 N 次写操作触发 PASSIVE checkpoint(非阻塞,不依赖无 reader) + # - 每 M 次写操作尝试 TRUNCATE(需要无 reader,失败也不阻塞写入) + _WAL_CHECKPOINT_PASSIVE_INTERVAL = 20 # PASSIVE:高频、非阻塞 + _WAL_CHECKPOINT_TRUNCATE_INTERVAL = 200 # TRUNCATE:低频、收缩文件 def __init__(self, db_path: str): self.db_path = db_path @@ -681,18 +684,32 @@ def close_all(self, skip_checkpoint: bool = False) -> None: logger.info("All database connections closed (%d, skip_checkpoint=%s)", len(connections), skip_checkpoint) def _maybe_checkpoint(self) -> None: - """定期 WAL checkpoint:每 _WAL_CHECKPOINT_INTERVAL 次写操作触发 TRUNCATE。""" + """定期 WAL checkpoint(PASSIVE + 低频 TRUNCATE)。 + + PASSIVE 模式不阻塞写入者、不要求无 reader,每 20 次写入触发, + 确保 WAL 数据被持续刷入主 DB。TRUNCATE 模式每 200 次写入 + 尝试收缩 WAL 文件(需要无 reader 时才成功,失败不阻塞)。 + """ with self._write_counter_lock: self._write_counter += 1 - if self._write_counter < self._WAL_CHECKPOINT_INTERVAL: - return - self._write_counter = 0 - try: - conn = self.get_connection() - conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") - logger.debug("WAL checkpoint triggered (interval=%d)", self._WAL_CHECKPOINT_INTERVAL) - except Exception as e: - logger.debug("WAL checkpoint skipped: %s", e) + cnt = self._write_counter + + # PASSIVE:高频、非阻塞 + if cnt % self._WAL_CHECKPOINT_PASSIVE_INTERVAL == 0: + try: + conn = self.get_connection() + conn.execute("PRAGMA wal_checkpoint(PASSIVE)") + except Exception as e: + logger.debug("WAL PASSIVE checkpoint skipped: %s", e) + + # TRUNCATE:低频、收缩 WAL 文件 + if cnt % self._WAL_CHECKPOINT_TRUNCATE_INTERVAL == 0: + try: + conn = self.get_connection() + conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") + logger.debug("WAL TRUNCATE checkpoint succeeded (cnt=%d)", cnt) + except Exception as e: + logger.debug("WAL TRUNCATE checkpoint skipped: %s", e) # 全局数据库实例 diff --git a/infrastructure/persistence/database/query_optimizations.py b/infrastructure/persistence/database/query_optimizations.py index 19d4e4ce1..38bc8fb53 100644 --- a/infrastructure/persistence/database/query_optimizations.py +++ b/infrastructure/persistence/database/query_optimizations.py @@ -247,3 +247,151 @@ def benchmark_query_performance(db_pool, status: str = "running", iterations: in 'new_time_ms': elapsed_new * 1000, 'speedup': speedup, } + + +def list_all_novels_optimized(db) -> List[Novel]: + """批量查询全部小说及其章节、bible 和 outline 状态。 + + 替代原 N+1 模式(1 次 novels + N 次 chapters + N 次 bible + N 次 outline)。 + 优化为 3 次查询: + 1. novels LEFT JOIN chapters(一次性加载所有数据) + 2. 查询有 bible 的 novel_id 集合 + 3. 查询有 outline(ACT 节点)的 novel_id 集合 + + 总查询从 1+3N 降至 3 次。 + """ + # ── 查询 1:小说 + 章节 ── + rows = db.fetch_all( + """SELECT + n.id as novel_id, + n.title, + n.author, + n.target_chapters, + n.premise, + n.autopilot_status, + n.auto_approve_mode, + n.current_stage, + n.current_act, + n.current_chapter_in_act, + n.max_auto_chapters, + n.current_auto_chapters, + n.last_chapter_tension, + n.consecutive_error_count, + n.current_beat_index, + n.beats_completed, + n.last_audit_chapter_number, + n.last_audit_similarity, + n.last_audit_drift_alert, + n.last_audit_narrative_ok, + n.last_audit_at, + n.last_audit_vector_stored, + n.last_audit_foreshadow_stored, + n.last_audit_triples_extracted, + n.last_audit_quality_scores, + n.last_audit_issues, + n.target_words_per_chapter, + n.audit_progress, + n.generation_prefs_json, + c.id as chapter_id, + c.number as chapter_number, + c.title as chapter_title, + c.content as chapter_content, + c.outline as chapter_outline, + c.status as chapter_status, + c.tension_score as chapter_tension_score, + c.plot_tension as chapter_plot_tension, + c.emotional_tension as chapter_emotional_tension, + c.pacing_tension as chapter_pacing_tension + FROM novels n + LEFT JOIN chapters c ON n.id = c.novel_id + ORDER BY n.created_at DESC, c.number ASC""" + ) + + # ── 查询 2:哪些小说有 bible ── + bible_rows = db.fetch_all("SELECT DISTINCT novel_id FROM bibles") + has_bible_set = {r["novel_id"] for r in bible_rows} + + # ── 查询 3:哪些小说有 outline(至少一个 ACT 节点)── + outline_rows = db.fetch_all( + "SELECT DISTINCT novel_id FROM story_nodes WHERE node_type = 'act'" + ) + has_outline_set = {r["novel_id"] for r in outline_rows} + + # ── 组装结果 ── + novels_map: Dict[str, Dict] = {} + + for row in rows: + nid = row["novel_id"] + + if nid not in novels_map: + novels_map[nid] = { + "novel_data": { + "id": nid, + "title": row["title"], + "author": row["author"], + "target_chapters": row["target_chapters"], + "premise": row["premise"], + "autopilot_status": row["autopilot_status"], + "auto_approve_mode": row["auto_approve_mode"], + "current_stage": row["current_stage"], + "current_act": row["current_act"], + "current_chapter_in_act": row["current_chapter_in_act"], + "max_auto_chapters": row["max_auto_chapters"], + "current_auto_chapters": row["current_auto_chapters"], + "last_chapter_tension": row["last_chapter_tension"], + "consecutive_error_count": row["consecutive_error_count"], + "current_beat_index": row["current_beat_index"], + "beats_completed": row["beats_completed"], + "last_audit_chapter_number": row["last_audit_chapter_number"], + "last_audit_similarity": row["last_audit_similarity"], + "last_audit_drift_alert": row["last_audit_drift_alert"], + "last_audit_narrative_ok": row["last_audit_narrative_ok"], + "last_audit_at": row["last_audit_at"], + "last_audit_vector_stored": row["last_audit_vector_stored"], + "last_audit_foreshadow_stored": row["last_audit_foreshadow_stored"], + "last_audit_triples_extracted": row["last_audit_triples_extracted"], + "last_audit_quality_scores": row["last_audit_quality_scores"], + "last_audit_issues": row["last_audit_issues"], + "target_words_per_chapter": row["target_words_per_chapter"], + "audit_progress": row["audit_progress"], + "generation_prefs_json": row.get("generation_prefs_json"), + }, + "chapters": [], + "has_bible": nid in has_bible_set, + "has_outline": nid in has_outline_set, + } + + if row["chapter_id"]: + novels_map[nid]["chapters"].append({ + "id": row["chapter_id"], + "novel_id": nid, + "number": row["chapter_number"], + "title": row["chapter_title"], + "content": row.get("chapter_content", ""), + "outline": row.get("chapter_outline", ""), + "status": row["chapter_status"], + "tension_score": row["chapter_tension_score"], + "plot_tension": row.get("chapter_plot_tension", 50.0), + "emotional_tension": row.get("chapter_emotional_tension", 50.0), + "pacing_tension": row.get("chapter_pacing_tension", 50.0), + }) + + # 构建 Novel 实体 + novels = [] + for nid, data in novels_map.items(): + novel = _build_novel_from_dict(data["novel_data"]) + if data["chapters"]: + novel.chapters = [_build_chapter_from_dict(ch) for ch in data["chapters"]] + # 将 has_bible / has_outline 暂存到私有属性,供 service 层取出 + novel._has_bible = data["has_bible"] + novel._has_outline = data["has_outline"] + novels.append(novel) + + logger.debug( + "list_all 优化查询: %s 本小说, %s 章, %s 有bible, %s 有outline", + len(novels), + sum(len(data["chapters"]) for data in novels_map.values()), + len(has_bible_set), + len(has_outline_set), + ) + return novels diff --git a/interfaces/api/middleware/logging_config.py b/interfaces/api/middleware/logging_config.py index 1e273dea0..92b890244 100644 --- a/interfaces/api/middleware/logging_config.py +++ b/interfaces/api/middleware/logging_config.py @@ -135,7 +135,12 @@ def setup_logging( format_string, datefmt="%Y-%m-%d %H:%M:%S" ) - file_handler = logging.FileHandler(log_file, encoding="utf-8") + from logging.handlers import RotatingFileHandler + file_handler = RotatingFileHandler( + log_file, encoding="utf-8", + maxBytes=10 * 1024 * 1024, # 10MB 自动轮转 + backupCount=3, # 保留最近 3 个备份 + ) file_handler.setLevel(level) file_handler.setFormatter(file_formatter) root_logger.addHandler(file_handler) diff --git a/interfaces/api/stats/services/stats_service.py b/interfaces/api/stats/services/stats_service.py index 25721123e..bf1fd61f2 100644 --- a/interfaces/api/stats/services/stats_service.py +++ b/interfaces/api/stats/services/stats_service.py @@ -1,4 +1,5 @@ """Statistics service layer for business logic.""" +import time as _time from typing import Optional, List, Dict from datetime import datetime import logging @@ -17,6 +18,10 @@ class StatsService: repository layer (data access) and models (data structures). """ + # ── 全局统计 TTL 缓存 ── + _GLOBAL_STATS_CACHE_TTL = 30.0 # 秒 + _global_stats_cache: Optional[Dict] = None # {"data": GlobalStats, "ts": float} + def __init__(self, repository: StatsRepository): """Initialize the service with a repository. @@ -27,20 +32,91 @@ def __init__(self, repository: StatsRepository): logger.info("StatsService initialized") def get_global_stats(self) -> GlobalStats: - """Get global statistics across all books. + """获取全局统计(SQL 聚合 + TTL 缓存)。 - Iterates through all books and aggregates totals: - - Total books count - - Total chapters across all books - - Total word count - - Total character count - - Books categorized by stage + 原实现:N 本小说 × M 章 = N×M 次 content 查询(每次拉全文字段), + 10 本 × 100 章 = 1000+ 次 SQL,极其缓慢。 - Returns: - GlobalStats object with aggregated data + 优化后:用 SQL SUM(LENGTH(content)) 在 DB 内直接聚合, + 1 条查询替代 1000+ 条。TTL 缓存避免高频重复计算。 """ - logger.info("Calculating global statistics") + # 检查 TTL 缓存 + cache = self._global_stats_cache + if cache is not None and _time.time() - cache["ts"] < self._GLOBAL_STATS_CACHE_TTL: + return cache["data"] + + logger.info("Calculating global statistics (optimized SQL aggregation)") + + # ── 1 条 SQL 聚合所有数据 ── + # 从 SqliteStatsRepositoryAdapter 获取原始 db 连接 + db = getattr(self.repository, "db", None) + if db is not None: + stats = self._get_global_stats_via_sql(db) + else: + stats = self._get_global_stats_fallback() + + self._global_stats_cache = {"data": stats, "ts": _time.time()} + logger.info( + "Global stats: %s books, %s chapters, %s chars", + stats.total_books, stats.total_chapters, stats.total_characters, + ) + return stats + def _get_global_stats_via_sql(self, db) -> GlobalStats: + """直接 SQL 聚合(快速路径)。""" + import re as _re + + # 阶段映射(SQL 内用 CASE WHEN 直接算出 public stage) + # 复用与 SqliteStatsRepositoryAdapter._public_stage_from_row 相同的逻辑 + row = db.fetch_one(""" + SELECT + COUNT(DISTINCT n.id) AS total_books, + COUNT(c.id) AS total_chapters, + COALESCE(SUM(LENGTH(c.content)), 0) AS total_characters, + COALESCE(SUM(CASE WHEN c.content IS NOT NULL + AND c.content != '' THEN 1 ELSE 0 END), 0) AS chapters_with_content + FROM novels n + LEFT JOIN chapters c ON n.id = c.novel_id + """) + + total_books = row["total_books"] if row else 0 + total_chapters = row["total_chapters"] if row else 0 + total_characters = row["total_characters"] if row else 0 + chapters_with_content = row["chapters_with_content"] if row else 0 + + # 估算总字数:中文为主时 ≈ 字符数;有英文时需加回英文单词 + # 作为近似,用 总字符数 作为 total_words(对中文写作已是准确值) + total_words = total_characters + + # 按阶段分组 + stage_rows = db.fetch_all(""" + SELECT current_stage, COUNT(*) AS c + FROM novels + GROUP BY current_stage + """) + + stage_map = { + "planning": "planning", "macro_planning": "planning", + "act_planning": "planning", "writing": "writing", + "auditing": "reviewing", "reviewing": "reviewing", + "paused_for_review": "reviewing", "completed": "completed", + } + books_by_stage: Dict[str, int] = {} + for sr in stage_rows: + raw = sr["current_stage"] or "planning" + public = stage_map.get(raw, raw) + books_by_stage[public] = books_by_stage.get(public, 0) + sr["c"] + + return GlobalStats( + total_books=total_books, + total_chapters=total_chapters, + total_words=total_words, + total_characters=total_characters, + books_by_stage=books_by_stage, + ) + + def _get_global_stats_fallback(self) -> GlobalStats: + """降级路径:通过 repository 接口逐 novel 聚合(无 db 直连时使用)。""" book_slugs = self.repository.get_all_book_slugs() total_books = len(book_slugs) total_chapters = 0 @@ -57,28 +133,22 @@ def get_global_stats(self) -> GlobalStats: outline = self.repository.get_book_outline(slug) if outline and "chapters" in outline: total_chapters += len(outline["chapters"]) - - # Calculate words and characters for this book for chapter_info in outline["chapters"]: chapter_id = chapter_info.get("id") if chapter_id: content = self.repository.get_chapter_content(slug, chapter_id) if content: - word_count = self.repository.count_words(content) - total_words += word_count + total_words += self.repository.count_words(content) total_characters += len(content) - stats = GlobalStats( + return GlobalStats( total_books=total_books, total_chapters=total_chapters, total_words=total_words, total_characters=total_characters, - books_by_stage=books_by_stage + books_by_stage=books_by_stage, ) - logger.info(f"Global stats: {total_books} books, {total_chapters} chapters, {total_words} words") - return stats - def get_book_stats(self, slug: str) -> Optional[BookStats]: """Get statistics for a specific book. diff --git a/interfaces/api/v1/workbench/llm_control.py b/interfaces/api/v1/workbench/llm_control.py index e3db1ca48..9a40cbc0d 100644 --- a/interfaces/api/v1/workbench/llm_control.py +++ b/interfaces/api/v1/workbench/llm_control.py @@ -2,6 +2,7 @@ import json import logging +import time import uuid from typing import Any, Dict, List, Optional from urllib.parse import urlparse, urlunparse @@ -309,12 +310,26 @@ async def plaza_init() -> Dict[str, Any]: return result +# ── 提示词统计 TTL 缓存 ── +_prompt_stats_cache: Optional[Dict[str, Any]] = None +_prompt_stats_cache_ts: float = 0.0 +_PROMPT_STATS_CACHE_TTL: float = 30.0 # 秒 + + @router.get('/prompts/stats') async def get_prompt_stats() -> Dict[str, Any]: - """获取提示词库统计信息。""" + """获取提示词库统计信息(带 TTL 缓存)。""" + global _prompt_stats_cache, _prompt_stats_cache_ts + now = time.time() + if _prompt_stats_cache is not None and (now - _prompt_stats_cache_ts) < _PROMPT_STATS_CACHE_TTL: + return _prompt_stats_cache + mgr = get_prompt_manager() mgr.ensure_seeded() - return mgr.get_stats() + stats = mgr.get_stats() + _prompt_stats_cache = stats + _prompt_stats_cache_ts = now + return stats @router.get('/prompts/categories-info') diff --git a/interfaces/main.py b/interfaces/main.py index 0f5b94feb..f9dc7c4f3 100644 --- a/interfaces/main.py +++ b/interfaces/main.py @@ -3,6 +3,7 @@ 提供 RESTful API 接口。 """ # 必须在任何 HuggingFace/Transformers 导入前设置离线模式 +from __future__ import annotations import os os.environ['HF_HUB_OFFLINE'] = '1' os.environ['TRANSFORMERS_OFFLINE'] = '1' @@ -177,6 +178,9 @@ async def startup_event(): with startup_sqlite_writes_bypass_queue(): _stop_all_running_novels() + # 清理上次运行时残留的 WAL(PASSIVE 模式不阻塞,安全执行) + _checkpoint_sqlite_wal_safe() + _bootstrap_persistence_consumer_early() # AOF 崩溃恢复:扫描残留的 .draft 文件,恢复到 DB