.NET: [Feature]: Unable to restrict AG-UI streaming to a single agent #5819

@kpobb1989

Description

Input:
Frontend: Angular 21 + CopilotKit
Backend: .NET 10 + Microsoft Agent Framework + AG-UI

When only a single agent is registered on the backend, AG-UI behaves as expected and streams that agent's responses via SSE.
However, once multiple agents are registered, the AG-UI package streams responses from all of them, which is unexpected and undesired.
For example, with two agents, Planner and Executor, the expected behavior is to stream responses only from the final agent (Executor), not from both. I haven’t found a proper way to achieve this with the Microsoft Agent Framework, so I had to create a workaround.

Code Sample

The following setup streams data from all registered agents:
 
var projectClient = new AIProjectClient(new Uri(_chatConfiguration.AIFoundry.ProjectEndpoint), new DefaultAzureCredential());
 
var plannerAgent = projectClient.AsAIAgent(new ChatClientAgentOptions
{
  Name = "PlannerAgent",
  ChatOptions = new()
  {
    Instructions = _chatConfiguration.Agents.Planner.Instructions,
    ModelId = _chatConfiguration.AIFoundry.ModelDeploymentName,
  }
});
 
var executorAgent = projectClient.AsAIAgent(new ChatClientAgentOptions
{
  Name = "ExecutorAgent",
  ChatOptions = new()
  {
    Instructions = _chatConfiguration.Agents.Executor.Instructions,
    ModelId = _chatConfiguration.AIFoundry.ModelDeploymentName,
  }
});
 
// Standard sequential workflow: runs the agents in order and streams responses from all of them as they arrive.
var workflow = AgentWorkflowBuilder.BuildSequential("ChatWorkflow", [plannerAgent, executorAgent]);
_agent = workflow.AsAIAgent(
  id: "chat-workflow",
  name: "ChatWorkflow",
  description: "Multi-agent pipeline",
  executionEnvironment: InProcessExecution.Lockstep);

My workaround (a custom AIAgent wrapper instead of running the whole pipeline through AgentWorkflowBuilder):

/// <summary>
/// Coordinates the execution of multiple AI agents by orchestrating a planning workflow followed by a streaming agent
/// for response generation.
/// </summary>
/// <remarks>The AgentOrchestrator first executes a non-streaming planning workflow using all agents except the
/// last, then delegates response generation to the final agent in the sequence. The orchestrator supports both
/// streaming and non-streaming execution modes. The session state is managed exclusively by the streaming
/// agent.</remarks>
/// <param name="agents">The collection of AI agents to be orchestrated. Must contain at least one agent, where the last agent is used for
/// streaming responses and the preceding agents define the planning workflow.</param>
public sealed class AgentOrchestrator(IEnumerable<AIAgent> agents) : AIAgent
{
    private readonly AIAgent _streamingAgent = agents.LastOrDefault() ?? throw new ArgumentException("At least one agent must be provided", nameof(agents));
    private readonly AIAgent _nonStreamingWorkflow = AgentWorkflowBuilder.BuildSequential(agents.SkipLast(1)).AsAIAgent();

    // Session is owned ONLY by the streaming agent
    protected override ValueTask<AgentSession> CreateSessionCoreAsync(
        CancellationToken cancellationToken = default)
        => _streamingAgent.CreateSessionAsync(cancellationToken);

    protected override ValueTask<JsonElement> SerializeSessionCoreAsync(
        AgentSession session,
        JsonSerializerOptions? jsonSerializerOptions = null,
        CancellationToken cancellationToken = default)
        => _streamingAgent.SerializeSessionAsync(session, jsonSerializerOptions, cancellationToken);

    protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(
        JsonElement serializedState,
        JsonSerializerOptions? jsonSerializerOptions = null,
        CancellationToken cancellationToken = default)
        => _streamingAgent.DeserializeSessionAsync(serializedState, jsonSerializerOptions, cancellationToken);

    // Non-streaming execution
    protected override async Task<AgentResponse> RunCoreAsync(
        IEnumerable<ChatMessage> messages,
        AgentSession? session = null,
        AgentRunOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        options ??= new AgentRunOptions();
        options.AdditionalProperties ??= [];

        // 1) Run planner workflow (NON-streaming)
        var workflowResponse =
            await _nonStreamingWorkflow
                .RunAsync(messages, session: null, options, cancellationToken)
                .ConfigureAwait(false);

        // 2) Store execution plan out-of-band
        options.AdditionalProperties[nameof(AgentOrchestrator)] = workflowResponse.Text;

        // 3) Run executor (non-streaming)
        return await _streamingAgent
            .RunAsync(messages, session, options, cancellationToken)
            .ConfigureAwait(false);
    }

    // Streaming execution
    protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(
        IEnumerable<ChatMessage> messages,
        AgentSession? session = null,
        AgentRunOptions? options = null,
        [System.Runtime.CompilerServices.EnumeratorCancellation]
        CancellationToken cancellationToken = default)
    {
        options ??= new AgentRunOptions();
        options.AdditionalProperties ??= [];

        // 1) Run planner workflow FIRST (NON-streaming)
        var workflowResponse =
            await _nonStreamingWorkflow
                .RunAsync(messages, session: null, options, cancellationToken)
                .ConfigureAwait(false);

        // 2) Store execution plan out-of-band
        options.AdditionalProperties[nameof(AgentOrchestrator)] = workflowResponse.Text;

        // 3) Stream ONLY from executor
        await foreach (var update in _streamingAgent
            .RunStreamingAsync(messages, session, options, cancellationToken)
            .ConfigureAwait(false))
        {
            yield return update;
        }
    }
}
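
The behavior being requested can be sketched as a filter over the update stream: keep the sequential workflow and pass through only the updates attributed to the final agent. The sketch below is not Agent Framework code. `Update`, `UpdateFilter`, and `FakeWorkflowStream` are hypothetical stand-ins, and it assumes every streamed update carries the name of the agent that produced it, which has not been verified for workflow-backed agents.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a streamed update; NOT the framework's type.
public sealed record Update(string? AuthorName, string Text);

public static class UpdateFilter
{
    // Passes through only the updates attributed to the named agent.
    // Assumption: AuthorName is populated for every update in the stream.
    public static async IAsyncEnumerable<Update> OnlyFrom(
        IAsyncEnumerable<Update> source, string agentName)
    {
        await foreach (var update in source.ConfigureAwait(false))
        {
            if (string.Equals(update.AuthorName, agentName, StringComparison.Ordinal))
            {
                yield return update;
            }
        }
    }
}

public static class Demo
{
    // Hypothetical stream that imitates a two-agent sequential workflow.
    private static async IAsyncEnumerable<Update> FakeWorkflowStream()
    {
        yield return new Update("PlannerAgent", "plan: step 1, step 2");
        await Task.Yield();
        yield return new Update("ExecutorAgent", "final answer");
    }

    public static async Task Main()
    {
        // Only the ExecutorAgent update reaches the consumer.
        await foreach (var update in UpdateFilter.OnlyFrom(FakeWorkflowStream(), "ExecutorAgent"))
        {
            Console.WriteLine(update.Text);
        }
    }
}
```

If the real updates expose an equivalent author field, a filter like this could sit between the workflow agent and the AG-UI endpoint. Unlike the workaround above, it would still run the planner in streaming mode and simply drop its output before it reaches the client.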

Language/SDK

.NET
