
Coalesce subagent result handling so consolidation gate works (v0.10.31)#351

Merged
rockfordlhotka merged 4 commits into main from fix/parallel-subagent-finals
May 7, 2026

Conversation

@rockfordlhotka

Summary

Today's 10 AM `morning-daily-operational-brief` patrol produced three "final" bubbles in the UI instead of one consolidated synthesis. Two compounding issues defeated the v0.10.24 gate fix:

  1. RabbitMQ consumer dispatch concurrency was 1. While the first `SubagentResultHandler` invocation sat in `gate.AccumulateAsync` waiting for siblings, every other subagent result message stayed queued. The wait loop saw siblings via `SubagentManager.ListActive()` (in-process state) and fired solo as soon as it emptied — even though the actual result messages were still in the broker.
  2. The 30 s "stale batch → replace" check in the gate was shorter than a typical Phase 2 synthesis (~44 s today). When queued sibling results finally got dequeued post-synthesis, each one started a fresh `PendingBatch` and produced its own duplicate Phase 2 instead of hitting `if (batch.Fired) return null`.

Result: 3 subagents → 3 separate batches → 3 final bubbles.
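The whole failure mode hinges on the gate's `if (batch.Fired) return null` guard being reachable by late siblings. A minimal sketch of that dedup pattern, with hypothetical names (the real `SubagentResultGate.AccumulateAsync` also waits for siblings and ages out stale batches):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical sketch of the gate's dedup guard; names follow the PR text,
// not the actual RockBot source.
public sealed class PendingBatch
{
    public bool Fired;                          // true once Phase 2 has been claimed
    public readonly List<string> Results = new();
}

public sealed class SubagentResultGateSketch
{
    private readonly ConcurrentDictionary<string, PendingBatch> _batches = new();

    // Returns the completed batch for exactly one caller; everyone else gets null.
    public PendingBatch? Accumulate(string sessionId, string result, int expected)
    {
        var batch = _batches.GetOrAdd(sessionId, _ => new PendingBatch());
        lock (batch)
        {
            if (batch.Fired) return null;       // late sibling defers to the winner
            batch.Results.Add(result);
            if (batch.Results.Count < expected) return null;

            batch.Fired = true;                 // this caller runs the synthesis
            return batch;
        }
    }
}
```

The bug was that a too-short staleness window replaced a `Fired` batch before late siblings were dequeued, so they never reached the guard and each started a fresh batch.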

Changes

Messaging layer

  • `IMessageSubscriber.SubscribeAsync` gains optional `dispatchConcurrency` (placed after `cancellationToken` so existing positional callers compile unchanged).
  • `RabbitMqSubscriber` honors it via channel-level `ConsumerDispatchConcurrency` on both initial-create and 406-reconnect paths.
  • `RabbitMqPublisher.PublishAsync` wraps the shared-channel publish in a `SemaphoreSlim` — closes a latent thread-safety hole that becomes reachable once consumer concurrency is on.
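The publish guard is a standard `SemaphoreSlim` serialization pattern. A self-contained sketch, with illustrative names (not the PR's actual `RabbitMqPublisher` code):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// RabbitMQ.Client v7's IChannel is not safe for concurrent publishing, so all
// publishes on the shared channel are serialized through one async lock.
public sealed class SerializedPublisher
{
    private readonly SemaphoreSlim _publishLock = new(1, 1);

    public async Task PublishAsync(Func<Task> publishOnSharedChannel,
                                   CancellationToken ct = default)
    {
        await _publishLock.WaitAsync(ct);
        try
        {
            // Only one publish touches the shared channel at a time.
            await publishOnSharedChannel();
        }
        finally
        {
            _publishLock.Release();
        }
    }
}
```

With dispatch concurrency at 1 this lock is never contended, which is why the hole was latent until now.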

Host wiring

  • `AgentHostOptions.Topics` is now `List<TopicSubscription>`. `AgentHostBuilder.SubscribeTo` gains the new `dispatchConcurrency` parameter.
  • `SubagentServiceCollectionExtensions` subscribes the result topic at concurrency = `max(2, MaxConcurrentSubagents + 1)`.
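The parameter placement and the concurrency floor can be sketched as follows. The interface shape is hypothetical (the real `IMessageSubscriber` surface is larger); the helper just restates the `max(2, MaxConcurrentSubagents + 1)` formula:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// The optional dispatchConcurrency sits after cancellationToken, so existing
// positional call sites compile unchanged. Hypothetical interface shape.
public interface IMessageSubscriber
{
    Task SubscribeAsync(
        string topic,
        Func<string, Task> handler,
        CancellationToken cancellationToken = default,
        ushort dispatchConcurrency = 1);
}

public static class SubagentSubscription
{
    // max(2, MaxConcurrentSubagents + 1): one slot for the handler parked in
    // AccumulateAsync, plus one per sibling result that may arrive meanwhile.
    public static int DispatchConcurrency(int maxConcurrentSubagents)
        => Math.Max(2, maxConcurrentSubagents + 1);
}
```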

Gate

  • 30 s staleness check replaced with `ChooseCeiling(primarySessionId) + 2 min` slack.
  • `CleanupBatch` retention extended to `ceiling + slack + 5 s`. Late arrivals reliably hit `if (batch.Fired) return null`.
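Using the figures from the commit messages (600 s background / 300 s interactive ceilings, 2 min slack, 5 s extra retention), the window derivation can be sketched like this. `ChooseCeiling` here keys on a boolean where the real gate inspects `primarySessionId`:

```csharp
using System;

// Gate windows derived from the consolidation ceiling instead of a
// hardcoded 30 s. Values come from the commit message; names are
// illustrative.
public static class GateWindows
{
    private static readonly TimeSpan Slack = TimeSpan.FromMinutes(2);

    public static TimeSpan ChooseCeiling(bool isBackgroundSession)
        => TimeSpan.FromSeconds(isBackgroundSession ? 600 : 300);

    // Stale-batch replacement now waits out the longest plausible synthesis.
    public static TimeSpan Staleness(bool isBackgroundSession)
        => ChooseCeiling(isBackgroundSession) + Slack;

    // Fired batches outlive the staleness window, so a late sibling still
    // hits the batch.Fired guard instead of starting a fresh batch.
    public static TimeSpan Retention(bool isBackgroundSession)
        => Staleness(isBackgroundSession) + TimeSpan.FromSeconds(5);
}
```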

User session demotion

  • `UserMessageHandler` mirrors the `spawnedSubagent` callback pattern from `ScheduledTaskHandler`. When a consolidating subagent was spawned during the loop, the parent's final reply is demoted to `IsFinal=false` and Phase 2 synthesis owns the user's final bubble. The synchronous no-tool path is unaffected.

Result

| Path                         | Bubbles                                        |
|------------------------------|------------------------------------------------|
| User session, no subagents   | 1 final from parent                            |
| User session + subagents     | 1 progress from parent + 1 consolidated final  |
| Scheduled task, no subagents | 1 final from parent                            |
| Scheduled task + subagents   | 1 consolidated final                           |

Test plan

  • All 1547 unit tests pass; integration tests skipped (no local RabbitMQ).
  • New `AccumulateAsync_LateArrival_WithinCeilingPlusSlack_ReturnsNull` guards the exact regression seen this morning.
  • New `AccumulateAsync_ConcurrentArrivals_AccumulateIntoOneBatch` exercises the post-fix concurrent path.
  • 6 `StubSubscriber` test doubles updated for the new interface signature.
  • Deploy v0.10.31 to k8s and watch a multi-subagent patrol cycle (heartbeat-patrol or next morning brief) to confirm a single consolidated final bubble.
  • Verify a multi-subagent user-session turn produces one progress bubble + one final synthesis bubble.

🤖 Generated with Claude Code

rockfordlhotka and others added 4 commits May 7, 2026 12:18
The 10 AM `morning-daily-operational-brief` patrol surfaced three "final"
result bubbles instead of one consolidated synthesis. Investigation traced
this to two compounding issues that defeated the v0.10.24 gate fix:

1. RabbitMQ consumer dispatch concurrency was 1 (the default). While
   SubagentResultHandler's first invocation sat in
   `SubagentResultGate.AccumulateAsync` waiting for siblings, every other
   subagent result message stayed queued. The wait loop only saw siblings
   via `subagentManager.ListActive()` — which empties as runners finish —
   so the gate fired solo as soon as in-process state cleared, even though
   the actual result messages were still in the broker queue.

2. The 30 s "stale batch → replace" check in the gate was shorter than a
   typical Phase 2 synthesis (~44 s in this morning's case). When the
   queued sibling results were finally dequeued post-synthesis, each one
   created a fresh `PendingBatch` and produced its own duplicate Phase 2,
   instead of hitting `if (batch.Fired) return null`.

Fixes:

- Add per-subscription `dispatchConcurrency` to IMessageSubscriber and
  honor it in RabbitMqSubscriber via channel-level
  `ConsumerDispatchConcurrency`. AgentHostBuilder.SubscribeTo gains an
  optional `dispatchConcurrency` param; AgentHostOptions.Topics now holds
  TopicSubscription records. Sibling result handlers can now run in
  parallel and the gate's signal/wait pattern actually fires.
- Wrap `RabbitMqPublisher.PublishAsync` in a SemaphoreSlim. v7 IChannel
  is not thread-safe for publishing; this was a latent issue that becomes
  reachable once consumer concurrency is on.
- Subscribe the subagent-result topic at concurrency =
  `max(2, MaxConcurrentSubagents + 1)`.
- Replace the gate's hardcoded 30 s/35 s windows with values derived from
  the consolidation ceiling (background 600 s + 2 min slack, interactive
  300 s + 2 min slack), so late arrivals reliably defer to the winner.

Also align user-session behavior with the user's mental model:

- UserMessageHandler now demotes its parent reply to `IsFinal=false`
  when the loop spawned a consolidating subagent, mirroring the
  `spawnedSubagent` callback that ScheduledTaskHandler uses to suppress
  its own reply. The Phase 2 synthesis becomes the single user-facing
  final bubble. The synchronous no-tool path still emits IsFinal=true.

Tests:

- Add `AccumulateAsync_LateArrival_WithinCeilingPlusSlack_ReturnsNull`
  guarding the regression: a sibling arriving 1 s after fire (well within
  ceiling+slack) must short-circuit instead of starting a fresh batch.
- Add `AccumulateAsync_ConcurrentArrivals_AccumulateIntoOneBatch`
  exercising the post-fix concurrent path.
- Update 6 `StubSubscriber` test doubles for the new interface signature.
- Update AgentHostBuilderTests for the TopicSubscription shape and add
  `SubscribeTo_RecordsDispatchConcurrency`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The v0.10.31 demotion of the parent reply to IsFinal=false broke the
user-proxy correlation contract: UserProxyService.SendAsync waits on a
TaskCompletionSource keyed by the original correlationId for a final
reply (UserProxyService.cs:277). With the demotion, that TCS never
resolves — the synthesis publishes without the original correlationId,
so it arrives via the unsolicited-reply path and is rendered as a chat
bubble, but the user-side wait still times out at DefaultReplyTimeout.

Re-reading the goal: a user-session turn that spawns subagents should
produce TWO chat bubbles — one announcing the delegation, one with the
synthesized answer. Both are IsFinal=true. The actual fix for the
duplicate-bubbles issue is the gate consolidation (v0.10.31), which
ensures the synthesis fires exactly once instead of once per result.
That's already in place; the demotion was overcorrecting.

Behavior after this revert:

| Path                           | Bubbles                                |
|--------------------------------|----------------------------------------|
| User session, no subagents     | 1 final from parent                    |
| User session + subagents       | 1 final from parent + 1 from synthesis |
| Scheduled task, no subagents   | 1 final from parent                    |
| Scheduled task + subagents     | 1 from synthesis (parent suppressed)   |

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Cement the architectural rule that final user-facing chat bubbles only
come from the primary agent. Subagents and A2A handlers may emit non-final
progress (collapsed activity-log entries) but must not produce final
result bubbles directly — those flow back through the primary agent.

UserMessageHandler:
- Re-introduce the spawnedConsolidatingSubagent flag (mirrors the pattern
  in ScheduledTaskHandler) and demote the parent reply to IsFinal=false
  when subagents will produce the actual answer. The collapsed activity
  log shows the user what's happening without rendering a "result" bubble.
- Before demoting, register the original user-message correlationId in
  IPendingTurnCorrelations so the synthesis can pick it up.

IPendingTurnCorrelations (new singleton in RockBot.Host):
- Tiny ConcurrentDictionary<sessionId, correlationId> with Set/TryTake.
- Lifecycle: UserMessageHandler.Set on demotion → SubagentResultHandler
  TryTake at Phase 2 publish.
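A minimal sketch of that singleton, shown only to illustrate the Set/TryTake lifecycle (the type was removed again in v0.10.34; names follow the commit text):

```csharp
using System;
using System.Collections.Concurrent;

// Tiny sessionId → correlationId map. UserMessageHandler records the
// original user-message correlationId on demotion; SubagentResultHandler
// claims it exactly once when publishing the Phase 2 synthesis.
public sealed class PendingTurnCorrelations
{
    private readonly ConcurrentDictionary<string, string> _bySession = new();

    public void Set(string sessionId, string correlationId)
        => _bySession[sessionId] = correlationId;

    // TryRemove makes the handoff one-shot: a second caller gets false.
    public bool TryTake(string sessionId, out string correlationId)
        => _bySession.TryRemove(sessionId, out correlationId);
}
```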

SubagentResultHandler:
- Phase 2 synthesis publish now retrieves and uses the original user
  correlationId, so UserProxyService.SendAsync's pending TaskCompletionSource
  resolves cleanly. The previous demotion attempt (v0.10.31, reverted in
  v0.10.32) failed precisely because the synthesis lacked the correlationId
  and the user-side wait timed out.

A2ATaskResultHandler / A2ATaskErrorHandler:
- Detect non-user sessions (PrimarySessionId is "wisp-…" or "session/subagent-…")
  and skip the synthesis + IsFinal=true publish. The result is still written
  to working memory; the calling subagent or wisp pulls it from there and
  incorporates it into its own output. Only direct user-session A2A
  invocations continue to produce a primary-agent-authored final bubble.

End-state per turn type:

| Path                           | Bubbles                              |
|--------------------------------|--------------------------------------|
| User session, no subagents     | 1 final from primary                 |
| User session + subagents       | 1 final from Phase 2 synthesis       |
| Scheduled task, with subagents | 1 final from Phase 2 synthesis       |
| Subagent invokes A2A           | 0 — result flows back through caller |

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… (v0.10.34)

The v0.10.33 demotion put the primary's "I'll delegate to subagents..."
announcement into the collapsed activity log instead of as a visible chat
bubble, which is the opposite of what was wanted: the primary IS talking
to the user when it explains it's spinning up subagents, so that message
should be a normal chat bubble.

The demotion also produced spurious "No reply received" timeouts. With
the parent reply demoted to IsFinal=false, UserProxyService.SendAsync's
TaskCompletionSource only resolved when Phase 2 synthesis arrived with
the threaded correlationId. When subagents took longer than
DefaultReplyTimeout (which is normal for a 3-subagent fan-out), the
SendAsync timed out and its `finally` block (UserProxyService.cs:292) removed
the pending entry from `_pending`. The synthesis arrived later, still carrying
that correlationId, but with `_pending` empty it took the unsolicited-reply
path, so the user saw both a timeout error and the consolidated summary.

Revert leaves us with the working two-bubble model:
- Parent reply IsFinal=true with the original correlationId — chat
  bubble for the announcement, resolves the user-proxy TCS immediately
  (no timeout while subagents run).
- Phase 2 synthesis IsFinal=true without correlationId — unsolicited
  final bubble for the consolidated answer.

Kept from prior PR commits:
- Gate fix (one synthesis per batch instead of N).
- A2A bubble suppression from non-user sessions: subagent/wisp A2A
  responses don't produce their own user-visible bubbles; the calling
  loop pulls from working memory and incorporates them.

IPendingTurnCorrelations is no longer needed — removed along with its
DI registration.

End state per turn type:

| Path                          | Bubbles                              |
|-------------------------------|--------------------------------------|
| User session, no subagents    | 1 final from primary                 |
| User session + subagents      | 1 announce + 1 consolidated answer   |
| Scheduled task + subagents    | 1 from synthesis (parent suppressed) |
| Subagent invokes A2A          | 0 — flows back through caller        |

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@rockfordlhotka rockfordlhotka merged commit c8460f4 into main May 7, 2026
2 checks passed
@rockfordlhotka rockfordlhotka deleted the fix/parallel-subagent-finals branch May 7, 2026 19:02
