diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_endpoint.py b/python/packages/ag-ui/agent_framework_ag_ui/_endpoint.py index d80ecea7a1..f4116b221e 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_endpoint.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_endpoint.py @@ -9,7 +9,7 @@ from collections.abc import AsyncGenerator, Sequence from typing import Any -from ag_ui.core import RunErrorEvent +from ag_ui.core import BaseEvent, RunErrorEvent from ag_ui.encoder import EventEncoder from agent_framework import SupportsAgentRun, Workflow from fastapi import FastAPI, HTTPException @@ -93,6 +93,19 @@ async def event_generator() -> AsyncGenerator[str]: event_count = 0 try: async for event in protocol_runner.run(input_data): + # Guard: only BaseEvent instances can be SSE-encoded. + # Non-BaseEvent objects (e.g. AgentResponseUpdate) lack + # model_dump_json() and would cause an AttributeError + # in EventEncoder.encode(). Skip them with a warning. + if not isinstance(event, BaseEvent): + logger.warning( + "[%s] Skipping non-BaseEvent object of type %s; " + "only ag_ui.core.BaseEvent instances can be SSE-encoded.", + path, + type(event).__name__, + ) + continue + event_count += 1 event_type_name = getattr(event, "type", type(event).__name__) # Log important events at INFO level diff --git a/python/packages/ag-ui/tests/ag_ui/test_endpoint.py b/python/packages/ag-ui/tests/ag_ui/test_endpoint.py index 51ab468b84..80e7f1abaf 100644 --- a/python/packages/ag-ui/tests/ag_ui/test_endpoint.py +++ b/python/packages/ag-ui/tests/ag_ui/test_endpoint.py @@ -9,6 +9,7 @@ from ag_ui.core import RunStartedEvent from agent_framework import ( Agent, + AgentResponseUpdate, ChatResponseUpdate, Content, WorkflowBuilder, @@ -603,3 +604,36 @@ async def run(self, input_data: dict[str, Any]): # Should still get 200 (SSE stream), just with no events assert response.status_code == 200 + + +async def test_endpoint_skips_non_base_event_objects(): + """Non-BaseEvent objects (e.g. AgentResponseUpdate) are skipped with a warning. + + Regression test for https://github.com/microsoft/agent-framework/issues/4929 + """ + + class MixedEventWorkflow(AgentFrameworkWorkflow): + async def run(self, input_data: dict[str, Any]): + del input_data + yield RunStartedEvent(run_id="run-1", thread_id="thread-1") + # Yield a non-BaseEvent object — this should be skipped with a warning, not crash + yield AgentResponseUpdate( # type: ignore[misc] + contents=[Content.from_text(text="leaked update")], + role="assistant", + ) + + app = FastAPI() + add_agent_framework_fastapi_endpoint(app, MixedEventWorkflow(), path="/mixed-events") + client = TestClient(app) + + response = client.post("/mixed-events", json={"messages": [{"role": "user", "content": "Hello"}]}) + + assert response.status_code == 200 + content = response.content.decode("utf-8") + lines = [line for line in content.split("\n") if line.startswith("data: ")] + event_types = [json.loads(line[6:]).get("type") for line in lines] + + # RUN_STARTED should be present; the AgentResponseUpdate should have been + # skipped with a warning — no RUN_ERROR or crash. + assert "RUN_STARTED" in event_types + assert "RUN_ERROR" not in event_types