From 5b3ca3c9120ad6b23ec7df363aee21f2f05599f0 Mon Sep 17 00:00:00 2001 From: imleon Date: Mon, 23 Mar 2026 01:07:29 +0800 Subject: [PATCH] fix(openclaw-plugin): run afterTurn auto-capture in background queue Move OpenViking auto-capture out of the synchronous afterTurn path so message delivery is not blocked by extraction latency. Keep capture behavior/logging intact and process capture jobs sequentially per session via an in-memory queue to avoid concurrent session contention. Co-Authored-By: Claude Opus 4.6 --- examples/openclaw-plugin/context-engine.ts | 170 +++++++++++++-------- 1 file changed, 103 insertions(+), 67 deletions(-) diff --git a/examples/openclaw-plugin/context-engine.ts b/examples/openclaw-plugin/context-engine.ts index cae34e694..763053657 100644 --- a/examples/openclaw-plugin/context-engine.ts +++ b/examples/openclaw-plugin/context-engine.ts @@ -52,6 +52,7 @@ type ContextEngine = { }) => Promise; afterTurn?: (params: { sessionId: string; + sessionKey?: string; sessionFile: string; messages: AgentMessage[]; prePromptMessageCount: number; @@ -156,6 +157,102 @@ export function createMemoryOpenVikingContextEngine(params: { return client; }; + type AfterTurnCaptureParams = { + sessionId: string; + sessionKey?: string; + messages: AgentMessage[]; + prePromptMessageCount?: number; + }; + + const captureQueueBySession = new Map>(); + + const runAutoCapture = async (afterTurnParams: AfterTurnCaptureParams): Promise => { + try { + await switchClientAgent(afterTurnParams.sessionId, "afterTurn"); + + const messages = afterTurnParams.messages ?? []; + if (messages.length === 0) { + logger.info("openviking: auto-capture skipped (messages=0)"); + return; + } + + const start = + typeof afterTurnParams.prePromptMessageCount === "number" && + afterTurnParams.prePromptMessageCount >= 0 + ? afterTurnParams.prePromptMessageCount + : 0; + + const { texts: newTexts, newCount } = extractNewTurnTexts(messages, start); + + if (newTexts.length === 0) { + logger.info("openviking: auto-capture skipped (no new user/assistant messages)"); + return; + } + + const turnText = newTexts.join("\n"); + const decision = getCaptureDecision(turnText, cfg.captureMode, cfg.captureMaxLength); + const preview = turnText.length > 80 ? `${turnText.slice(0, 80)}...` : turnText; + logger.info( + "openviking: capture-check " + + `shouldCapture=${String(decision.shouldCapture)} ` + + `reason=${decision.reason} newMsgCount=${newCount} text=\"${preview}\"`, + ); + + if (!decision.shouldCapture) { + logger.info("openviking: auto-capture skipped (capture decision rejected)"); + return; + } + + const client = await getClient(); + const sessionId = await client.createSession(); + try { + await client.addSessionMessage(sessionId, "user", decision.normalizedText); + await client.getSession(sessionId).catch(() => ({})); + const extracted = await client.extractSessionMemories(sessionId); + + logger.info( + `openviking: auto-captured ${newCount} new messages, extracted ${extracted.length} memories`, + ); + logger.info( + `openviking: capture-detail ${toJsonLog({ + capturedCount: newCount, + captured: [trimForLog(turnText, 260)], + extractedCount: extracted.length, + extracted: summarizeExtractedMemories(extracted), + })}`, + ); + if (extracted.length === 0) { + warnOrInfo( + logger, + "openviking: auto-capture completed but extract returned 0 memories. " + + "Check OpenViking server logs for embedding/extract errors.", + ); + } + } finally { + await client.deleteSession(sessionId).catch(() => {}); + } + } catch (err) { + warnOrInfo(logger, `openviking: auto-capture failed: ${String(err)}`); + } + }; + + const enqueueAutoCapture = (afterTurnParams: AfterTurnCaptureParams): void => { + const queueKey = afterTurnParams.sessionKey || afterTurnParams.sessionId; + const previous = captureQueueBySession.get(queueKey) ?? Promise.resolve(); + const next = previous + .catch(() => {}) + .then(() => runAutoCapture(afterTurnParams)) + .catch((err) => { + warnOrInfo(logger, `openviking: queued auto-capture failed: ${String(err)}`); + }) + .finally(() => { + if (captureQueueBySession.get(queueKey) === next) { + captureQueueBySession.delete(queueKey); + } + }); + captureQueueBySession.set(queueKey, next); + }; + return { info: { id, @@ -185,73 +282,12 @@ export function createMemoryOpenVikingContextEngine(params: { return; } - try { - await switchClientAgent(afterTurnParams.sessionId, "afterTurn"); - - const messages = afterTurnParams.messages ?? []; - if (messages.length === 0) { - logger.info("openviking: auto-capture skipped (messages=0)"); - return; - } - - const start = - typeof afterTurnParams.prePromptMessageCount === "number" && - afterTurnParams.prePromptMessageCount >= 0 - ? afterTurnParams.prePromptMessageCount - : 0; - - const { texts: newTexts, newCount } = extractNewTurnTexts(messages, start); - - if (newTexts.length === 0) { - logger.info("openviking: auto-capture skipped (no new user/assistant messages)"); - return; - } - - const turnText = newTexts.join("\n"); - const decision = getCaptureDecision(turnText, cfg.captureMode, cfg.captureMaxLength); - const preview = turnText.length > 80 ? `${turnText.slice(0, 80)}...` : turnText; - logger.info( - "openviking: capture-check " + - `shouldCapture=${String(decision.shouldCapture)} ` + - `reason=${decision.reason} newMsgCount=${newCount} text=\"${preview}\"`, - ); - - if (!decision.shouldCapture) { - logger.info("openviking: auto-capture skipped (capture decision rejected)"); - return; - } - - const client = await getClient(); - const sessionId = await client.createSession(); - try { - await client.addSessionMessage(sessionId, "user", decision.normalizedText); - await client.getSession(sessionId).catch(() => ({})); - const extracted = await client.extractSessionMemories(sessionId); - - logger.info( - `openviking: auto-captured ${newCount} new messages, extracted ${extracted.length} memories`, - ); - logger.info( - `openviking: capture-detail ${toJsonLog({ - capturedCount: newCount, - captured: [trimForLog(turnText, 260)], - extractedCount: extracted.length, - extracted: summarizeExtractedMemories(extracted), - })}`, - ); - if (extracted.length === 0) { - warnOrInfo( - logger, - "openviking: auto-capture completed but extract returned 0 memories. " + - "Check OpenViking server logs for embedding/extract errors.", - ); - } - } finally { - await client.deleteSession(sessionId).catch(() => {}); - } - } catch (err) { - warnOrInfo(logger, `openviking: auto-capture failed: ${String(err)}`); - } + enqueueAutoCapture({ + sessionId: afterTurnParams.sessionId, + sessionKey: afterTurnParams.sessionKey, + messages: afterTurnParams.messages ?? [], + prePromptMessageCount: afterTurnParams.prePromptMessageCount, + }); }, async compact(compactParams): Promise {