From 91dabd922b9257d50ddfefa08907be325d409637 Mon Sep 17 00:00:00 2001 From: Rockford Lhotka Date: Tue, 5 May 2026 16:26:07 -0500 Subject: [PATCH 1/2] Honor preemption token in DreamService LLM calls (#333) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consolidate 15 near-identical LLM call sites in DreamService into a single InvokeDreamPassAsync helper that builds messages, calls the Balanced-tier LLM with the slot's cancellation token, extracts JSON, logs warnings on parse failure, and deserializes to the target DTO. Previously 14 of 15 sites omitted the slot.Token, so when a user message preempted an in-flight dream cycle the cooperative cancel never reached the LLM call — observed user-perceived latency of 3 min 36 s in production. Threading the token through one helper plugs all sites and prevents future passes from reintroducing the leak. Net -179 lines; behavior preserved. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/RockBot.Host/DreamService.cs | 491 ++++++++++--------------------- 1 file changed, 156 insertions(+), 335 deletions(-) diff --git a/src/RockBot.Host/DreamService.cs b/src/RockBot.Host/DreamService.cs index a21ee02..eb24b6e 100644 --- a/src/RockBot.Host/DreamService.cs +++ b/src/RockBot.Host/DreamService.cs @@ -432,31 +432,12 @@ private async Task DreamAsync() userMessage.AppendLine($" {e.Content}"); } - var messages = new List - { - new(ChatRole.System, _dreamDirective!), - new(ChatRole.User, userMessage.ToString()) - }; - - ct.ThrowIfCancellationRequested(); - var response = await _llmClient.GetResponseAsync(messages, ModelTier.Balanced, new ChatOptions { ResponseFormat = ChatResponseFormat.Json }, ct); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: LLM returned no parseable JSON object; skipping cycle"); - return; - } - - _logger.LogDebug("DreamService: LLM response JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "memory dream"); - if (result is null) - { - _logger.LogWarning("DreamService: failed to deserialize dream result; skipping cycle"); - return; - } + var result = await InvokeDreamPassAsync( + "memory dream", + _dreamDirective!, + userMessage.ToString(), + ct); + if (result is null) return; var deleted = 0; var saved = 0; @@ -531,32 +512,32 @@ private async Task DreamAsync() } if (_skillStore is not null) - { ct.ThrowIfCancellationRequested(); await RunSkillGapDetectionPassAsync(); } + { ct.ThrowIfCancellationRequested(); await RunSkillGapDetectionPassAsync(ct); } if (_skillStore is not null) - { ct.ThrowIfCancellationRequested(); await ConsolidateSkillsAsync(); } + { ct.ThrowIfCancellationRequested(); await ConsolidateSkillsAsync(ct); } - ct.ThrowIfCancellationRequested(); await RunEpisodeExtractionPassAsync(); + ct.ThrowIfCancellationRequested(); await RunEpisodeExtractionPassAsync(ct); - ct.ThrowIfCancellationRequested(); await RunEntityExtractionPassAsync(); + ct.ThrowIfCancellationRequested(); await RunEntityExtractionPassAsync(ct); - ct.ThrowIfCancellationRequested(); await RunGraphConsolidationPassAsync(); + ct.ThrowIfCancellationRequested(); await RunGraphConsolidationPassAsync(ct); - ct.ThrowIfCancellationRequested(); await RunMemoryMiningPassAsync(); + ct.ThrowIfCancellationRequested(); await RunMemoryMiningPassAsync(ct); - ct.ThrowIfCancellationRequested(); await RunPreferenceInferencePassAsync(); + ct.ThrowIfCancellationRequested(); await RunPreferenceInferencePassAsync(ct); - ct.ThrowIfCancellationRequested(); await RunSequenceSkillDetectionPassAsync(); + ct.ThrowIfCancellationRequested(); await RunSequenceSkillDetectionPassAsync(ct); - ct.ThrowIfCancellationRequested(); await RunWispFailureAnalysisPassAsync(); + ct.ThrowIfCancellationRequested(); await RunWispFailureAnalysisPassAsync(ct); - ct.ThrowIfCancellationRequested(); await RunToolSuccessLearningPassAsync(); + ct.ThrowIfCancellationRequested(); await RunToolSuccessLearningPassAsync(ct); - ct.ThrowIfCancellationRequested(); await RunTierRoutingReviewPassAsync(); + ct.ThrowIfCancellationRequested(); await RunTierRoutingReviewPassAsync(ct); - ct.ThrowIfCancellationRequested(); await RunDlqReviewPassAsync(); + ct.ThrowIfCancellationRequested(); await RunDlqReviewPassAsync(ct); - ct.ThrowIfCancellationRequested(); await RunIdentityReflectionPassAsync(); + ct.ThrowIfCancellationRequested(); await RunIdentityReflectionPassAsync(ct); sw.Stop(); _logger.LogInformation( @@ -577,7 +558,7 @@ private async Task DreamAsync() } } - private async Task ConsolidateSkillsAsync() + private async Task ConsolidateSkillsAsync(CancellationToken ct) { var all = await _skillStore!.ListAsync(); @@ -709,30 +690,12 @@ private async Task ConsolidateSkillsAsync() } } - var messages = new List - { - new(ChatRole.System, _skillDreamDirective!), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync(messages, ModelTier.Balanced, new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: skill LLM returned no parseable JSON; skipping skill consolidation"); - return; - } - - _logger.LogDebug("DreamService: skill LLM response JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "skill consolidation"); - if (result is null) - { - _logger.LogWarning("DreamService: failed to deserialize skill dream result; skipping"); - return; - } + var result = await InvokeDreamPassAsync( + "skill consolidation", + _skillDreamDirective!, + userMessage.ToString(), + ct); + if (result is null) return; var deleted = 0; var saved = 0; @@ -827,7 +790,7 @@ private async Task ConsolidateSkillsAsync() skill.Name, skill.SeeAlso is { Count: > 0 } ? string.Join(", ", skill.SeeAlso) : "none"); } - await OptimizeSkillsAsync(); + await OptimizeSkillsAsync(ct); _logger.LogInformation( "DreamService: skill consolidation complete — {Deleted} deleted, {Saved} saved", @@ -838,7 +801,7 @@ private async Task ConsolidateSkillsAsync() /// Identifies skills associated with poor-quality sessions and asks the LLM to improve them. /// Skipped when the skill usage store or feedback store is unavailable, or no at-risk skills are found. /// - private async Task OptimizeSkillsAsync() + private async Task OptimizeSkillsAsync(CancellationToken ct) { if (_skillUsageStore is null || _feedbackStore is null || _skillOptimizeDirective is null) return; @@ -1006,28 +969,12 @@ private async Task OptimizeSkillsAsync() } } - var messages = new List - { - new(ChatRole.System, _skillOptimizeDirective), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync(messages, ModelTier.Balanced, new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: skill optimize LLM returned no parseable JSON; skipping optimization"); - return; - } - - var result = TryDeserializeJson(json, "skill optimization"); - if (result is null) - { - _logger.LogWarning("DreamService: failed to deserialize skill optimize result; skipping"); - return; - } + var result = await InvokeDreamPassAsync( + "skill optimization", + _skillOptimizeDirective, + userMessage.ToString(), + ct); + if (result is null) return; var deleted = 0; var saved = 0; @@ -1095,7 +1042,7 @@ private async Task OptimizeSkillsAsync() /// Runs before skill consolidation so that the consolidation pass can deduplicate /// any new skills alongside existing ones. /// - private async Task RunSkillGapDetectionPassAsync() + private async Task RunSkillGapDetectionPassAsync(CancellationToken ct) { if (_conversationLog is null || _skillStore is null || !_options.SkillGapEnabled) return; @@ -1245,25 +1192,12 @@ private async Task RunSkillGapDetectionPassAsync() } } - var messages = new List - { - new(ChatRole.System, _skillGapDirective ?? BuiltInSkillGapDirective), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync(messages, ModelTier.Balanced, new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: skill gap LLM returned no parseable JSON; skipping"); - return; - } - - _logger.LogDebug("DreamService: skill gap JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "skill gap"); + var result = await InvokeDreamPassAsync( + "skill gap", + _skillGapDirective ?? BuiltInSkillGapDirective, + userMessage.ToString(), + ct); + if (result is null) return; var saved = 0; foreach (var dto in result?.ToSave ?? []) @@ -1422,7 +1356,7 @@ internal async Task RunImportanceDecayPassAsync(IReadOnlyList entri /// Runs before memory mining so episodes exist before facts are distilled. /// Does NOT clear the log — that is deferred to . /// - private async Task RunEpisodeExtractionPassAsync() + private async Task RunEpisodeExtractionPassAsync(CancellationToken ct) { if (_conversationLog is null || !_options.EpisodeExtractionEnabled) return; @@ -1471,26 +1405,12 @@ private async Task RunEpisodeExtractionPassAsync() userMessage.AppendLine(); } - var messages = new List - { - new(ChatRole.System, _episodeDirective ?? BuiltInEpisodeDirective), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync(messages, ModelTier.Balanced, - new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: episode extraction LLM returned no parseable JSON; skipping"); - return; - } - - _logger.LogDebug("DreamService: episode extraction JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "episode extraction"); + var result = await InvokeDreamPassAsync( + "episode extraction", + _episodeDirective ?? BuiltInEpisodeDirective, + userMessage.ToString(), + ct); + if (result is null) return; var created = 0; var reinforced = 0; @@ -1589,7 +1509,7 @@ private async Task RunEpisodeExtractionPassAsync() /// Extracts entities and relationships from episodic memories and conversation logs, /// populating the knowledge graph triple store for relational reasoning. /// - private async Task RunEntityExtractionPassAsync() + private async Task RunEntityExtractionPassAsync(CancellationToken ct) { if (_knowledgeGraph is null || !_options.EntityExtractionEnabled) return; @@ -1642,26 +1562,12 @@ private async Task RunEntityExtractionPassAsync() userMessage.AppendLine(); } - var messages = new List - { - new(ChatRole.System, _entityExtractionDirective ?? BuiltInEntityExtractionDirective), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync(messages, ModelTier.Balanced, - new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: entity extraction LLM returned no parseable JSON; skipping"); - return; - } - - _logger.LogDebug("DreamService: entity extraction JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "entity extraction"); + var result = await InvokeDreamPassAsync( + "entity extraction", + _entityExtractionDirective ?? BuiltInEntityExtractionDirective, + userMessage.ToString(), + ct); + if (result is null) return; var entitiesCreated = 0; var triplesCreated = 0; @@ -1725,7 +1631,7 @@ private async Task RunEntityExtractionPassAsync() /// LLM to decide what to delete or merge. Runs after entity extraction so newly created /// entities are included in the review. /// - private async Task RunGraphConsolidationPassAsync() + private async Task RunGraphConsolidationPassAsync(CancellationToken ct) { if (_knowledgeGraph is null || !_options.GraphConsolidationEnabled) return; @@ -1774,26 +1680,12 @@ private async Task RunGraphConsolidationPassAsync() $"- [{t.Id}] {t.Subject} --{t.Predicate}--> {t.Object} (confidence={t.Confidence:F2}, created={t.CreatedAt:yyyy-MM-dd}{source})"); } - var messages = new List - { - new(ChatRole.System, _graphConsolidationDirective ?? BuiltInGraphConsolidationDirective), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync(messages, ModelTier.Balanced, - new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: graph consolidation LLM returned no parseable JSON; skipping"); - return; - } - - _logger.LogDebug("DreamService: graph consolidation JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "graph consolidation"); + var result = await InvokeDreamPassAsync( + "graph consolidation", + _graphConsolidationDirective ?? BuiltInGraphConsolidationDirective, + userMessage.ToString(), + ct); + if (result is null) return; var entitiesDeleted = 0; var triplesDeleted = 0; @@ -1829,7 +1721,7 @@ private async Task RunGraphConsolidationPassAsync() /// (which targets behavioral patterns) and skill gap detection (which targets procedures). /// Does NOT clear the log — that is deferred to . /// - private async Task RunMemoryMiningPassAsync() + private async Task RunMemoryMiningPassAsync(CancellationToken ct) { if (_conversationLog is null || !_options.MemoryMiningEnabled) return; @@ -1863,26 +1755,12 @@ private async Task RunMemoryMiningPassAsync() await AppendSubagentWhiteboardEntriesAsync(userMessage); - var messages = new List - { - new(ChatRole.System, _memoryMiningDirective ?? BuiltInMemoryMiningDirective), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync(messages, ModelTier.Balanced, - new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: memory mining LLM returned no parseable JSON; skipping"); - return; - } - - _logger.LogDebug("DreamService: memory mining JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "memory mining"); + var result = await InvokeDreamPassAsync( + "memory mining", + _memoryMiningDirective ?? BuiltInMemoryMiningDirective, + userMessage.ToString(), + ct); + if (result is null) return; var saved = 0; foreach (var dto in result?.ToSave ?? []) @@ -2216,7 +2094,7 @@ private static string Truncate(string s, int max) => /// memory entries tagged "verified" and "tool-success-learned" so future sessions surface /// them via BM25 or vector recall before the agent has to re-discover the same answer. /// - private async Task RunToolSuccessLearningPassAsync() + private async Task RunToolSuccessLearningPassAsync(CancellationToken ct) { if (!_options.ToolSuccessLearningEnabled || _toolCallLog is null) return; @@ -2239,24 +2117,11 @@ private async Task RunToolSuccessLearningPassAsync() try { - var messages = new List - { - new(ChatRole.System, _toolSuccessLearningDirective ?? BuiltInToolSuccessLearningDirective), - new(ChatRole.User, userMessage) - }; - - var response = await _llmClient.GetResponseAsync(messages, ModelTier.Balanced, - new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: tool-success-learning LLM returned no parseable JSON; skipping"); - return; - } - - var result = TryDeserializeJson(json, "tool-success-learning"); + var result = await InvokeDreamPassAsync( + "tool-success-learning", + _toolSuccessLearningDirective ?? BuiltInToolSuccessLearningDirective, + userMessage, + ct); var entries = NormalizeToolSuccessLearningEntries( result, idFactory: () => Guid.NewGuid().ToString("N")[..12], @@ -2313,7 +2178,7 @@ private async Task AppendSubagentWhiteboardEntriesAsync(StringBuilder userMessag /// and saves inferred preferences as tagged memory entries. /// Always clears the log after the pass to prevent unbounded growth. /// - private async Task RunPreferenceInferencePassAsync() + private async Task RunPreferenceInferencePassAsync(CancellationToken ct) { if (_conversationLog is null || !_options.PreferenceInferenceEnabled) return; @@ -2363,28 +2228,16 @@ private async Task RunPreferenceInferencePassAsync() } } - var messages = new List - { - new(ChatRole.System, _prefDreamDirective ?? BuiltInPrefDirective), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync(messages, ModelTier.Balanced, new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: preference inference LLM returned no parseable JSON; skipping"); - } - else + var result = await InvokeDreamPassAsync( + "preference inference", + _prefDreamDirective ?? BuiltInPrefDirective, + userMessage.ToString(), + ct); + if (result is not null) { - _logger.LogDebug("DreamService: pref inference JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "preference inference"); var saved = 0; - foreach (var dto in result?.ToSave ?? []) + foreach (var dto in result.ToSave ?? []) { if (string.IsNullOrWhiteSpace(dto.Content)) continue; @@ -2435,7 +2288,7 @@ private async Task RunPreferenceInferencePassAsync() /// Reviews recent tier-routing decisions and writes an updated tier-selector.json /// when the LLM detects systematic mis-routing. Skipped when fewer than 10 entries exist. /// - private async Task RunTierRoutingReviewPassAsync() + private async Task RunTierRoutingReviewPassAsync(CancellationToken ct) { if (_tierRoutingLogger is null || !_options.TierRoutingReviewEnabled) return; @@ -2492,26 +2345,11 @@ private async Task RunTierRoutingReviewPassAsync() userMessage.AppendLine(await File.ReadAllTextAsync(configPath)); } - var messages = new List - { - new(ChatRole.System, _tierRoutingDirective ?? BuiltInTierRoutingDirective), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync( - messages, ModelTier.Balanced, new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: tier routing review LLM returned no parseable JSON; skipping"); - return; - } - - _logger.LogDebug("DreamService: tier routing review JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "tier routing review"); + var result = await InvokeDreamPassAsync( + "tier routing review", + _tierRoutingDirective ?? BuiltInTierRoutingDirective, + userMessage.ToString(), + ct); if (result is null) return; // Save anti-pattern entries regardless of whether the config changed @@ -2582,7 +2420,7 @@ You are a procedural skill synthesis assistant. Analyze the tool-call sequences /// and synthesize them into reusable skills. Requires and /// to be available. /// - private async Task RunSequenceSkillDetectionPassAsync() + private async Task RunSequenceSkillDetectionPassAsync(CancellationToken ct) { if (_toolCallLog is null || _skillStore is null || !_options.SequenceSkillDetectionEnabled) return; @@ -2644,26 +2482,12 @@ private async Task RunSequenceSkillDetectionPassAsync() userMessage.AppendLine($" - {name}"); } - var messages = new List - { - new(ChatRole.System, _sequenceSkillDirective ?? BuiltInSequenceSkillDirective), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync( - messages, ModelTier.Balanced, new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: sequence skill detection LLM returned no parseable JSON; skipping"); - return; - } - - _logger.LogDebug("DreamService: sequence skill detection JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "sequence skill detection"); + var result = await InvokeDreamPassAsync( + "sequence skill detection", + _sequenceSkillDirective ?? BuiltInSequenceSkillDirective, + userMessage.ToString(), + ct); + if (result is null) return; var created = 0; foreach (var dto in result?.ToSave ?? []) @@ -2745,7 +2569,7 @@ Wisps are lightweight multi-step pipelines with tool invocations. Each record sh /// Analyzes wisp execution records to detect recurring failure patterns and propose /// skill corrections. Requires and . /// - private async Task RunWispFailureAnalysisPassAsync() + private async Task RunWispFailureAnalysisPassAsync(CancellationToken ct) { if (_wispExecutionLog is null || _skillStore is null || !_options.WispFailureAnalysisEnabled) return; @@ -2805,26 +2629,12 @@ private async Task RunWispFailureAnalysisPassAsync() userMessage.AppendLine($" - {skill.Name}: {skill.Summary}"); } - var messages = new List - { - new(ChatRole.System, _wispFailureDirective ?? BuiltInWispFailureDirective), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync( - messages, ModelTier.Balanced, new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: wisp failure analysis LLM returned no parseable JSON; skipping"); - return; - } - - _logger.LogDebug("DreamService: wisp failure analysis JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "wisp failure analysis"); + var result = await InvokeDreamPassAsync( + "wisp failure analysis", + _wispFailureDirective ?? BuiltInWispFailureDirective, + userMessage.ToString(), + ct); + if (result is null) return; var updated = 0; // Apply skill updates @@ -2913,7 +2723,7 @@ private sealed record WispPromotionCandidateDto /// saves patterns as memory entries, and purges queues the LLM deems safe to clear. /// Skipped when the DLQ sampler is unavailable or is false. /// - private async Task RunDlqReviewPassAsync() + private async Task RunDlqReviewPassAsync(CancellationToken ct) { if (_dlqSampler is null || !_options.DlqReviewEnabled) return; @@ -2978,26 +2788,11 @@ private async Task RunDlqReviewPassAsync() } } - var chatMessages = new List - { - new(ChatRole.System, _dlqDirective ?? BuiltInDlqDirective), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync( - chatMessages, ModelTier.Balanced, new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: DLQ review LLM returned no parseable JSON; skipping"); - return; - } - - _logger.LogDebug("DreamService: DLQ review JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "DLQ review"); + var result = await InvokeDreamPassAsync( + "DLQ review", + _dlqDirective ?? BuiltInDlqDirective, + userMessage.ToString(), + ct); if (result is null) return; if (result.NoDlqIssues == true) @@ -3062,7 +2857,7 @@ private async Task RunDlqReviewPassAsync() /// under the agent-identity/ memory category. These entries complement the immutable /// soul.md — the pass cannot override core values or boundaries. /// - private async Task RunIdentityReflectionPassAsync() + private async Task RunIdentityReflectionPassAsync(CancellationToken ct) { if (!_options.IdentityReflectionEnabled) return; @@ -3143,26 +2938,11 @@ private async Task RunIdentityReflectionPassAsync() userMessage.AppendLine(); } - var messages = new List - { - new(ChatRole.System, _identityDirective ?? BuiltInIdentityDirective), - new(ChatRole.User, userMessage.ToString()) - }; - - var response = await _llmClient.GetResponseAsync(messages, ModelTier.Balanced, - new ChatOptions { ResponseFormat = ChatResponseFormat.Json }); - var raw = response.Text?.Trim() ?? string.Empty; - var json = ExtractJsonObject(raw); - - if (string.IsNullOrEmpty(json)) - { - _logger.LogWarning("DreamService: identity reflection LLM returned no parseable JSON; skipping"); - return; - } - - _logger.LogDebug("DreamService: identity reflection JSON ({Length} chars): {Json}", json.Length, json); - - var result = TryDeserializeJson(json, "identity reflection"); + var result = await InvokeDreamPassAsync( + "identity reflection", + _identityDirective ?? BuiltInIdentityDirective, + userMessage.ToString(), + ct); if (result is null) return; if (result.NoChange == true) @@ -3224,6 +3004,47 @@ private async Task RunIdentityReflectionPassAsync() } } + /// + /// Runs a single dream pass: builds a System+User chat message pair, calls the + /// Balanced-tier LLM in JSON-response mode with the supplied cancellation token, + /// extracts the outermost JSON object from the response, and deserializes it. + /// Returns null if the LLM produced no parseable JSON or the JSON failed + /// to deserialize into ; in both cases the helper + /// has already logged a warning. The cancellation token MUST be the slot token + /// () so that user preemption interrupts + /// the in-flight LLM call promptly — see issue #333. + /// + private async Task InvokeDreamPassAsync( + string passName, + string systemDirective, + string userMessage, + CancellationToken ct) + where TResult : class + { + var messages = new List + { + new(ChatRole.System, systemDirective), + new(ChatRole.User, userMessage) + }; + + var response = await _llmClient.GetResponseAsync( + messages, + ModelTier.Balanced, + new ChatOptions { ResponseFormat = ChatResponseFormat.Json }, + ct); + + var raw = response.Text?.Trim() ?? string.Empty; + var json = ExtractJsonObject(raw); + if (string.IsNullOrEmpty(json)) + { + _logger.LogWarning("DreamService: {Pass} LLM returned no parseable JSON; skipping", passName); + return null; + } + + _logger.LogDebug("DreamService: {Pass} JSON ({Length} chars): {Json}", passName, json.Length, json); + return TryDeserializeJson(json, passName); + } + /// /// Extracts the outermost JSON object from , tolerating /// DeepSeek-style thinking blocks and prose preamble. From 2cec45f90b0baf2d5034aa781ea6d851decd2ca7 Mon Sep 17 00:00:00 2001 From: Rockford Lhotka Date: Tue, 5 May 2026 16:51:38 -0500 Subject: [PATCH 2/2] Wrap dream-pass try/catch in RunPassAsync helper (#333) Adds RunPassAsync(passName, body) which runs the supplied body inside a try/catch that rethrows OperationCanceledException and converts other exceptions into a per-pass error log. The 10 dream-pass methods that previously hand-rolled the same try/catch wrapper now call into this helper, eliminating ~80 lines of duplicated boilerplate. This also fixes a noisy log line surfaced during k8s validation: when preemption fired, each pass's catch (Exception ex) was catching the OperationCanceledException and logging it as " pass failed" with a stack trace before the OCE rethrew at the next ThrowIfCancellationRequested between passes. The helper has an explicit catch (OperationCanceledException) that rethrows, so OCE flows cleanly to DreamAsync's outer handler and only the single "dream cycle preempted by user request" line is logged. Also bumps Directory.Build.props from 0.10.28 to 0.10.29 for the deploy used to validate the original fix. Co-Authored-By: Claude Opus 4.7 (1M context) --- Directory.Build.props | 2 +- src/RockBot.Host/DreamService.cs | 93 ++++++++++++++------------------ 2 files changed, 40 insertions(+), 55 deletions(-) diff --git a/Directory.Build.props b/Directory.Build.props index 07763de..ef3b8be 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -9,7 +9,7 @@ -0.10.28 +0.10.29 diff --git a/src/RockBot.Host/DreamService.cs b/src/RockBot.Host/DreamService.cs index eb24b6e..1d2432c 100644 --- a/src/RockBot.Host/DreamService.cs +++ b/src/RockBot.Host/DreamService.cs @@ -1370,7 +1370,7 @@ private async Task RunEpisodeExtractionPassAsync(CancellationToken ct) _logger.LogInformation("DreamService: episode extraction pass — {Count} log entries to analyze", entries.Count); - try + await RunPassAsync("episode extraction", async () => { // Fetch existing episodic memories so the LLM can reinforce them var existingEpisodes = await _memory.SearchAsync( @@ -1498,11 +1498,7 @@ private async Task RunEpisodeExtractionPassAsync(CancellationToken ct) _logger.LogInformation( "DreamService: episode extraction pass complete — {Created} created, {Reinforced} reinforced", created, reinforced); - } - catch (Exception ex) - { - _logger.LogError(ex, "DreamService: episode extraction pass failed"); - } + }); } /// @@ -1529,7 +1525,7 @@ private async Task RunEntityExtractionPassAsync(CancellationToken ct) _logger.LogInformation("DreamService: entity extraction pass — {Count} log entries to analyze", entries.Count); - try + await RunPassAsync("entity extraction", async () => { // Provide existing entities so the LLM can reference/update them var existingEntities = await _knowledgeGraph.ListEntitiesAsync(); @@ -1619,11 +1615,7 @@ private async Task RunEntityExtractionPassAsync(CancellationToken ct) _logger.LogInformation( "DreamService: entity extraction pass complete — {Entities} entities, {Triples} triples created", entitiesCreated, triplesCreated); - } - catch (Exception ex) - { - _logger.LogError(ex, "DreamService: entity extraction pass failed"); - } + }); } /// @@ -1649,7 +1641,7 @@ private async Task RunGraphConsolidationPassAsync(CancellationToken ct) "DreamService: graph consolidation pass — {Entities} entities, {Triples} triples to review", entities.Count, triples.Count); - try + await RunPassAsync("graph consolidation", async () => { var now = _clock.Now; var userMessage = new StringBuilder(); @@ -1708,11 +1700,7 @@ private async Task RunGraphConsolidationPassAsync(CancellationToken ct) _logger.LogInformation( "DreamService: graph consolidation pass complete — {EntitiesDeleted} entities deleted, {TriplesDeleted} triples deleted", entitiesDeleted, triplesDeleted); - } - catch (Exception ex) - { - _logger.LogError(ex, "DreamService: graph consolidation pass failed"); - } + }); } /// @@ -1735,7 +1723,7 @@ private async Task RunMemoryMiningPassAsync(CancellationToken ct) _logger.LogInformation("DreamService: memory mining pass — {Count} log entries to analyze", entries.Count); - try + await RunPassAsync("memory mining", async () => { var userMessage = new StringBuilder(); userMessage.AppendLine("Review the following conversation log for facts worth storing in long-term memory:"); @@ -1787,11 +1775,7 @@ private async Task RunMemoryMiningPassAsync(CancellationToken ct) } _logger.LogInformation("DreamService: memory mining pass complete — {Saved} entry(ies) saved", saved); - } - catch (Exception ex) - { - _logger.LogError(ex, "DreamService: memory mining pass failed"); - } + }); } /// @@ -2115,7 +2099,7 @@ private async Task RunToolSuccessLearningPassAsync(CancellationToken ct) var userMessage = BuildToolSuccessLearningUserMessage(distinctPatterns); - try + await RunPassAsync("tool-success-learning", async () => { var result = await InvokeDreamPassAsync( "tool-success-learning", @@ -2136,11 +2120,7 @@ private async Task RunToolSuccessLearningPassAsync(CancellationToken ct) _logger.LogInformation( "DreamService: tool-success-learning pass complete — {Saved} entry(ies) saved", entries.Count); - } - catch (Exception ex) - { - _logger.LogError(ex, "DreamService: tool-success-learning pass failed"); - } + }); } /// @@ -2191,6 +2171,8 @@ private async Task RunPreferenceInferencePassAsync(CancellationToken ct) try { + await RunPassAsync("preference inference", async () => + { // Build user message: turns grouped by session var userMessage = new StringBuilder(); userMessage.AppendLine("Review the following conversation log for durable user preference patterns:"); @@ -2271,10 +2253,7 @@ private async Task RunPreferenceInferencePassAsync(CancellationToken ct) _logger.LogInformation("DreamService: preference inference pass complete — {Saved} preference(s) inferred", saved); } - } - catch (Exception ex) - { - _logger.LogError(ex, "DreamService: preference inference pass failed"); + }); } finally { @@ -2425,7 +2404,7 @@ private async Task RunSequenceSkillDetectionPassAsync(CancellationToken ct) if (_toolCallLog is null || _skillStore is null || !_options.SequenceSkillDetectionEnabled) return; - try + await RunPassAsync("sequence skill detection", async () => { var events = await _toolCallLog.QueryRecentAsync( DateTimeOffset.UtcNow.AddDays(-14), maxResults: 10_000); @@ -2520,11 +2499,7 @@ private async Task RunSequenceSkillDetectionPassAsync(CancellationToken ct) _logger.LogInformation( "DreamService: sequence skill detection pass complete — {Created} skills created from {SessionCount} sessions", created, sessions.Count); - } - catch (Exception ex) - { - _logger.LogError(ex, "DreamService: sequence skill detection pass failed"); - } + }); } // ── Wisp failure analysis ───────────────────────────────────────────── @@ -2574,7 +2549,7 @@ private async Task RunWispFailureAnalysisPassAsync(CancellationToken ct) if (_wispExecutionLog is null || _skillStore is null || !_options.WispFailureAnalysisEnabled) return; - try + await RunPassAsync("wisp failure analysis", async () => { var records = await _wispExecutionLog.QueryRecentAsync( DateTimeOffset.UtcNow.AddDays(-14), maxResults: 500); @@ -2682,11 +2657,7 @@ private async Task RunWispFailureAnalysisPassAsync(CancellationToken ct) _logger.LogInformation( "DreamService: wisp failure analysis pass complete — {Patterns} patterns, {Updates} skill updates, {Candidates} promotion candidates", result?.Patterns?.Count ?? 0, updated, result?.PromotionCandidates?.Count ?? 0); - } - catch (Exception ex) - { - _logger.LogError(ex, "DreamService: wisp failure analysis pass failed"); - } + }); } private sealed record WispFailureAnalysisResultDto @@ -2727,7 +2698,7 @@ private async Task RunDlqReviewPassAsync(CancellationToken ct) { if (_dlqSampler is null || !_options.DlqReviewEnabled) return; - try + await RunPassAsync("DLQ review", async () => { var queues = await _dlqSampler.GetDlqQueuesAsync(); var nonEmpty = queues.Where(q => q.MessageCount > 0).ToList(); @@ -2845,11 +2816,7 @@ private async Task RunDlqReviewPassAsync(CancellationToken ct) _logger.LogInformation( "DreamService: DLQ review complete — {Patterns} pattern(s) saved, {Purged} queue(s) purged", savedPatterns, purged); - } - catch (Exception ex) - { - _logger.LogError(ex, "DreamService: DLQ review pass failed"); - } + }); } /// @@ -2864,7 +2831,7 @@ private async Task RunIdentityReflectionPassAsync(CancellationToken ct) _logger.LogInformation("DreamService: identity reflection pass — starting"); - try + await RunPassAsync("identity reflection", async () => { // Fetch current identity entries var identityEntries = await _memory.SearchAsync( @@ -2997,10 +2964,28 @@ private async Task RunIdentityReflectionPassAsync(CancellationToken ct) _logger.LogInformation( "DreamService: identity reflection pass complete — {Deleted} deleted, {Saved} saved", deleted, saved); + }); + } + + /// + /// Wraps a single dream-pass body so unhandled exceptions become a per-pass + /// error log without aborting the whole cycle. + /// is rethrown so DreamAsync's outer handler can log a single + /// "preempted by user request" line — see issue #333. + /// + private async Task RunPassAsync(string passName, Func body) + { + try + { + await body(); + } + catch (OperationCanceledException) + { + throw; } catch (Exception ex) { - _logger.LogError(ex, "DreamService: identity reflection pass failed"); + _logger.LogError(ex, "DreamService: {Pass} pass failed", passName); } }