diff --git a/Directory.Build.props b/Directory.Build.props index 408368e0..26488294 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -9,7 +9,7 @@ -0.10.30 +0.10.34 diff --git a/src/RockBot.A2A/A2ATaskErrorHandler.cs b/src/RockBot.A2A/A2ATaskErrorHandler.cs index d1cff523..f3e1c9cb 100644 --- a/src/RockBot.A2A/A2ATaskErrorHandler.cs +++ b/src/RockBot.A2A/A2ATaskErrorHandler.cs @@ -33,6 +33,15 @@ internal sealed class A2ATaskErrorHandler( { private string DisplayName => agentNameHolder.DisplayName ?? agent.Name; + /// + /// True only when an A2A task originated from the primary agent's user session. + /// Subagent and wisp sessions are not user-facing; their A2A errors flow back to + /// the calling loop rather than producing user bubbles. + /// + private static bool IsUserSession(string primarySessionId) => + primarySessionId.StartsWith("session/", StringComparison.OrdinalIgnoreCase) && + !primarySessionId.StartsWith("session/subagent-", StringComparison.OrdinalIgnoreCase); + public async Task HandleAsync(AgentTaskError error, MessageHandlerContext context) { var ct = context.CancellationToken; @@ -58,6 +67,19 @@ public async Task HandleAsync(AgentTaskError error, MessageHandlerContext contex "A2A task error for task {TaskId} from agent '{TargetAgent}' in session {SessionId}: [{Code}] {Message}", error.TaskId, pending.TargetAgent, pending.PrimarySessionId, error.Code, error.Message); + // Only the primary agent talks to the user. When the failed A2A invocation came + // from a subagent or wisp, the calling loop will surface the failure in its own + // output — emitting a separate user-facing bubble here would bypass the primary + // agent and show transport-layer noise to the user. + if (!IsUserSession(pending.PrimarySessionId)) + { + logger.LogInformation( + "A2A task error for {TaskId} originated from non-user session {SessionId} — " + + "skipping synthesis and bubble publish (caller will surface the error)", + error.TaskId, pending.PrimarySessionId); + return; + } + // PrimarySessionId is the full WM session namespace (e.g. "session/blazor-session"). // Strip the prefix for conversation memory and context builder consistency. var sessionNamespace = pending.PrimarySessionId; diff --git a/src/RockBot.A2A/A2ATaskResultHandler.cs b/src/RockBot.A2A/A2ATaskResultHandler.cs index 58c90e4a..aef9acee 100644 --- a/src/RockBot.A2A/A2ATaskResultHandler.cs +++ b/src/RockBot.A2A/A2ATaskResultHandler.cs @@ -43,6 +43,18 @@ internal sealed class A2ATaskResultHandler( { private string DisplayName => agentNameHolder.DisplayName ?? agent.Name; + /// + /// True only when an A2A task originated from the primary agent's user session + /// (e.g. "session/blazor-session"). Subagent sessions ("session/subagent-...") and + /// transient executor sessions ("wisp-...") are not user-facing — A2A results + /// directed at them must flow back to the calling loop via working memory rather + /// than producing their own user-visible chat bubble. Only the primary agent + /// communicates with the user. + /// + private static bool IsUserSession(string primarySessionId) => + primarySessionId.StartsWith("session/", StringComparison.OrdinalIgnoreCase) && + !primarySessionId.StartsWith("session/subagent-", StringComparison.OrdinalIgnoreCase); + public async Task HandleAsync(AgentTaskResult result, MessageHandlerContext context) { var ct = context.CancellationToken; @@ -234,6 +246,20 @@ await workingMemory.SetAsync( "A2A result for task {TaskId} ({Len:N0} chars) stored in working memory at key '{Key}'", result.TaskId, resultText.Length, memoryKey); + // If the A2A invocation didn't originate in a user session, the result must flow + // back to the calling loop (subagent or wisp) via working memory — not as a + // user-visible chat bubble. The caller will pull the result and incorporate it + // into its own output, which is the only thing the user should see. Skip the + // synthesis + publish entirely for those callers. + if (!IsUserSession(pending.PrimarySessionId)) + { + logger.LogInformation( + "A2A task {TaskId} originated from non-user session {SessionId} — skipping " + + "synthesis and bubble publish (caller will consume the working-memory result)", + result.TaskId, pending.PrimarySessionId); + return; + } + syntheticUserTurn = $"[Agent '{pending.TargetAgent}' completed task {result.TaskId} (state={result.State})]: " + $"The result ({resultText.Length:N0} chars) is in working memory. " + @@ -306,6 +332,19 @@ await conversationMemory.AddTurnAsync( private async Task PublishErrorToUserAsync( PendingA2ATask pending, string taskId, string errorMessage, CancellationToken ct) { + // Only the primary agent talks to the user. When the failed A2A invocation came + // from a subagent or wisp, surface the error through that caller's normal output + // path (it sees the failure via working memory / its loop's exception handling) + // rather than emitting our own bubble. + if (!IsUserSession(pending.PrimarySessionId)) + { + logger.LogInformation( + "Suppressing A2A error bubble for task {TaskId} — invocation came from non-user " + + "session {SessionId}; caller will surface the error in its own output", + taskId, pending.PrimarySessionId); + return; + } + var sessionNamespace = pending.PrimarySessionId; const string SessionPrefix = "session/"; var rawSessionId = sessionNamespace.StartsWith(SessionPrefix, StringComparison.OrdinalIgnoreCase) diff --git a/src/RockBot.Agent/UserMessageHandler.cs b/src/RockBot.Agent/UserMessageHandler.cs index a60fc06d..38dc7b85 100644 --- a/src/RockBot.Agent/UserMessageHandler.cs +++ b/src/RockBot.Agent/UserMessageHandler.cs @@ -452,6 +452,10 @@ await conversationMemory.AddTurnAsync( { AgentName = agent.Name }, ct); + // The parent reply is the user's "I'm starting work" / direct answer bubble + // and resolves the user-proxy SendAsync TCS. If the loop spawned consolidating + // subagents, their Phase 2 synthesis will arrive later as a separate + // unsolicited final bubble — that's the consolidated answer. await PublishReplyAsync(text, replyTo, correlationId, sessionId, isFinal: true, ct); loopSw.Stop(); turnActivity?.SetTag("rockbot.turn.status", "ok"); diff --git a/src/RockBot.Host/AgentHost.cs b/src/RockBot.Host/AgentHost.cs index 2d488103..004796f4 100644 --- a/src/RockBot.Host/AgentHost.cs +++ b/src/RockBot.Host/AgentHost.cs @@ -45,19 +45,22 @@ public async Task StartAsync(CancellationToken cancellationToken) await RecoverWipEntriesAsync(cancellationToken); - foreach (var topic in _options.Topics) + foreach (var sub in _options.Topics) { - var sanitizedTopic = topic.Replace(".", "-").Replace("*", "_").Replace("#", "__"); + var sanitizedTopic = sub.Topic.Replace(".", "-").Replace("*", "_").Replace("#", "__"); var subscriptionName = $"{_identity.Name}.{sanitizedTopic}"; var subscription = await _subscriber.SubscribeAsync( - topic, + sub.Topic, subscriptionName, (envelope, ct) => _pipeline.DispatchAsync(envelope, ct), - cancellationToken); + cancellationToken, + sub.DispatchConcurrency); _subscriptions.Add(subscription); - _logger.LogInformation("Subscribed to {Topic} as {SubscriptionName}", topic, subscriptionName); + _logger.LogInformation( + "Subscribed to {Topic} as {SubscriptionName} (dispatchConcurrency={Concurrency})", + sub.Topic, subscriptionName, sub.DispatchConcurrency); } } diff --git a/src/RockBot.Host/AgentHostBuilder.cs b/src/RockBot.Host/AgentHostBuilder.cs index 918e5163..b1e00480 100644 --- a/src/RockBot.Host/AgentHostBuilder.cs +++ b/src/RockBot.Host/AgentHostBuilder.cs @@ -40,11 +40,14 @@ public AgentHostBuilder WithIdentity(string name, string? instanceId = null) } /// - /// Subscribe to a topic. + /// Subscribe to a topic. Optionally specify + /// to allow concurrent handler invocations for this subscription. Default 1 + /// (sequential dispatch, preserves message ordering). Only bump when the handler + /// is re-entrant and may block on cross-message coordination. /// - public AgentHostBuilder SubscribeTo(string topic) + public AgentHostBuilder SubscribeTo(string topic, int dispatchConcurrency = 1) { - _options.Topics.Add(topic); + _options.Topics.Add(new TopicSubscription(topic, dispatchConcurrency)); return this; } @@ -77,8 +80,8 @@ internal void Build() _services.AddSingleton(); _services.Configure(opts => { - foreach (var topic in _options.Topics) - opts.Topics.Add(topic); + foreach (var sub in _options.Topics) + opts.Topics.Add(sub); }); } } diff --git a/src/RockBot.Host/AgentHostOptions.cs b/src/RockBot.Host/AgentHostOptions.cs index 4d4e011b..91bdebfc 100644 --- a/src/RockBot.Host/AgentHostOptions.cs +++ b/src/RockBot.Host/AgentHostOptions.cs @@ -1,14 +1,24 @@ namespace RockBot.Host; +/// +/// A topic subscription paired with its dispatch concurrency. +/// +/// The topic pattern (with wildcards). +/// Maximum concurrent in-flight handler invocations +/// for this subscription. Default 1 (sequential, preserves ordering). Bump only for +/// re-entrant handlers where cross-message coordination would otherwise deadlock the +/// consumer (e.g. the subagent-result consolidation gate). +public sealed record TopicSubscription(string Topic, int DispatchConcurrency = 1); + /// /// Configuration options for the agent host. /// public sealed class AgentHostOptions { /// - /// Topics the agent subscribes to. + /// Topics the agent subscribes to, paired with their dispatch concurrency. /// - public List Topics { get; } = []; + public List Topics { get; } = []; /// /// Default maximum number of tool-calling round-trips per request. diff --git a/src/RockBot.Messaging.Abstractions/IMessageSubscriber.cs b/src/RockBot.Messaging.Abstractions/IMessageSubscriber.cs index 299fcbdb..818f6519 100644 --- a/src/RockBot.Messaging.Abstractions/IMessageSubscriber.cs +++ b/src/RockBot.Messaging.Abstractions/IMessageSubscriber.cs @@ -16,10 +16,19 @@ public interface IMessageSubscriber : IAsyncDisposable /// used to create a durable consumer group/queue. /// Async handler invoked for each message. /// Cancellation token. + /// Maximum number of messages from this + /// subscription that may be processed concurrently. Default 1 (sequential + /// processing — preserves message ordering). Bump higher when the handler + /// is re-entrant and may block on cross-message coordination (e.g. a + /// consolidation gate that waits for siblings). Providers that don't + /// support per-subscription concurrency may ignore this hint. Placed after + /// so positional callers that pass + /// only a CT keep working unchanged. /// A subscription handle that can be disposed to unsubscribe. Task SubscribeAsync( string topic, string subscriptionName, Func> handler, - CancellationToken cancellationToken = default); + CancellationToken cancellationToken = default, + int dispatchConcurrency = 1); } diff --git a/src/RockBot.Messaging.InProcess/InProcessSubscriber.cs b/src/RockBot.Messaging.InProcess/InProcessSubscriber.cs index d300e36d..93f148ff 100644 --- a/src/RockBot.Messaging.InProcess/InProcessSubscriber.cs +++ b/src/RockBot.Messaging.InProcess/InProcessSubscriber.cs @@ -18,8 +18,12 @@ public Task SubscribeAsync( string topic, string subscriptionName, Func> handler, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + int dispatchConcurrency = 1) { + // dispatchConcurrency is a transport hint; the in-process bus already invokes + // each subscription handler asynchronously per publish call, so there's no + // separate dispatch loop to widen. var subscription = new InProcessSubscription(topic, subscriptionName, handler, _bus, _logger); _bus.Register(subscription); return Task.FromResult(subscription); diff --git a/src/RockBot.Messaging.RabbitMQ/RabbitMqConnectionManager.cs b/src/RockBot.Messaging.RabbitMQ/RabbitMqConnectionManager.cs index 521a19a5..0f80d72d 100644 --- a/src/RockBot.Messaging.RabbitMQ/RabbitMqConnectionManager.cs +++ b/src/RockBot.Messaging.RabbitMQ/RabbitMqConnectionManager.cs @@ -29,10 +29,30 @@ public RabbitMqConnectionManager( /// Each caller gets its own channel (publishers and subscribers should /// not share channels in RabbitMQ). /// - public async Task CreateChannelAsync(CancellationToken cancellationToken = default) + /// When non-null and greater than 1, + /// configures the channel to dispatch consumer events concurrently. Used by + /// subscriber channels whose handlers must run in parallel (e.g. subagent + /// result accumulation). Null falls back to the connection-level default. + public async Task CreateChannelAsync( + CancellationToken cancellationToken = default, + ushort? consumerDispatchConcurrency = null) { var connection = await GetConnectionAsync(cancellationToken); - var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); + + IChannel channel; + if (consumerDispatchConcurrency is { } concurrency && concurrency > 1) + { + var options = new CreateChannelOptions( + publisherConfirmationsEnabled: false, + publisherConfirmationTrackingEnabled: false, + outstandingPublisherConfirmationsRateLimiter: null, + consumerDispatchConcurrency: concurrency); + channel = await connection.CreateChannelAsync(options, cancellationToken); + } + else + { + channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); + } // Declare the topic exchange await channel.ExchangeDeclareAsync( diff --git a/src/RockBot.Messaging.RabbitMQ/RabbitMqPublisher.cs b/src/RockBot.Messaging.RabbitMQ/RabbitMqPublisher.cs index 94e7069e..41857820 100644 --- a/src/RockBot.Messaging.RabbitMQ/RabbitMqPublisher.cs +++ b/src/RockBot.Messaging.RabbitMQ/RabbitMqPublisher.cs @@ -17,6 +17,13 @@ public sealed class RabbitMqPublisher : IMessagePublisher private readonly ILogger _logger; private IChannel? _channel; private readonly SemaphoreSlim _channelLock = new(1, 1); + + // RabbitMQ.Client v7 IChannel is NOT thread-safe for publishing — concurrent + // BasicPublishAsync calls on the same channel can interleave AMQP frames and + // corrupt the wire protocol. Serialize publishes through this semaphore so the + // singleton publisher remains safe even when consumers (e.g. subagent results + // with raised dispatch concurrency) trigger parallel publish paths. + private readonly SemaphoreSlim _publishLock = new(1, 1); private bool _disposed; public RabbitMqPublisher( @@ -79,13 +86,21 @@ public async Task PublishAsync( "Publishing message {MessageId} to topic {Topic} (type: {Type})", envelope.MessageId, topic, envelope.MessageType); - await channel.BasicPublishAsync( - exchange: _options.ExchangeName, - routingKey: topic, - mandatory: false, - basicProperties: properties, - body: envelope.Body, - cancellationToken: cancellationToken); + await _publishLock.WaitAsync(cancellationToken); + try + { + await channel.BasicPublishAsync( + exchange: _options.ExchangeName, + routingKey: topic, + mandatory: false, + basicProperties: properties, + body: envelope.Body, + cancellationToken: cancellationToken); + } + finally + { + _publishLock.Release(); + } sw.Stop(); RabbitMqDiagnostics.PublishDuration.Record(sw.Elapsed.TotalMilliseconds, @@ -132,5 +147,6 @@ public async ValueTask DisposeAsync() await _channel.CloseAsync(); _channelLock.Dispose(); + _publishLock.Dispose(); } } diff --git a/src/RockBot.Messaging.RabbitMQ/RabbitMqSubscriber.cs b/src/RockBot.Messaging.RabbitMQ/RabbitMqSubscriber.cs index cd25ccab..62d7f08e 100644 --- a/src/RockBot.Messaging.RabbitMQ/RabbitMqSubscriber.cs +++ b/src/RockBot.Messaging.RabbitMQ/RabbitMqSubscriber.cs @@ -33,7 +33,8 @@ public async Task SubscribeAsync( string topic, string subscriptionName, Func> handler, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + int dispatchConcurrency = 1) { var queueName = $"rockbot.{subscriptionName}"; var dlqName = $"{queueName}.dlq"; @@ -41,13 +42,17 @@ public async Task SubscribeAsync( var dlxName = _options.DeadLetterExchangeName; var prefetchCount = _options.PrefetchCount; var durable = _options.Durable; + // Translate the abstraction's dispatchConcurrency hint into a channel-level + // ConsumerDispatchConcurrency. Values <=1 leave it unset so we keep the + // connection-level default and the channel processes deliveries sequentially. + var channelConcurrency = dispatchConcurrency > 1 ? (ushort?)dispatchConcurrency : null; // Factory that creates a fresh channel + consumer, called both for initial // setup and for transparent reconnection after unexpected channel closure. async Task<(IChannel channel, string consumerTag)> CreateChannelAndConsumerAsync( CancellationToken ct) { - var channel = await _connectionManager.CreateChannelAsync(ct); + var channel = await _connectionManager.CreateChannelAsync(ct, channelConcurrency); await channel.BasicQosAsync( prefetchSize: 0, @@ -97,8 +102,9 @@ await channel.QueueDeclareAsync( "Queue {Queue} has stale arguments — deleting and recreating: {Reason}", queueName, ex.ShutdownReason.ReplyText); - // The original channel is dead after a 406; open a new one. - channel = await _connectionManager.CreateChannelAsync(ct); + // The original channel is dead after a 406; open a new one with the + // same concurrency setting as the original. + channel = await _connectionManager.CreateChannelAsync(ct, channelConcurrency); await channel.BasicQosAsync(0, prefetchCount, false, cancellationToken: ct); await channel.QueueDeleteAsync(queueName, cancellationToken: ct); diff --git a/src/RockBot.Subagent/SubagentResultGate.cs b/src/RockBot.Subagent/SubagentResultGate.cs index b0e95b5b..f940d8f7 100644 --- a/src/RockBot.Subagent/SubagentResultGate.cs +++ b/src/RockBot.Subagent/SubagentResultGate.cs @@ -24,6 +24,14 @@ internal sealed class SubagentResultGate( // result before returning, but message dispatch through RabbitMQ is async. private static readonly TimeSpan CancellationGrace = TimeSpan.FromSeconds(5); + // Extra slack added on top of the consolidation ceiling for the late-arrival window + // and post-fire batch retention. Phase 2 synthesis can run for tens of seconds, and + // a stragger result message may sit in the broker through that whole time before + // being delivered to a fresh handler invocation. We must keep the fired batch around + // long enough for that late arrival to find it and short-circuit (return null) + // instead of starting a brand-new batch that fires its own duplicate synthesis. + private static readonly TimeSpan PostFireRetentionSlack = TimeSpan.FromMinutes(2); + /// /// Accumulates a subagent result into its batch. Returns: /// @@ -44,9 +52,16 @@ internal sealed class SubagentResultGate( var batch = _pending.GetOrAdd(batchKey, _ => new PendingBatch()); - // If a stale batch (already fired > 30s ago), replace it + // If a fired batch is "stale enough" to be safely replaced — i.e. older than + // (consolidation ceiling + slack) — start a fresh one. Within that window we + // assume any incoming result with this batchId is a delayed sibling of the + // already-fired batch, not a brand-new batch with a coincidentally reused key. + // BatchIds are 12-char GUID prefixes per primary session, so true reuse is + // astronomically unlikely; the staleness check only protects against pathological + // edge cases (e.g. clock skew on resume after suspend). + var staleThreshold = ChooseCeiling(result.PrimarySessionId) + PostFireRetentionSlack; if (batch.Fired && batch.FiredAt is { } firedAt - && DateTimeOffset.UtcNow - firedAt > TimeSpan.FromSeconds(30)) + && DateTimeOffset.UtcNow - firedAt > staleThreshold) { var fresh = new PendingBatch(); if (_pending.TryUpdate(batchKey, fresh, batch)) @@ -95,7 +110,7 @@ internal sealed class SubagentResultGate( { logger.LogInformation( "Batch {BatchKey} fired with {Count} result(s)", batchKey, fired.Count); - CleanupBatch(batchKey); + CleanupBatch(batchKey, result.PrimarySessionId); return fired; } return null; // someone else won the race @@ -188,7 +203,7 @@ internal sealed class SubagentResultGate( logger.LogInformation( "Batch {BatchKey} fired at ceiling with {Count} result(s) ({Cancelled} cancelled)", batchKey, ceilingFired.Count, stragglers.Count); - CleanupBatch(batchKey); + CleanupBatch(batchKey, result.PrimarySessionId); return ceilingFired; } @@ -216,10 +231,15 @@ private TimeSpan ChooseCeiling(string primarySessionId) } } - private void CleanupBatch(string batchKey) + private void CleanupBatch(string batchKey, string primarySessionId) { - // Don't remove immediately — keep for late-arrival detection (30s staleness window) - _ = Task.Delay(TimeSpan.FromSeconds(35)).ContinueWith(_ => _pending.TryRemove(batchKey, out PendingBatch? _)); + // Keep the fired batch around for at least (consolidation ceiling + slack) so + // late-arriving sibling results find it and return null (deferred-to-winner) + // instead of provoking a fresh PendingBatch with a duplicate synthesis. Must + // outlive both the staleness window in AccumulateAsync and any plausible + // Phase 2 synthesis time. + var retention = ChooseCeiling(primarySessionId) + PostFireRetentionSlack + TimeSpan.FromSeconds(5); + _ = Task.Delay(retention).ContinueWith(_ => _pending.TryRemove(batchKey, out PendingBatch? _)); } private sealed class PendingBatch diff --git a/src/RockBot.Subagent/SubagentResultHandler.cs b/src/RockBot.Subagent/SubagentResultHandler.cs index 5df9e2f9..fe7d537f 100644 --- a/src/RockBot.Subagent/SubagentResultHandler.cs +++ b/src/RockBot.Subagent/SubagentResultHandler.cs @@ -208,6 +208,12 @@ await conversationMemory.AddTurnAsync( { AgentName = agent.Name }, ct); + // The synthesis is the consolidated answer bubble. The parent loop already + // emitted its own final reply (with the original user correlationId) before + // spawning subagents, so this synthesis arrives as an unsolicited final + // reply and renders as a separate chat bubble in the UI — the user sees + // the parent's "I'll delegate…" announcement followed by this consolidated + // answer once all subagents complete. var reply = new AgentReply { Content = finalContent, diff --git a/src/RockBot.Subagent/SubagentServiceCollectionExtensions.cs b/src/RockBot.Subagent/SubagentServiceCollectionExtensions.cs index 146ed55b..15fc4a5a 100644 --- a/src/RockBot.Subagent/SubagentServiceCollectionExtensions.cs +++ b/src/RockBot.Subagent/SubagentServiceCollectionExtensions.cs @@ -16,6 +16,12 @@ public static AgentHostBuilder AddSubagents( this AgentHostBuilder builder, Action? configure = null) { + // Build a snapshot of the options now so we can read MaxConcurrentSubagents + // when sizing the result-topic dispatch concurrency below. The configured + // delegate still runs at DI build time for the runtime-resolved options. + var optionsSnapshot = new SubagentOptions(); + configure?.Invoke(optionsSnapshot); + if (configure is not null) builder.Services.Configure(configure); else @@ -31,7 +37,16 @@ public static AgentHostBuilder AddSubagents( builder.HandleMessage(); var agentName = builder.Identity.Name; builder.SubscribeTo($"{SubagentTopics.Progress}.{agentName}"); - builder.SubscribeTo($"{SubagentTopics.Result}.{agentName}"); + + // Result topic must allow concurrent dispatch so sibling SubagentResultHandler + // invocations can each enter SubagentResultGate.AccumulateAsync simultaneously. + // Without this, the first handler's wait-for-siblings loop blocks the channel + // and queued sibling results are delivered serially after each Phase 2 finishes, + // producing one solo synthesis (and one final UI bubble) per subagent instead + // of one consolidated synthesis per batch. + // Size: MaxConcurrentSubagents (one slot per potential sibling) + 1 buffer. + var resultDispatchConcurrency = Math.Max(2, optionsSnapshot.MaxConcurrentSubagents + 1); + builder.SubscribeTo($"{SubagentTopics.Result}.{agentName}", resultDispatchConcurrency); // Tool registrar (registers spawn_subagent, cancel_subagent, list_subagents) builder.Services.AddHostedService(); diff --git a/tests/RockBot.A2A.Tests/A2ARegistrationTests.cs b/tests/RockBot.A2A.Tests/A2ARegistrationTests.cs index 839495e5..020919b7 100644 --- a/tests/RockBot.A2A.Tests/A2ARegistrationTests.cs +++ b/tests/RockBot.A2A.Tests/A2ARegistrationTests.cs @@ -78,8 +78,8 @@ public void AddA2A_RegistersTopicSubscriptions() var provider = services.BuildServiceProvider(); var options = provider.GetRequiredService>(); - Assert.IsTrue(options.Value.Topics.Contains("agent.task.my-agent")); - Assert.IsTrue(options.Value.Topics.Contains("agent.task.cancel.my-agent")); + Assert.IsTrue(options.Value.Topics.Any(t => t.Topic == "agent.task.my-agent")); + Assert.IsTrue(options.Value.Topics.Any(t => t.Topic == "agent.task.cancel.my-agent")); } [TestMethod] @@ -109,7 +109,8 @@ private sealed class StubSubscriber : IMessageSubscriber public Task SubscribeAsync( string topic, string subscriptionName, Func> handler, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + int dispatchConcurrency = 1) { return Task.FromResult(new StubSubscription()); } diff --git a/tests/RockBot.A2A.Tests/A2ASkillHandlerRegistrationTests.cs b/tests/RockBot.A2A.Tests/A2ASkillHandlerRegistrationTests.cs index da30b4e5..fa95a5a0 100644 --- a/tests/RockBot.A2A.Tests/A2ASkillHandlerRegistrationTests.cs +++ b/tests/RockBot.A2A.Tests/A2ASkillHandlerRegistrationTests.cs @@ -242,7 +242,8 @@ private sealed class StubSubscriber : IMessageSubscriber public Task SubscribeAsync( string topic, string subscriptionName, Func> handler, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + int dispatchConcurrency = 1) { return Task.FromResult(new StubSubscription()); } diff --git a/tests/RockBot.Host.Tests/AgentHostBuilderTests.cs b/tests/RockBot.Host.Tests/AgentHostBuilderTests.cs index a1c11084..f1199007 100644 --- a/tests/RockBot.Host.Tests/AgentHostBuilderTests.cs +++ b/tests/RockBot.Host.Tests/AgentHostBuilderTests.cs @@ -51,8 +51,27 @@ public void SubscribeTo_AddsTopics() var options = provider.GetRequiredService>().Value; Assert.AreEqual(2, options.Topics.Count); - CollectionAssert.Contains(options.Topics, "agent.task.*"); - CollectionAssert.Contains(options.Topics, "llm.response"); + Assert.IsTrue(options.Topics.Any(t => t.Topic == "agent.task.*")); + Assert.IsTrue(options.Topics.Any(t => t.Topic == "llm.response")); + Assert.IsTrue(options.Topics.All(t => t.DispatchConcurrency == 1), + "Topics added without explicit concurrency should default to 1."); + } + + [TestMethod] + public void SubscribeTo_RecordsDispatchConcurrency() + { + var services = new ServiceCollection(); + services.AddLogging(); + services.AddRockBotHost(agent => agent + .WithIdentity("my-agent") + .SubscribeTo("subagent.result.*", dispatchConcurrency: 4)); + + var provider = services.BuildServiceProvider(); + var options = provider.GetRequiredService>().Value; + + Assert.AreEqual(1, options.Topics.Count); + Assert.AreEqual("subagent.result.*", options.Topics[0].Topic); + Assert.AreEqual(4, options.Topics[0].DispatchConcurrency); } [TestMethod] diff --git a/tests/RockBot.Host.Tests/AgentProfileExtensionsTests.cs b/tests/RockBot.Host.Tests/AgentProfileExtensionsTests.cs index aa83a47c..4839b2f3 100644 --- a/tests/RockBot.Host.Tests/AgentProfileExtensionsTests.cs +++ b/tests/RockBot.Host.Tests/AgentProfileExtensionsTests.cs @@ -92,7 +92,8 @@ private sealed class StubSubscriber : IMessageSubscriber { public Task SubscribeAsync(string topic, string subscriptionName, Func> handler, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + int dispatchConcurrency = 1) => Task.FromResult(new StubSubscription(topic, subscriptionName)); public ValueTask DisposeAsync() => ValueTask.CompletedTask; diff --git a/tests/RockBot.Host.Tests/ServiceCollectionExtensionsTests.cs b/tests/RockBot.Host.Tests/ServiceCollectionExtensionsTests.cs index d232d91b..775ec6ea 100644 --- a/tests/RockBot.Host.Tests/ServiceCollectionExtensionsTests.cs +++ b/tests/RockBot.Host.Tests/ServiceCollectionExtensionsTests.cs @@ -74,7 +74,8 @@ private sealed class StubSubscriber : IMessageSubscriber { public Task SubscribeAsync(string topic, string subscriptionName, Func> handler, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + int dispatchConcurrency = 1) => Task.FromResult(new StubSubscription(topic, subscriptionName)); public ValueTask DisposeAsync() => ValueTask.CompletedTask; diff --git a/tests/RockBot.Subagent.Tests/SubagentResultGateTests.cs b/tests/RockBot.Subagent.Tests/SubagentResultGateTests.cs index f7f9d20a..1b2170e6 100644 --- a/tests/RockBot.Subagent.Tests/SubagentResultGateTests.cs +++ b/tests/RockBot.Subagent.Tests/SubagentResultGateTests.cs @@ -227,6 +227,72 @@ public async Task AccumulateAsync_LateArrival_AfterFired_ReturnsNull() Assert.IsNull(batch2, "Late arrival to a fired batch must not trigger another synthesis"); } + // ── Late arrival inside ceiling+slack must return null even if Phase 2 was slow ─ + + [TestMethod] + public async Task AccumulateAsync_LateArrival_WithinCeilingPlusSlack_ReturnsNull() + { + // Background ceiling = 3 s. With +2 min slack the gate keeps a fired batch + // discoverable for ~2 min 3 s. A sibling arriving 1 s after fire (well within + // that window) must short-circuit to null, matching the production scenario + // where Phase 2 synthesis takes longer than the legacy 30 s staleness check. + var gate = CreateGate(backgroundCeilingSec: 3); + var result1 = MakeResult("task-1", primarySessionId: "patrol/morning-brief"); + + var batch1 = await gate.AccumulateAsync(result1, new FakeSubagentManager([]), CancellationToken.None); + Assert.IsNotNull(batch1); + + // Simulate the time a Phase 2 synthesis would consume before the next + // sibling result is dequeued from the broker. + await Task.Delay(TimeSpan.FromSeconds(1)); + + var result2 = MakeResult("task-2", primarySessionId: "patrol/morning-brief"); + var batch2 = await gate.AccumulateAsync(result2, new FakeSubagentManager([]), CancellationToken.None); + + Assert.IsNull(batch2, + "Late arrival within ceiling+slack must defer to the already-fired winner, " + + "not start a fresh batch that produces a duplicate Phase 2 synthesis."); + } + + // ── Concurrent arrivals accumulate into a single batch ──────────────────── + + [TestMethod] + public async Task AccumulateAsync_ConcurrentArrivals_AccumulateIntoOneBatch() + { + // Models the post-fix runtime: with raised consumer dispatch concurrency, all + // sibling results enter AccumulateAsync simultaneously (instead of arriving + // serially after each Phase 2). They must coalesce into one batch with one + // winner, not one batch per result. + var gate = CreateGate(interactiveCeilingSec: 30); + var primarySessionId = "session/concurrent-1"; + + var entries = Enumerable.Range(1, 3) + .Select(i => MakeActiveEntry($"task-{i}", TimeSpan.FromSeconds(20), primarySessionId)) + .ToList(); + + // Start all three handlers in parallel. Each sees the others as active siblings + // initially; the manager mutates as each task "completes" (its entry is removed). + var manager = new FakeMutableSubagentManager(entries); + var results = new[] { "task-1", "task-2", "task-3" } + .Select(id => MakeResult(id, primarySessionId)).ToArray(); + + var tasks = results.Select(r => Task.Run(async () => + { + // Stagger entries' removal so the wait loop has something to detect. + await Task.Delay(50); + manager.MarkComplete(r.TaskId); + return await gate.AccumulateAsync(r, manager, CancellationToken.None); + })).ToArray(); + + var outcomes = await Task.WhenAll(tasks); + + var winners = outcomes.Where(o => o is not null).ToList(); + Assert.AreEqual(1, winners.Count, "Exactly one handler should be the synthesis winner."); + Assert.AreEqual(3, winners[0]!.Count, "Winner's batch must contain all three sibling results."); + Assert.IsTrue(winners[0]!.Select(r => r.TaskId).OrderBy(x => x) + .SequenceEqual(new[] { "task-1", "task-2", "task-3" })); + } + // ── Duplicate result (already fired) returns null ─────────────────────── [TestMethod] @@ -286,4 +352,34 @@ public Task CancelAsync(string taskId) public IReadOnlyList ListActive() => _active.ToList(); } + + private sealed class FakeMutableSubagentManager(IReadOnlyList activeEntries) : ISubagentManager + { + private readonly object _lock = new(); + private readonly List _active = activeEntries.ToList(); + + public Task SpawnAsync(string description, string? context, int? timeoutMinutes, + string primarySessionId, CancellationToken ct, + string? batchId = null, bool consolidate = true, int? maxIterations = null) => + Task.FromResult("fake-task-id"); + + public Task CancelAsync(string taskId) + { + lock (_lock) + { + var removed = _active.RemoveAll(e => e.TaskId == taskId) > 0; + return Task.FromResult(removed); + } + } + + public IReadOnlyList ListActive() + { + lock (_lock) return _active.ToList(); + } + + public void MarkComplete(string taskId) + { + lock (_lock) _active.RemoveAll(e => e.TaskId == taskId); + } + } } diff --git a/tests/RockBot.Tools.Tests/TestHelpers.cs b/tests/RockBot.Tools.Tests/TestHelpers.cs index 2c99b780..c5c3b208 100644 --- a/tests/RockBot.Tools.Tests/TestHelpers.cs +++ b/tests/RockBot.Tools.Tests/TestHelpers.cs @@ -58,7 +58,8 @@ public Task SubscribeAsync( string topic, string subscriptionName, Func> handler, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + int dispatchConcurrency = 1) { Subscriptions.Add((topic, subscriptionName, handler)); ISubscription sub = new StubSubscription(topic, subscriptionName); diff --git a/tests/RockBot.UserProxy.Tests/TestHelpers.cs b/tests/RockBot.UserProxy.Tests/TestHelpers.cs index 6b0444cf..15475eae 100644 --- a/tests/RockBot.UserProxy.Tests/TestHelpers.cs +++ b/tests/RockBot.UserProxy.Tests/TestHelpers.cs @@ -44,7 +44,8 @@ public Task SubscribeAsync( string topic, string subscriptionName, Func> handler, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + int dispatchConcurrency = 1) { _subscriptions.Add((topic, subscriptionName, handler)); return Task.FromResult(new StubSubscription(topic, subscriptionName)); @@ -111,7 +112,8 @@ public Task SubscribeAsync( string topic, string subscriptionName, Func> handler, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + int dispatchConcurrency = 1) { var call = Interlocked.Increment(ref _callCount);