diff --git a/Directory.Build.props b/Directory.Build.props
index 408368e0..26488294 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -9,7 +9,7 @@
-0.10.30
+0.10.34
diff --git a/src/RockBot.A2A/A2ATaskErrorHandler.cs b/src/RockBot.A2A/A2ATaskErrorHandler.cs
index d1cff523..f3e1c9cb 100644
--- a/src/RockBot.A2A/A2ATaskErrorHandler.cs
+++ b/src/RockBot.A2A/A2ATaskErrorHandler.cs
@@ -33,6 +33,15 @@ internal sealed class A2ATaskErrorHandler(
{
private string DisplayName => agentNameHolder.DisplayName ?? agent.Name;
+ ///
+ /// True only when an A2A task originated from the primary agent's user session.
+ /// Subagent and wisp sessions are not user-facing; their A2A errors flow back to
+ /// the calling loop rather than producing user bubbles.
+ ///
+ private static bool IsUserSession(string primarySessionId) =>
+ primarySessionId.StartsWith("session/", StringComparison.OrdinalIgnoreCase) &&
+ !primarySessionId.StartsWith("session/subagent-", StringComparison.OrdinalIgnoreCase);
+
public async Task HandleAsync(AgentTaskError error, MessageHandlerContext context)
{
var ct = context.CancellationToken;
@@ -58,6 +67,19 @@ public async Task HandleAsync(AgentTaskError error, MessageHandlerContext contex
"A2A task error for task {TaskId} from agent '{TargetAgent}' in session {SessionId}: [{Code}] {Message}",
error.TaskId, pending.TargetAgent, pending.PrimarySessionId, error.Code, error.Message);
+ // Only the primary agent talks to the user. When the failed A2A invocation came
+ // from a subagent or wisp, the calling loop will surface the failure in its own
+ // output — emitting a separate user-facing bubble here would bypass the primary
+ // agent and show transport-layer noise to the user.
+ if (!IsUserSession(pending.PrimarySessionId))
+ {
+ logger.LogInformation(
+ "A2A task error for {TaskId} originated from non-user session {SessionId} — " +
+ "skipping synthesis and bubble publish (caller will surface the error)",
+ error.TaskId, pending.PrimarySessionId);
+ return;
+ }
+
// PrimarySessionId is the full WM session namespace (e.g. "session/blazor-session").
// Strip the prefix for conversation memory and context builder consistency.
var sessionNamespace = pending.PrimarySessionId;
diff --git a/src/RockBot.A2A/A2ATaskResultHandler.cs b/src/RockBot.A2A/A2ATaskResultHandler.cs
index 58c90e4a..aef9acee 100644
--- a/src/RockBot.A2A/A2ATaskResultHandler.cs
+++ b/src/RockBot.A2A/A2ATaskResultHandler.cs
@@ -43,6 +43,18 @@ internal sealed class A2ATaskResultHandler(
{
private string DisplayName => agentNameHolder.DisplayName ?? agent.Name;
+ ///
+ /// True only when an A2A task originated from the primary agent's user session
+ /// (e.g. "session/blazor-session"). Subagent sessions ("session/subagent-...") and
+ /// transient executor sessions ("wisp-...") are not user-facing — A2A results
+ /// directed at them must flow back to the calling loop via working memory rather
+ /// than producing their own user-visible chat bubble. Only the primary agent
+ /// communicates with the user.
+ ///
+ private static bool IsUserSession(string primarySessionId) =>
+ primarySessionId.StartsWith("session/", StringComparison.OrdinalIgnoreCase) &&
+ !primarySessionId.StartsWith("session/subagent-", StringComparison.OrdinalIgnoreCase);
+
public async Task HandleAsync(AgentTaskResult result, MessageHandlerContext context)
{
var ct = context.CancellationToken;
@@ -234,6 +246,20 @@ await workingMemory.SetAsync(
"A2A result for task {TaskId} ({Len:N0} chars) stored in working memory at key '{Key}'",
result.TaskId, resultText.Length, memoryKey);
+ // If the A2A invocation didn't originate in a user session, the result must flow
+ // back to the calling loop (subagent or wisp) via working memory — not as a
+ // user-visible chat bubble. The caller will pull the result and incorporate it
+ // into its own output, which is the only thing the user should see. Skip the
+ // synthesis + publish entirely for those callers.
+ if (!IsUserSession(pending.PrimarySessionId))
+ {
+ logger.LogInformation(
+ "A2A task {TaskId} originated from non-user session {SessionId} — skipping " +
+ "synthesis and bubble publish (caller will consume the working-memory result)",
+ result.TaskId, pending.PrimarySessionId);
+ return;
+ }
+
syntheticUserTurn =
$"[Agent '{pending.TargetAgent}' completed task {result.TaskId} (state={result.State})]: " +
$"The result ({resultText.Length:N0} chars) is in working memory. " +
@@ -306,6 +332,19 @@ await conversationMemory.AddTurnAsync(
private async Task PublishErrorToUserAsync(
PendingA2ATask pending, string taskId, string errorMessage, CancellationToken ct)
{
+ // Only the primary agent talks to the user. When the failed A2A invocation came
+ // from a subagent or wisp, surface the error through that caller's normal output
+ // path (it sees the failure via working memory / its loop's exception handling)
+ // rather than emitting our own bubble.
+ if (!IsUserSession(pending.PrimarySessionId))
+ {
+ logger.LogInformation(
+ "Suppressing A2A error bubble for task {TaskId} — invocation came from non-user " +
+ "session {SessionId}; caller will surface the error in its own output",
+ taskId, pending.PrimarySessionId);
+ return;
+ }
+
var sessionNamespace = pending.PrimarySessionId;
const string SessionPrefix = "session/";
var rawSessionId = sessionNamespace.StartsWith(SessionPrefix, StringComparison.OrdinalIgnoreCase)
diff --git a/src/RockBot.Agent/UserMessageHandler.cs b/src/RockBot.Agent/UserMessageHandler.cs
index a60fc06d..38dc7b85 100644
--- a/src/RockBot.Agent/UserMessageHandler.cs
+++ b/src/RockBot.Agent/UserMessageHandler.cs
@@ -452,6 +452,10 @@ await conversationMemory.AddTurnAsync(
{ AgentName = agent.Name },
ct);
+ // The parent reply is the user's "I'm starting work" / direct answer bubble
+ // and resolves the user-proxy SendAsync TCS. If the loop spawned consolidating
+ // subagents, their Phase 2 synthesis will arrive later as a separate
+ // unsolicited final bubble — that's the consolidated answer.
await PublishReplyAsync(text, replyTo, correlationId, sessionId, isFinal: true, ct);
loopSw.Stop();
turnActivity?.SetTag("rockbot.turn.status", "ok");
diff --git a/src/RockBot.Host/AgentHost.cs b/src/RockBot.Host/AgentHost.cs
index 2d488103..004796f4 100644
--- a/src/RockBot.Host/AgentHost.cs
+++ b/src/RockBot.Host/AgentHost.cs
@@ -45,19 +45,22 @@ public async Task StartAsync(CancellationToken cancellationToken)
await RecoverWipEntriesAsync(cancellationToken);
- foreach (var topic in _options.Topics)
+ foreach (var sub in _options.Topics)
{
- var sanitizedTopic = topic.Replace(".", "-").Replace("*", "_").Replace("#", "__");
+ var sanitizedTopic = sub.Topic.Replace(".", "-").Replace("*", "_").Replace("#", "__");
var subscriptionName = $"{_identity.Name}.{sanitizedTopic}";
var subscription = await _subscriber.SubscribeAsync(
- topic,
+ sub.Topic,
subscriptionName,
(envelope, ct) => _pipeline.DispatchAsync(envelope, ct),
- cancellationToken);
+ cancellationToken,
+ sub.DispatchConcurrency);
_subscriptions.Add(subscription);
- _logger.LogInformation("Subscribed to {Topic} as {SubscriptionName}", topic, subscriptionName);
+ _logger.LogInformation(
+ "Subscribed to {Topic} as {SubscriptionName} (dispatchConcurrency={Concurrency})",
+ sub.Topic, subscriptionName, sub.DispatchConcurrency);
}
}
diff --git a/src/RockBot.Host/AgentHostBuilder.cs b/src/RockBot.Host/AgentHostBuilder.cs
index 918e5163..b1e00480 100644
--- a/src/RockBot.Host/AgentHostBuilder.cs
+++ b/src/RockBot.Host/AgentHostBuilder.cs
@@ -40,11 +40,14 @@ public AgentHostBuilder WithIdentity(string name, string? instanceId = null)
}
///
- /// Subscribe to a topic.
+ /// Subscribe to a topic. Optionally specify
+ /// to allow concurrent handler invocations for this subscription. Default 1
+ /// (sequential dispatch, preserves message ordering). Only bump when the handler
+ /// is re-entrant and may block on cross-message coordination.
///
- public AgentHostBuilder SubscribeTo(string topic)
+ public AgentHostBuilder SubscribeTo(string topic, int dispatchConcurrency = 1)
{
- _options.Topics.Add(topic);
+ _options.Topics.Add(new TopicSubscription(topic, dispatchConcurrency));
return this;
}
@@ -77,8 +80,8 @@ internal void Build()
_services.AddSingleton();
_services.Configure(opts =>
{
- foreach (var topic in _options.Topics)
- opts.Topics.Add(topic);
+ foreach (var sub in _options.Topics)
+ opts.Topics.Add(sub);
});
}
}
diff --git a/src/RockBot.Host/AgentHostOptions.cs b/src/RockBot.Host/AgentHostOptions.cs
index 4d4e011b..91bdebfc 100644
--- a/src/RockBot.Host/AgentHostOptions.cs
+++ b/src/RockBot.Host/AgentHostOptions.cs
@@ -1,14 +1,24 @@
namespace RockBot.Host;
+///
+/// A topic subscription paired with its dispatch concurrency.
+///
+/// The topic pattern (with wildcards).
+/// Maximum concurrent in-flight handler invocations
+/// for this subscription. Default 1 (sequential, preserves ordering). Bump only for
+/// re-entrant handlers where cross-message coordination would otherwise deadlock the
+/// consumer (e.g. the subagent-result consolidation gate).
+public sealed record TopicSubscription(string Topic, int DispatchConcurrency = 1);
+
///
/// Configuration options for the agent host.
///
public sealed class AgentHostOptions
{
///
- /// Topics the agent subscribes to.
+ /// Topics the agent subscribes to, paired with their dispatch concurrency.
///
- public List Topics { get; } = [];
+ public List Topics { get; } = [];
///
/// Default maximum number of tool-calling round-trips per request.
diff --git a/src/RockBot.Messaging.Abstractions/IMessageSubscriber.cs b/src/RockBot.Messaging.Abstractions/IMessageSubscriber.cs
index 299fcbdb..818f6519 100644
--- a/src/RockBot.Messaging.Abstractions/IMessageSubscriber.cs
+++ b/src/RockBot.Messaging.Abstractions/IMessageSubscriber.cs
@@ -16,10 +16,19 @@ public interface IMessageSubscriber : IAsyncDisposable
/// used to create a durable consumer group/queue.
/// Async handler invoked for each message.
/// Cancellation token.
+ /// Maximum number of messages from this
+ /// subscription that may be processed concurrently. Default 1 (sequential
+ /// processing — preserves message ordering). Bump higher when the handler
+ /// is re-entrant and may block on cross-message coordination (e.g. a
+ /// consolidation gate that waits for siblings). Providers that don't
+ /// support per-subscription concurrency may ignore this hint. Placed after
+ /// so positional callers that pass
+ /// only a CT keep working unchanged.
/// A subscription handle that can be disposed to unsubscribe.
Task SubscribeAsync(
string topic,
string subscriptionName,
Func> handler,
- CancellationToken cancellationToken = default);
+ CancellationToken cancellationToken = default,
+ int dispatchConcurrency = 1);
}
diff --git a/src/RockBot.Messaging.InProcess/InProcessSubscriber.cs b/src/RockBot.Messaging.InProcess/InProcessSubscriber.cs
index d300e36d..93f148ff 100644
--- a/src/RockBot.Messaging.InProcess/InProcessSubscriber.cs
+++ b/src/RockBot.Messaging.InProcess/InProcessSubscriber.cs
@@ -18,8 +18,12 @@ public Task SubscribeAsync(
string topic,
string subscriptionName,
Func> handler,
- CancellationToken cancellationToken = default)
+ CancellationToken cancellationToken = default,
+ int dispatchConcurrency = 1)
{
+ // dispatchConcurrency is a transport hint; the in-process bus already invokes
+ // each subscription handler asynchronously per publish call, so there's no
+ // separate dispatch loop to widen.
var subscription = new InProcessSubscription(topic, subscriptionName, handler, _bus, _logger);
_bus.Register(subscription);
return Task.FromResult(subscription);
diff --git a/src/RockBot.Messaging.RabbitMQ/RabbitMqConnectionManager.cs b/src/RockBot.Messaging.RabbitMQ/RabbitMqConnectionManager.cs
index 521a19a5..0f80d72d 100644
--- a/src/RockBot.Messaging.RabbitMQ/RabbitMqConnectionManager.cs
+++ b/src/RockBot.Messaging.RabbitMQ/RabbitMqConnectionManager.cs
@@ -29,10 +29,30 @@ public RabbitMqConnectionManager(
/// Each caller gets its own channel (publishers and subscribers should
/// not share channels in RabbitMQ).
///
- public async Task CreateChannelAsync(CancellationToken cancellationToken = default)
+ /// When non-null and greater than 1,
+ /// configures the channel to dispatch consumer events concurrently. Used by
+ /// subscriber channels whose handlers must run in parallel (e.g. subagent
+ /// result accumulation). Null falls back to the connection-level default.
+ public async Task CreateChannelAsync(
+ CancellationToken cancellationToken = default,
+ ushort? consumerDispatchConcurrency = null)
{
var connection = await GetConnectionAsync(cancellationToken);
- var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
+
+ IChannel channel;
+ if (consumerDispatchConcurrency is { } concurrency && concurrency > 1)
+ {
+ var options = new CreateChannelOptions(
+ publisherConfirmationsEnabled: false,
+ publisherConfirmationTrackingEnabled: false,
+ outstandingPublisherConfirmationsRateLimiter: null,
+ consumerDispatchConcurrency: concurrency);
+ channel = await connection.CreateChannelAsync(options, cancellationToken);
+ }
+ else
+ {
+ channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
+ }
// Declare the topic exchange
await channel.ExchangeDeclareAsync(
diff --git a/src/RockBot.Messaging.RabbitMQ/RabbitMqPublisher.cs b/src/RockBot.Messaging.RabbitMQ/RabbitMqPublisher.cs
index 94e7069e..41857820 100644
--- a/src/RockBot.Messaging.RabbitMQ/RabbitMqPublisher.cs
+++ b/src/RockBot.Messaging.RabbitMQ/RabbitMqPublisher.cs
@@ -17,6 +17,13 @@ public sealed class RabbitMqPublisher : IMessagePublisher
private readonly ILogger _logger;
private IChannel? _channel;
private readonly SemaphoreSlim _channelLock = new(1, 1);
+
+ // RabbitMQ.Client v7 IChannel is NOT thread-safe for publishing — concurrent
+ // BasicPublishAsync calls on the same channel can interleave AMQP frames and
+ // corrupt the wire protocol. Serialize publishes through this semaphore so the
+ // singleton publisher remains safe even when consumers (e.g. subagent results
+ // with raised dispatch concurrency) trigger parallel publish paths.
+ private readonly SemaphoreSlim _publishLock = new(1, 1);
private bool _disposed;
public RabbitMqPublisher(
@@ -79,13 +86,21 @@ public async Task PublishAsync(
"Publishing message {MessageId} to topic {Topic} (type: {Type})",
envelope.MessageId, topic, envelope.MessageType);
- await channel.BasicPublishAsync(
- exchange: _options.ExchangeName,
- routingKey: topic,
- mandatory: false,
- basicProperties: properties,
- body: envelope.Body,
- cancellationToken: cancellationToken);
+ await _publishLock.WaitAsync(cancellationToken);
+ try
+ {
+ await channel.BasicPublishAsync(
+ exchange: _options.ExchangeName,
+ routingKey: topic,
+ mandatory: false,
+ basicProperties: properties,
+ body: envelope.Body,
+ cancellationToken: cancellationToken);
+ }
+ finally
+ {
+ _publishLock.Release();
+ }
sw.Stop();
RabbitMqDiagnostics.PublishDuration.Record(sw.Elapsed.TotalMilliseconds,
@@ -132,5 +147,6 @@ public async ValueTask DisposeAsync()
await _channel.CloseAsync();
_channelLock.Dispose();
+ _publishLock.Dispose();
}
}
diff --git a/src/RockBot.Messaging.RabbitMQ/RabbitMqSubscriber.cs b/src/RockBot.Messaging.RabbitMQ/RabbitMqSubscriber.cs
index cd25ccab..62d7f08e 100644
--- a/src/RockBot.Messaging.RabbitMQ/RabbitMqSubscriber.cs
+++ b/src/RockBot.Messaging.RabbitMQ/RabbitMqSubscriber.cs
@@ -33,7 +33,8 @@ public async Task SubscribeAsync(
string topic,
string subscriptionName,
Func> handler,
- CancellationToken cancellationToken = default)
+ CancellationToken cancellationToken = default,
+ int dispatchConcurrency = 1)
{
var queueName = $"rockbot.{subscriptionName}";
var dlqName = $"{queueName}.dlq";
@@ -41,13 +42,17 @@ public async Task SubscribeAsync(
var dlxName = _options.DeadLetterExchangeName;
var prefetchCount = _options.PrefetchCount;
var durable = _options.Durable;
+ // Translate the abstraction's dispatchConcurrency hint into a channel-level
+ // ConsumerDispatchConcurrency. Values <=1 leave it unset so we keep the
+ // connection-level default and the channel processes deliveries sequentially.
+ var channelConcurrency = dispatchConcurrency > 1 ? (ushort?)dispatchConcurrency : null;
// Factory that creates a fresh channel + consumer, called both for initial
// setup and for transparent reconnection after unexpected channel closure.
async Task<(IChannel channel, string consumerTag)> CreateChannelAndConsumerAsync(
CancellationToken ct)
{
- var channel = await _connectionManager.CreateChannelAsync(ct);
+ var channel = await _connectionManager.CreateChannelAsync(ct, channelConcurrency);
await channel.BasicQosAsync(
prefetchSize: 0,
@@ -97,8 +102,9 @@ await channel.QueueDeclareAsync(
"Queue {Queue} has stale arguments — deleting and recreating: {Reason}",
queueName, ex.ShutdownReason.ReplyText);
- // The original channel is dead after a 406; open a new one.
- channel = await _connectionManager.CreateChannelAsync(ct);
+ // The original channel is dead after a 406; open a new one with the
+ // same concurrency setting as the original.
+ channel = await _connectionManager.CreateChannelAsync(ct, channelConcurrency);
await channel.BasicQosAsync(0, prefetchCount, false, cancellationToken: ct);
await channel.QueueDeleteAsync(queueName, cancellationToken: ct);
diff --git a/src/RockBot.Subagent/SubagentResultGate.cs b/src/RockBot.Subagent/SubagentResultGate.cs
index b0e95b5b..f940d8f7 100644
--- a/src/RockBot.Subagent/SubagentResultGate.cs
+++ b/src/RockBot.Subagent/SubagentResultGate.cs
@@ -24,6 +24,14 @@ internal sealed class SubagentResultGate(
// result before returning, but message dispatch through RabbitMQ is async.
private static readonly TimeSpan CancellationGrace = TimeSpan.FromSeconds(5);
+ // Extra slack added on top of the consolidation ceiling for the late-arrival window
+ // and post-fire batch retention. Phase 2 synthesis can run for tens of seconds, and
+ // a stragger result message may sit in the broker through that whole time before
+ // being delivered to a fresh handler invocation. We must keep the fired batch around
+ // long enough for that late arrival to find it and short-circuit (return null)
+ // instead of starting a brand-new batch that fires its own duplicate synthesis.
+ private static readonly TimeSpan PostFireRetentionSlack = TimeSpan.FromMinutes(2);
+
///
/// Accumulates a subagent result into its batch. Returns:
///
@@ -44,9 +52,16 @@ internal sealed class SubagentResultGate(
var batch = _pending.GetOrAdd(batchKey, _ => new PendingBatch());
- // If a stale batch (already fired > 30s ago), replace it
+ // If a fired batch is "stale enough" to be safely replaced — i.e. older than
+ // (consolidation ceiling + slack) — start a fresh one. Within that window we
+ // assume any incoming result with this batchId is a delayed sibling of the
+ // already-fired batch, not a brand-new batch with a coincidentally reused key.
+ // BatchIds are 12-char GUID prefixes per primary session, so true reuse is
+ // astronomically unlikely; the staleness check only protects against pathological
+ // edge cases (e.g. clock skew on resume after suspend).
+ var staleThreshold = ChooseCeiling(result.PrimarySessionId) + PostFireRetentionSlack;
if (batch.Fired && batch.FiredAt is { } firedAt
- && DateTimeOffset.UtcNow - firedAt > TimeSpan.FromSeconds(30))
+ && DateTimeOffset.UtcNow - firedAt > staleThreshold)
{
var fresh = new PendingBatch();
if (_pending.TryUpdate(batchKey, fresh, batch))
@@ -95,7 +110,7 @@ internal sealed class SubagentResultGate(
{
logger.LogInformation(
"Batch {BatchKey} fired with {Count} result(s)", batchKey, fired.Count);
- CleanupBatch(batchKey);
+ CleanupBatch(batchKey, result.PrimarySessionId);
return fired;
}
return null; // someone else won the race
@@ -188,7 +203,7 @@ internal sealed class SubagentResultGate(
logger.LogInformation(
"Batch {BatchKey} fired at ceiling with {Count} result(s) ({Cancelled} cancelled)",
batchKey, ceilingFired.Count, stragglers.Count);
- CleanupBatch(batchKey);
+ CleanupBatch(batchKey, result.PrimarySessionId);
return ceilingFired;
}
@@ -216,10 +231,15 @@ private TimeSpan ChooseCeiling(string primarySessionId)
}
}
- private void CleanupBatch(string batchKey)
+ private void CleanupBatch(string batchKey, string primarySessionId)
{
- // Don't remove immediately — keep for late-arrival detection (30s staleness window)
- _ = Task.Delay(TimeSpan.FromSeconds(35)).ContinueWith(_ => _pending.TryRemove(batchKey, out PendingBatch? _));
+ // Keep the fired batch around for at least (consolidation ceiling + slack) so
+ // late-arriving sibling results find it and return null (deferred-to-winner)
+ // instead of provoking a fresh PendingBatch with a duplicate synthesis. Must
+ // outlive both the staleness window in AccumulateAsync and any plausible
+ // Phase 2 synthesis time.
+ var retention = ChooseCeiling(primarySessionId) + PostFireRetentionSlack + TimeSpan.FromSeconds(5);
+ _ = Task.Delay(retention).ContinueWith(_ => _pending.TryRemove(batchKey, out PendingBatch? _));
}
private sealed class PendingBatch
diff --git a/src/RockBot.Subagent/SubagentResultHandler.cs b/src/RockBot.Subagent/SubagentResultHandler.cs
index 5df9e2f9..fe7d537f 100644
--- a/src/RockBot.Subagent/SubagentResultHandler.cs
+++ b/src/RockBot.Subagent/SubagentResultHandler.cs
@@ -208,6 +208,12 @@ await conversationMemory.AddTurnAsync(
{ AgentName = agent.Name },
ct);
+ // The synthesis is the consolidated answer bubble. The parent loop already
+ // emitted its own final reply (with the original user correlationId) before
+ // spawning subagents, so this synthesis arrives as an unsolicited final
+ // reply and renders as a separate chat bubble in the UI — the user sees
+ // the parent's "I'll delegate…" announcement followed by this consolidated
+ // answer once all subagents complete.
var reply = new AgentReply
{
Content = finalContent,
diff --git a/src/RockBot.Subagent/SubagentServiceCollectionExtensions.cs b/src/RockBot.Subagent/SubagentServiceCollectionExtensions.cs
index 146ed55b..15fc4a5a 100644
--- a/src/RockBot.Subagent/SubagentServiceCollectionExtensions.cs
+++ b/src/RockBot.Subagent/SubagentServiceCollectionExtensions.cs
@@ -16,6 +16,12 @@ public static AgentHostBuilder AddSubagents(
this AgentHostBuilder builder,
Action? configure = null)
{
+ // Build a snapshot of the options now so we can read MaxConcurrentSubagents
+ // when sizing the result-topic dispatch concurrency below. The configured
+ // delegate still runs at DI build time for the runtime-resolved options.
+ var optionsSnapshot = new SubagentOptions();
+ configure?.Invoke(optionsSnapshot);
+
if (configure is not null)
builder.Services.Configure(configure);
else
@@ -31,7 +37,16 @@ public static AgentHostBuilder AddSubagents(
builder.HandleMessage();
var agentName = builder.Identity.Name;
builder.SubscribeTo($"{SubagentTopics.Progress}.{agentName}");
- builder.SubscribeTo($"{SubagentTopics.Result}.{agentName}");
+
+ // Result topic must allow concurrent dispatch so sibling SubagentResultHandler
+ // invocations can each enter SubagentResultGate.AccumulateAsync simultaneously.
+ // Without this, the first handler's wait-for-siblings loop blocks the channel
+ // and queued sibling results are delivered serially after each Phase 2 finishes,
+ // producing one solo synthesis (and one final UI bubble) per subagent instead
+ // of one consolidated synthesis per batch.
+ // Size: MaxConcurrentSubagents (one slot per potential sibling) + 1 buffer.
+ var resultDispatchConcurrency = Math.Max(2, optionsSnapshot.MaxConcurrentSubagents + 1);
+ builder.SubscribeTo($"{SubagentTopics.Result}.{agentName}", resultDispatchConcurrency);
// Tool registrar (registers spawn_subagent, cancel_subagent, list_subagents)
builder.Services.AddHostedService();
diff --git a/tests/RockBot.A2A.Tests/A2ARegistrationTests.cs b/tests/RockBot.A2A.Tests/A2ARegistrationTests.cs
index 839495e5..020919b7 100644
--- a/tests/RockBot.A2A.Tests/A2ARegistrationTests.cs
+++ b/tests/RockBot.A2A.Tests/A2ARegistrationTests.cs
@@ -78,8 +78,8 @@ public void AddA2A_RegistersTopicSubscriptions()
var provider = services.BuildServiceProvider();
var options = provider.GetRequiredService>();
- Assert.IsTrue(options.Value.Topics.Contains("agent.task.my-agent"));
- Assert.IsTrue(options.Value.Topics.Contains("agent.task.cancel.my-agent"));
+ Assert.IsTrue(options.Value.Topics.Any(t => t.Topic == "agent.task.my-agent"));
+ Assert.IsTrue(options.Value.Topics.Any(t => t.Topic == "agent.task.cancel.my-agent"));
}
[TestMethod]
@@ -109,7 +109,8 @@ private sealed class StubSubscriber : IMessageSubscriber
public Task SubscribeAsync(
string topic, string subscriptionName,
Func> handler,
- CancellationToken cancellationToken = default)
+ CancellationToken cancellationToken = default,
+ int dispatchConcurrency = 1)
{
return Task.FromResult(new StubSubscription());
}
diff --git a/tests/RockBot.A2A.Tests/A2ASkillHandlerRegistrationTests.cs b/tests/RockBot.A2A.Tests/A2ASkillHandlerRegistrationTests.cs
index da30b4e5..fa95a5a0 100644
--- a/tests/RockBot.A2A.Tests/A2ASkillHandlerRegistrationTests.cs
+++ b/tests/RockBot.A2A.Tests/A2ASkillHandlerRegistrationTests.cs
@@ -242,7 +242,8 @@ private sealed class StubSubscriber : IMessageSubscriber
public Task SubscribeAsync(
string topic, string subscriptionName,
Func> handler,
- CancellationToken cancellationToken = default)
+ CancellationToken cancellationToken = default,
+ int dispatchConcurrency = 1)
{
return Task.FromResult(new StubSubscription());
}
diff --git a/tests/RockBot.Host.Tests/AgentHostBuilderTests.cs b/tests/RockBot.Host.Tests/AgentHostBuilderTests.cs
index a1c11084..f1199007 100644
--- a/tests/RockBot.Host.Tests/AgentHostBuilderTests.cs
+++ b/tests/RockBot.Host.Tests/AgentHostBuilderTests.cs
@@ -51,8 +51,27 @@ public void SubscribeTo_AddsTopics()
var options = provider.GetRequiredService>().Value;
Assert.AreEqual(2, options.Topics.Count);
- CollectionAssert.Contains(options.Topics, "agent.task.*");
- CollectionAssert.Contains(options.Topics, "llm.response");
+ Assert.IsTrue(options.Topics.Any(t => t.Topic == "agent.task.*"));
+ Assert.IsTrue(options.Topics.Any(t => t.Topic == "llm.response"));
+ Assert.IsTrue(options.Topics.All(t => t.DispatchConcurrency == 1),
+ "Topics added without explicit concurrency should default to 1.");
+ }
+
+ [TestMethod]
+ public void SubscribeTo_RecordsDispatchConcurrency()
+ {
+ var services = new ServiceCollection();
+ services.AddLogging();
+ services.AddRockBotHost(agent => agent
+ .WithIdentity("my-agent")
+ .SubscribeTo("subagent.result.*", dispatchConcurrency: 4));
+
+ var provider = services.BuildServiceProvider();
+ var options = provider.GetRequiredService>().Value;
+
+ Assert.AreEqual(1, options.Topics.Count);
+ Assert.AreEqual("subagent.result.*", options.Topics[0].Topic);
+ Assert.AreEqual(4, options.Topics[0].DispatchConcurrency);
}
[TestMethod]
diff --git a/tests/RockBot.Host.Tests/AgentProfileExtensionsTests.cs b/tests/RockBot.Host.Tests/AgentProfileExtensionsTests.cs
index aa83a47c..4839b2f3 100644
--- a/tests/RockBot.Host.Tests/AgentProfileExtensionsTests.cs
+++ b/tests/RockBot.Host.Tests/AgentProfileExtensionsTests.cs
@@ -92,7 +92,8 @@ private sealed class StubSubscriber : IMessageSubscriber
{
public Task SubscribeAsync(string topic, string subscriptionName,
Func> handler,
- CancellationToken cancellationToken = default)
+ CancellationToken cancellationToken = default,
+ int dispatchConcurrency = 1)
=> Task.FromResult(new StubSubscription(topic, subscriptionName));
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
diff --git a/tests/RockBot.Host.Tests/ServiceCollectionExtensionsTests.cs b/tests/RockBot.Host.Tests/ServiceCollectionExtensionsTests.cs
index d232d91b..775ec6ea 100644
--- a/tests/RockBot.Host.Tests/ServiceCollectionExtensionsTests.cs
+++ b/tests/RockBot.Host.Tests/ServiceCollectionExtensionsTests.cs
@@ -74,7 +74,8 @@ private sealed class StubSubscriber : IMessageSubscriber
{
public Task SubscribeAsync(string topic, string subscriptionName,
Func> handler,
- CancellationToken cancellationToken = default)
+ CancellationToken cancellationToken = default,
+ int dispatchConcurrency = 1)
=> Task.FromResult(new StubSubscription(topic, subscriptionName));
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
diff --git a/tests/RockBot.Subagent.Tests/SubagentResultGateTests.cs b/tests/RockBot.Subagent.Tests/SubagentResultGateTests.cs
index f7f9d20a..1b2170e6 100644
--- a/tests/RockBot.Subagent.Tests/SubagentResultGateTests.cs
+++ b/tests/RockBot.Subagent.Tests/SubagentResultGateTests.cs
@@ -227,6 +227,72 @@ public async Task AccumulateAsync_LateArrival_AfterFired_ReturnsNull()
Assert.IsNull(batch2, "Late arrival to a fired batch must not trigger another synthesis");
}
+ // ── Late arrival inside ceiling+slack must return null even if Phase 2 was slow ─
+
+ [TestMethod]
+ public async Task AccumulateAsync_LateArrival_WithinCeilingPlusSlack_ReturnsNull()
+ {
+ // Background ceiling = 3 s. With +2 min slack the gate keeps a fired batch
+ // discoverable for ~2 min 3 s. A sibling arriving 1 s after fire (well within
+ // that window) must short-circuit to null, matching the production scenario
+ // where Phase 2 synthesis takes longer than the legacy 30 s staleness check.
+ var gate = CreateGate(backgroundCeilingSec: 3);
+ var result1 = MakeResult("task-1", primarySessionId: "patrol/morning-brief");
+
+ var batch1 = await gate.AccumulateAsync(result1, new FakeSubagentManager([]), CancellationToken.None);
+ Assert.IsNotNull(batch1);
+
+ // Simulate the time a Phase 2 synthesis would consume before the next
+ // sibling result is dequeued from the broker.
+ await Task.Delay(TimeSpan.FromSeconds(1));
+
+ var result2 = MakeResult("task-2", primarySessionId: "patrol/morning-brief");
+ var batch2 = await gate.AccumulateAsync(result2, new FakeSubagentManager([]), CancellationToken.None);
+
+ Assert.IsNull(batch2,
+ "Late arrival within ceiling+slack must defer to the already-fired winner, " +
+ "not start a fresh batch that produces a duplicate Phase 2 synthesis.");
+ }
+
+ // ── Concurrent arrivals accumulate into a single batch ────────────────────
+
+ [TestMethod]
+ public async Task AccumulateAsync_ConcurrentArrivals_AccumulateIntoOneBatch()
+ {
+ // Models the post-fix runtime: with raised consumer dispatch concurrency, all
+ // sibling results enter AccumulateAsync simultaneously (instead of arriving
+ // serially after each Phase 2). They must coalesce into one batch with one
+ // winner, not one batch per result.
+ var gate = CreateGate(interactiveCeilingSec: 30);
+ var primarySessionId = "session/concurrent-1";
+
+ var entries = Enumerable.Range(1, 3)
+ .Select(i => MakeActiveEntry($"task-{i}", TimeSpan.FromSeconds(20), primarySessionId))
+ .ToList();
+
+ // Start all three handlers in parallel. Each sees the others as active siblings
+ // initially; the manager mutates as each task "completes" (its entry is removed).
+ var manager = new FakeMutableSubagentManager(entries);
+ var results = new[] { "task-1", "task-2", "task-3" }
+ .Select(id => MakeResult(id, primarySessionId)).ToArray();
+
+ var tasks = results.Select(r => Task.Run(async () =>
+ {
+ // Stagger entries' removal so the wait loop has something to detect.
+ await Task.Delay(50);
+ manager.MarkComplete(r.TaskId);
+ return await gate.AccumulateAsync(r, manager, CancellationToken.None);
+ })).ToArray();
+
+ var outcomes = await Task.WhenAll(tasks);
+
+ var winners = outcomes.Where(o => o is not null).ToList();
+ Assert.AreEqual(1, winners.Count, "Exactly one handler should be the synthesis winner.");
+ Assert.AreEqual(3, winners[0]!.Count, "Winner's batch must contain all three sibling results.");
+ Assert.IsTrue(winners[0]!.Select(r => r.TaskId).OrderBy(x => x)
+ .SequenceEqual(new[] { "task-1", "task-2", "task-3" }));
+ }
+
// ── Duplicate result (already fired) returns null ───────────────────────
[TestMethod]
@@ -286,4 +352,34 @@ public Task CancelAsync(string taskId)
public IReadOnlyList ListActive() => _active.ToList();
}
+
+ private sealed class FakeMutableSubagentManager(IReadOnlyList activeEntries) : ISubagentManager
+ {
+ private readonly object _lock = new();
+ private readonly List _active = activeEntries.ToList();
+
+ public Task SpawnAsync(string description, string? context, int? timeoutMinutes,
+ string primarySessionId, CancellationToken ct,
+ string? batchId = null, bool consolidate = true, int? maxIterations = null) =>
+ Task.FromResult("fake-task-id");
+
+ public Task CancelAsync(string taskId)
+ {
+ lock (_lock)
+ {
+ var removed = _active.RemoveAll(e => e.TaskId == taskId) > 0;
+ return Task.FromResult(removed);
+ }
+ }
+
+ public IReadOnlyList ListActive()
+ {
+ lock (_lock) return _active.ToList();
+ }
+
+ public void MarkComplete(string taskId)
+ {
+ lock (_lock) _active.RemoveAll(e => e.TaskId == taskId);
+ }
+ }
}
diff --git a/tests/RockBot.Tools.Tests/TestHelpers.cs b/tests/RockBot.Tools.Tests/TestHelpers.cs
index 2c99b780..c5c3b208 100644
--- a/tests/RockBot.Tools.Tests/TestHelpers.cs
+++ b/tests/RockBot.Tools.Tests/TestHelpers.cs
@@ -58,7 +58,8 @@ public Task SubscribeAsync(
string topic,
string subscriptionName,
Func> handler,
- CancellationToken cancellationToken = default)
+ CancellationToken cancellationToken = default,
+ int dispatchConcurrency = 1)
{
Subscriptions.Add((topic, subscriptionName, handler));
ISubscription sub = new StubSubscription(topic, subscriptionName);
diff --git a/tests/RockBot.UserProxy.Tests/TestHelpers.cs b/tests/RockBot.UserProxy.Tests/TestHelpers.cs
index 6b0444cf..15475eae 100644
--- a/tests/RockBot.UserProxy.Tests/TestHelpers.cs
+++ b/tests/RockBot.UserProxy.Tests/TestHelpers.cs
@@ -44,7 +44,8 @@ public Task SubscribeAsync(
string topic,
string subscriptionName,
Func> handler,
- CancellationToken cancellationToken = default)
+ CancellationToken cancellationToken = default,
+ int dispatchConcurrency = 1)
{
_subscriptions.Add((topic, subscriptionName, handler));
return Task.FromResult(new StubSubscription(topic, subscriptionName));
@@ -111,7 +112,8 @@ public Task SubscribeAsync(
string topic,
string subscriptionName,
Func> handler,
- CancellationToken cancellationToken = default)
+ CancellationToken cancellationToken = default,
+ int dispatchConcurrency = 1)
{
var call = Interlocked.Increment(ref _callCount);