Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion python/packages/ag-ui/agent_framework_ag_ui/_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from collections.abc import AsyncGenerator, Sequence
from typing import Any

from ag_ui.core import RunErrorEvent
from ag_ui.core import BaseEvent, RunErrorEvent
from ag_ui.encoder import EventEncoder
from agent_framework import SupportsAgentRun, Workflow
from fastapi import FastAPI, HTTPException
Expand Down Expand Up @@ -93,6 +93,19 @@ async def event_generator() -> AsyncGenerator[str]:
event_count = 0
try:
async for event in protocol_runner.run(input_data):
# Guard: only BaseEvent instances can be SSE-encoded.
# Non-BaseEvent objects (e.g. AgentResponseUpdate) lack
# model_dump_json() and would cause an AttributeError
# in EventEncoder.encode(). Skip them with a warning.
if not isinstance(event, BaseEvent):
logger.warning(
"[%s] Skipping non-BaseEvent object of type %s; "
"only ag_ui.core.BaseEvent instances can be SSE-encoded.",
path,
type(event).__name__,
)
continue

event_count += 1
event_type_name = getattr(event, "type", type(event).__name__)
# Log important events at INFO level
Expand Down
34 changes: 34 additions & 0 deletions python/packages/ag-ui/tests/ag_ui/test_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ag_ui.core import RunStartedEvent
from agent_framework import (
Agent,
AgentResponseUpdate,
ChatResponseUpdate,
Content,
WorkflowBuilder,
Expand Down Expand Up @@ -603,3 +604,36 @@ async def run(self, input_data: dict[str, Any]):

# Should still get 200 (SSE stream), just with no events
assert response.status_code == 200


async def test_endpoint_skips_non_base_event_objects():
"""Non-BaseEvent objects (e.g. AgentResponseUpdate) are skipped with a warning.

Regression test for https://github.com/microsoft/agent-framework/issues/4929
"""

class MixedEventWorkflow(AgentFrameworkWorkflow):
async def run(self, input_data: dict[str, Any]):
del input_data
yield RunStartedEvent(run_id="run-1", thread_id="thread-1")
# Yield a non-BaseEvent object — this should be skipped with a warning, not crash
yield AgentResponseUpdate( # type: ignore[misc]
contents=[Content.from_text(text="leaked update")],
role="assistant",
)

app = FastAPI()
add_agent_framework_fastapi_endpoint(app, MixedEventWorkflow(), path="/mixed-events")
client = TestClient(app)

response = client.post("/mixed-events", json={"messages": [{"role": "user", "content": "Hello"}]})

assert response.status_code == 200
content = response.content.decode("utf-8")
lines = [line for line in content.split("\n") if line.startswith("data: ")]
event_types = [json.loads(line[6:]).get("type") for line in lines]

# RUN_STARTED should be present; the AgentResponseUpdate should have been
# skipped with a warning — no RUN_ERROR or crash.
assert "RUN_STARTED" in event_types
assert "RUN_ERROR" not in event_types