Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<!-- Version can be overridden from the command line: -p:Version=0.3.1
AssemblyVersion and FileVersion are derived automatically by the SDK
(prerelease suffixes like -beta001 are stripped for assembly versions). -->
<Version>0.10.30</Version>
<Version>0.10.34</Version>
</PropertyGroup>

<!-- NuGet package metadata (shared across all packable projects) -->
Expand Down
22 changes: 22 additions & 0 deletions src/RockBot.A2A/A2ATaskErrorHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ internal sealed class A2ATaskErrorHandler(
{
private string DisplayName => agentNameHolder.DisplayName ?? agent.Name;

/// <summary>
/// True only when an A2A task originated from the primary agent's user session.
/// Subagent and wisp sessions are not user-facing; their A2A errors flow back to
/// the calling loop rather than producing user bubbles.
/// </summary>
private static bool IsUserSession(string primarySessionId) =>
primarySessionId.StartsWith("session/", StringComparison.OrdinalIgnoreCase) &&
!primarySessionId.StartsWith("session/subagent-", StringComparison.OrdinalIgnoreCase);

public async Task HandleAsync(AgentTaskError error, MessageHandlerContext context)
{
var ct = context.CancellationToken;
Expand All @@ -58,6 +67,19 @@ public async Task HandleAsync(AgentTaskError error, MessageHandlerContext contex
"A2A task error for task {TaskId} from agent '{TargetAgent}' in session {SessionId}: [{Code}] {Message}",
error.TaskId, pending.TargetAgent, pending.PrimarySessionId, error.Code, error.Message);

// Only the primary agent talks to the user. When the failed A2A invocation came
// from a subagent or wisp, the calling loop will surface the failure in its own
// output — emitting a separate user-facing bubble here would bypass the primary
// agent and show transport-layer noise to the user.
if (!IsUserSession(pending.PrimarySessionId))
{
logger.LogInformation(
"A2A task error for {TaskId} originated from non-user session {SessionId} — " +
"skipping synthesis and bubble publish (caller will surface the error)",
error.TaskId, pending.PrimarySessionId);
return;
}

// PrimarySessionId is the full WM session namespace (e.g. "session/blazor-session").
// Strip the prefix for conversation memory and context builder consistency.
var sessionNamespace = pending.PrimarySessionId;
Expand Down
39 changes: 39 additions & 0 deletions src/RockBot.A2A/A2ATaskResultHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ internal sealed class A2ATaskResultHandler(
{
private string DisplayName => agentNameHolder.DisplayName ?? agent.Name;

/// <summary>
/// True only when an A2A task originated from the primary agent's user session
/// (e.g. "session/blazor-session"). Subagent sessions ("session/subagent-...") and
/// transient executor sessions ("wisp-...") are not user-facing — A2A results
/// directed at them must flow back to the calling loop via working memory rather
/// than producing their own user-visible chat bubble. Only the primary agent
/// communicates with the user.
/// </summary>
private static bool IsUserSession(string primarySessionId) =>
primarySessionId.StartsWith("session/", StringComparison.OrdinalIgnoreCase) &&
!primarySessionId.StartsWith("session/subagent-", StringComparison.OrdinalIgnoreCase);

public async Task HandleAsync(AgentTaskResult result, MessageHandlerContext context)
{
var ct = context.CancellationToken;
Expand Down Expand Up @@ -234,6 +246,20 @@ await workingMemory.SetAsync(
"A2A result for task {TaskId} ({Len:N0} chars) stored in working memory at key '{Key}'",
result.TaskId, resultText.Length, memoryKey);

// If the A2A invocation didn't originate in a user session, the result must flow
// back to the calling loop (subagent or wisp) via working memory — not as a
// user-visible chat bubble. The caller will pull the result and incorporate it
// into its own output, which is the only thing the user should see. Skip the
// synthesis + publish entirely for those callers.
if (!IsUserSession(pending.PrimarySessionId))
{
logger.LogInformation(
"A2A task {TaskId} originated from non-user session {SessionId} — skipping " +
"synthesis and bubble publish (caller will consume the working-memory result)",
result.TaskId, pending.PrimarySessionId);
return;
}

syntheticUserTurn =
$"[Agent '{pending.TargetAgent}' completed task {result.TaskId} (state={result.State})]: " +
$"The result ({resultText.Length:N0} chars) is in working memory. " +
Expand Down Expand Up @@ -306,6 +332,19 @@ await conversationMemory.AddTurnAsync(
private async Task PublishErrorToUserAsync(
PendingA2ATask pending, string taskId, string errorMessage, CancellationToken ct)
{
// Only the primary agent talks to the user. When the failed A2A invocation came
// from a subagent or wisp, surface the error through that caller's normal output
// path (it sees the failure via working memory / its loop's exception handling)
// rather than emitting our own bubble.
if (!IsUserSession(pending.PrimarySessionId))
{
logger.LogInformation(
"Suppressing A2A error bubble for task {TaskId} — invocation came from non-user " +
"session {SessionId}; caller will surface the error in its own output",
taskId, pending.PrimarySessionId);
return;
}

var sessionNamespace = pending.PrimarySessionId;
const string SessionPrefix = "session/";
var rawSessionId = sessionNamespace.StartsWith(SessionPrefix, StringComparison.OrdinalIgnoreCase)
Expand Down
4 changes: 4 additions & 0 deletions src/RockBot.Agent/UserMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,10 @@ await conversationMemory.AddTurnAsync(
{ AgentName = agent.Name },
ct);

// The parent reply is the user's "I'm starting work" / direct answer bubble
// and resolves the user-proxy SendAsync TCS. If the loop spawned consolidating
// subagents, their Phase 2 synthesis will arrive later as a separate
// unsolicited final bubble — that's the consolidated answer.
await PublishReplyAsync(text, replyTo, correlationId, sessionId, isFinal: true, ct);
loopSw.Stop();
turnActivity?.SetTag("rockbot.turn.status", "ok");
Expand Down
13 changes: 8 additions & 5 deletions src/RockBot.Host/AgentHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,22 @@ public async Task StartAsync(CancellationToken cancellationToken)

await RecoverWipEntriesAsync(cancellationToken);

foreach (var topic in _options.Topics)
foreach (var sub in _options.Topics)
{
var sanitizedTopic = topic.Replace(".", "-").Replace("*", "_").Replace("#", "__");
var sanitizedTopic = sub.Topic.Replace(".", "-").Replace("*", "_").Replace("#", "__");
var subscriptionName = $"{_identity.Name}.{sanitizedTopic}";

var subscription = await _subscriber.SubscribeAsync(
topic,
sub.Topic,
subscriptionName,
(envelope, ct) => _pipeline.DispatchAsync(envelope, ct),
cancellationToken);
cancellationToken,
sub.DispatchConcurrency);

_subscriptions.Add(subscription);
_logger.LogInformation("Subscribed to {Topic} as {SubscriptionName}", topic, subscriptionName);
_logger.LogInformation(
"Subscribed to {Topic} as {SubscriptionName} (dispatchConcurrency={Concurrency})",
sub.Topic, subscriptionName, sub.DispatchConcurrency);
}
}

Expand Down
13 changes: 8 additions & 5 deletions src/RockBot.Host/AgentHostBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ public AgentHostBuilder WithIdentity(string name, string? instanceId = null)
}

/// <summary>
/// Subscribe to a topic.
/// Subscribe to a topic. Optionally specify <paramref name="dispatchConcurrency"/>
/// to allow concurrent handler invocations for this subscription. Default 1
/// (sequential dispatch, preserves message ordering). Only bump when the handler
/// is re-entrant and may block on cross-message coordination.
/// </summary>
public AgentHostBuilder SubscribeTo(string topic)
public AgentHostBuilder SubscribeTo(string topic, int dispatchConcurrency = 1)
{
_options.Topics.Add(topic);
_options.Topics.Add(new TopicSubscription(topic, dispatchConcurrency));
return this;
}

Expand Down Expand Up @@ -77,8 +80,8 @@ internal void Build()
_services.AddSingleton<AgentClock>();
_services.Configure<AgentHostOptions>(opts =>
{
foreach (var topic in _options.Topics)
opts.Topics.Add(topic);
foreach (var sub in _options.Topics)
opts.Topics.Add(sub);
});
}
}
14 changes: 12 additions & 2 deletions src/RockBot.Host/AgentHostOptions.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
namespace RockBot.Host;

/// <summary>
/// A topic subscription paired with its dispatch concurrency.
/// </summary>
/// <param name="Topic">The topic pattern (with wildcards).</param>
/// <param name="DispatchConcurrency">Maximum concurrent in-flight handler invocations
/// for this subscription. Default 1 (sequential, preserves ordering). Bump only for
/// re-entrant handlers where cross-message coordination would otherwise deadlock the
/// consumer (e.g. the subagent-result consolidation gate).</param>
public sealed record TopicSubscription(string Topic, int DispatchConcurrency = 1);

/// <summary>
/// Configuration options for the agent host.
/// </summary>
public sealed class AgentHostOptions
{
/// <summary>
/// Topics the agent subscribes to.
/// Topics the agent subscribes to, paired with their dispatch concurrency.
/// </summary>
public List<string> Topics { get; } = [];
public List<TopicSubscription> Topics { get; } = [];

/// <summary>
/// Default maximum number of tool-calling round-trips per request.
Expand Down
11 changes: 10 additions & 1 deletion src/RockBot.Messaging.Abstractions/IMessageSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,19 @@ public interface IMessageSubscriber : IAsyncDisposable
/// used to create a durable consumer group/queue.</param>
/// <param name="handler">Async handler invoked for each message.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <param name="dispatchConcurrency">Maximum number of messages from this
/// subscription that may be processed concurrently. Default 1 (sequential
/// processing — preserves message ordering). Bump higher when the handler
/// is re-entrant and may block on cross-message coordination (e.g. a
/// consolidation gate that waits for siblings). Providers that don't
/// support per-subscription concurrency may ignore this hint. Placed after
/// <paramref name="cancellationToken"/> so positional callers that pass
/// only a CT keep working unchanged.</param>
/// <returns>A subscription handle that can be disposed to unsubscribe.</returns>
Task<ISubscription> SubscribeAsync(
string topic,
string subscriptionName,
Func<MessageEnvelope, CancellationToken, Task<MessageResult>> handler,
CancellationToken cancellationToken = default);
CancellationToken cancellationToken = default,
int dispatchConcurrency = 1);
}
6 changes: 5 additions & 1 deletion src/RockBot.Messaging.InProcess/InProcessSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ public Task<ISubscription> SubscribeAsync(
string topic,
string subscriptionName,
Func<MessageEnvelope, CancellationToken, Task<MessageResult>> handler,
CancellationToken cancellationToken = default)
CancellationToken cancellationToken = default,
int dispatchConcurrency = 1)
{
// dispatchConcurrency is a transport hint; the in-process bus already invokes
// each subscription handler asynchronously per publish call, so there's no
// separate dispatch loop to widen.
var subscription = new InProcessSubscription(topic, subscriptionName, handler, _bus, _logger);
_bus.Register(subscription);
return Task.FromResult<ISubscription>(subscription);
Expand Down
24 changes: 22 additions & 2 deletions src/RockBot.Messaging.RabbitMQ/RabbitMqConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,30 @@ public RabbitMqConnectionManager(
/// Each caller gets its own channel (publishers and subscribers should
/// not share channels in RabbitMQ).
/// </summary>
public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
/// <param name="consumerDispatchConcurrency">When non-null and greater than 1,
/// configures the channel to dispatch consumer events concurrently. Used by
/// subscriber channels whose handlers must run in parallel (e.g. subagent
/// result accumulation). Null falls back to the connection-level default.</param>
public async Task<IChannel> CreateChannelAsync(
CancellationToken cancellationToken = default,
ushort? consumerDispatchConcurrency = null)
{
var connection = await GetConnectionAsync(cancellationToken);
var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);

IChannel channel;
if (consumerDispatchConcurrency is { } concurrency && concurrency > 1)
{
var options = new CreateChannelOptions(
publisherConfirmationsEnabled: false,
publisherConfirmationTrackingEnabled: false,
outstandingPublisherConfirmationsRateLimiter: null,
consumerDispatchConcurrency: concurrency);
channel = await connection.CreateChannelAsync(options, cancellationToken);
}
else
{
channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
}

// Declare the topic exchange
await channel.ExchangeDeclareAsync(
Expand Down
30 changes: 23 additions & 7 deletions src/RockBot.Messaging.RabbitMQ/RabbitMqPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ public sealed class RabbitMqPublisher : IMessagePublisher
private readonly ILogger<RabbitMqPublisher> _logger;
private IChannel? _channel;
private readonly SemaphoreSlim _channelLock = new(1, 1);

// RabbitMQ.Client v7 IChannel is NOT thread-safe for publishing — concurrent
// BasicPublishAsync calls on the same channel can interleave AMQP frames and
// corrupt the wire protocol. Serialize publishes through this semaphore so the
// singleton publisher remains safe even when consumers (e.g. subagent results
// with raised dispatch concurrency) trigger parallel publish paths.
private readonly SemaphoreSlim _publishLock = new(1, 1);
private bool _disposed;

public RabbitMqPublisher(
Expand Down Expand Up @@ -79,13 +86,21 @@ public async Task PublishAsync(
"Publishing message {MessageId} to topic {Topic} (type: {Type})",
envelope.MessageId, topic, envelope.MessageType);

await channel.BasicPublishAsync(
exchange: _options.ExchangeName,
routingKey: topic,
mandatory: false,
basicProperties: properties,
body: envelope.Body,
cancellationToken: cancellationToken);
await _publishLock.WaitAsync(cancellationToken);
try
{
await channel.BasicPublishAsync(
exchange: _options.ExchangeName,
routingKey: topic,
mandatory: false,
basicProperties: properties,
body: envelope.Body,
cancellationToken: cancellationToken);
}
finally
{
_publishLock.Release();
}

sw.Stop();
RabbitMqDiagnostics.PublishDuration.Record(sw.Elapsed.TotalMilliseconds,
Expand Down Expand Up @@ -132,5 +147,6 @@ public async ValueTask DisposeAsync()
await _channel.CloseAsync();

_channelLock.Dispose();
_publishLock.Dispose();
}
}
14 changes: 10 additions & 4 deletions src/RockBot.Messaging.RabbitMQ/RabbitMqSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,26 @@ public async Task<ISubscription> SubscribeAsync(
string topic,
string subscriptionName,
Func<MessageEnvelope, CancellationToken, Task<MessageResult>> handler,
CancellationToken cancellationToken = default)
CancellationToken cancellationToken = default,
int dispatchConcurrency = 1)
{
var queueName = $"rockbot.{subscriptionName}";
var dlqName = $"{queueName}.dlq";
var exchangeName = _options.ExchangeName;
var dlxName = _options.DeadLetterExchangeName;
var prefetchCount = _options.PrefetchCount;
var durable = _options.Durable;
// Translate the abstraction's dispatchConcurrency hint into a channel-level
// ConsumerDispatchConcurrency. Values <=1 leave it unset so we keep the
// connection-level default and the channel processes deliveries sequentially.
var channelConcurrency = dispatchConcurrency > 1 ? (ushort?)dispatchConcurrency : null;

// Factory that creates a fresh channel + consumer, called both for initial
// setup and for transparent reconnection after unexpected channel closure.
async Task<(IChannel channel, string consumerTag)> CreateChannelAndConsumerAsync(
CancellationToken ct)
{
var channel = await _connectionManager.CreateChannelAsync(ct);
var channel = await _connectionManager.CreateChannelAsync(ct, channelConcurrency);

await channel.BasicQosAsync(
prefetchSize: 0,
Expand Down Expand Up @@ -97,8 +102,9 @@ await channel.QueueDeclareAsync(
"Queue {Queue} has stale arguments — deleting and recreating: {Reason}",
queueName, ex.ShutdownReason.ReplyText);

// The original channel is dead after a 406; open a new one.
channel = await _connectionManager.CreateChannelAsync(ct);
// The original channel is dead after a 406; open a new one with the
// same concurrency setting as the original.
channel = await _connectionManager.CreateChannelAsync(ct, channelConcurrency);
await channel.BasicQosAsync(0, prefetchCount, false, cancellationToken: ct);

await channel.QueueDeleteAsync(queueName, cancellationToken: ct);
Expand Down
Loading
Loading