Skip to content

feat(instrument): Extract SSE parser to shared utility, port to agno + ms_agent_framework (cross-poll #9)#113

Closed
mmercuri wants to merge 3 commits into
mainfrom
feat/instrument-sse-parser-shared
Closed

feat(instrument): Extract SSE parser to shared utility, port to agno + ms_agent_framework (cross-poll #9)#113
mmercuri wants to merge 3 commits into
mainfrom
feat/instrument-sse-parser-shared

Conversation

@mmercuri

Copy link
Copy Markdown
Contributor

Summary

Cross-pollination item #9 from A:/tmp/adapter-cross-pollination-audit.md §2.10 / §3.1: extract Server-Sent Events handling from per-adapter ad-hoc code into a single shared utility, then wire it into the agno and ms_agent_framework streaming code paths so each chunk emits a discrete model.stream.chunk event instead of being collapsed into one accumulated emission.

What changed

Shared parser (src/layerlens/instrument/adapters/_base/sse_parser.py)

  • SSEEvent (pydantic model): one dispatched event with data, event, id, retry, plus a done convenience flag for the OpenAI / Agentforce [DONE] sentinel.
  • SSEParser: incremental, stateful parser. Spec-compliant per the WHATWG HTML "Server-sent events" section.
    • Partial events at chunk boundaries: held in a line-remainder buffer until a terminator arrives.
    • UTF-8 multi-byte codepoint splits across chunks: handled via codecs.getincrementaldecoder so a 3-byte or 4-byte 🌍 reassembled correctly.
    • Mixed \n / \r / \r\n line terminators all parse correctly.
    • Multi-line data: joined with \n per spec; one leading space stripped after : per spec.
    • Comments (: lines), unknown fields, NUL in id:, non-digit retry: all tolerated.
    • 1 MiB MAX_LINE_BYTES guard against runaway buffers from malformed upstream.
  • parse_event(text): convenience for a fully-framed single block.
  • parse_stream(byte_iter): async helper for httpx.aiter_bytes-style sources. Drains a final flush() so an unterminated trailing event surfaces correctly.

agno port

  • AgnoAdapter._create_traced_run / _create_traced_run_sync now detect stream=True calls. When the result is iterable, the adapter wraps it in a generator that emits one model.stream.chunk per yielded object — or, when chunks are bytes/str, runs them through SSEParser so multi-event chunks expand into per-event emissions and partial events across chunks are reassembled.
  • Final on_run_end fires from the wrapper's finally block on stream exhaustion so duration measurement remains accurate for the streaming path.

ms_agent_framework port

  • MSAgentAdapter._create_traced_invoke_stream now emits a discrete model.stream.chunk event per StreamingChatMessageContent (or per parsed SSE event when chunks arrive as raw bytes/str), rather than silently accumulating last_message and emitting only run start/end.

Capture-config plumbing

  • CaptureConfig.is_layer_enabled gains a model.stream.chunkl3_model_metadata mapping so the new event type is gated by the same layer flag as model.invoke. Strictly additive — no existing event-type or default behavior changes.

LangChain refactor

Not applicable on this branch — the LangChain adapter does not currently implement its own SSE parser (it consumes upstream LangChain callbacks rather than raw HTTP). Audit item §2.10 lists langchain as "none" for SSE parsing; only agentforce had bespoke SSE wire-format handling, which now has a shared replacement available for future adoption.

Test plan

  • uv run pytest tests/instrument/adapters/_base/test_sse_parser.py -x56 / 56 passed
    • W3C field handling (data / event / id / retry, comments, unknown fields, leading-space stripping, NUL in id, non-digit retry)
    • Multi-line data: concatenation per spec
    • Mixed \n / \r / \r\n terminators
    • Partial event / chunk boundary handling (mid-line, mid-field-name, mid-block, byte-by-byte)
    • UTF-8 multi-byte codepoint splits (3-byte and 4-byte)
    • [DONE] sentinel surfaced via done flag
    • Async parse_stream() over an async byte iterator
    • Single-block convenience parse_event()
    • Malformed input tolerance, buffer overflow protection
    • Realistic OpenAI chat.completions and Agentforce text streams end-to-end
    • Parametrized stress: byte-by-byte through 256-byte chunks, all yield identical events
  • uv run pytest tests/instrument/adapters/frameworks/test_agno_adapter.py -x17 / 17 passed (12 pre-existing + 5 new streaming tests)
  • uv run pytest tests/instrument/adapters/frameworks/test_ms_agent_framework_adapter.py -x16 / 16 passed (12 pre-existing + 4 new streaming tests)
  • uv run pytest tests/instrument/ --ignore=...known-pre-existing232 passed, 1 skipped (no regressions)
  • uv run pytest tests/ --ignore=...known-pre-existing834 passed, 1 skipped (no regressions in resources / models / clients)
  • uv run mypy --strict on every modified .py file → clean
  • uv run ruff check on every modified .py file → clean

Files modified

  • src/layerlens/instrument/adapters/_base/sse_parser.py (new, 372 lines)
  • src/layerlens/instrument/adapters/_base/__init__.py (export new symbols)
  • src/layerlens/instrument/adapters/_base/capture.py (one-line additive map entry)
  • src/layerlens/instrument/adapters/frameworks/agno/lifecycle.py (streaming wrapper)
  • src/layerlens/instrument/adapters/frameworks/ms_agent_framework/lifecycle.py (streaming wrapper)
  • tests/instrument/adapters/_base/__init__.py (new package)
  • tests/instrument/adapters/_base/test_sse_parser.py (new, 56 tests)
  • tests/instrument/adapters/frameworks/test_agno_adapter.py (+5 streaming tests)
  • tests/instrument/adapters/frameworks/test_ms_agent_framework_adapter.py (+4 streaming tests)

mmercuri and others added 3 commits April 25, 2026 19:13
Bootstraps the LayerLens instrument layer with the abstract base classes,
adapter registry, capture configuration, event sinks, vendored event
schemas, and pydantic v1/v2 compatibility shim that every concrete
adapter (frameworks, protocols, providers) will depend on.

Scope
-----
- src/layerlens/instrument/__init__.py: lean re-export surface
- src/layerlens/instrument/_vendored/: frozen ateam event schemas (no
  runtime ateam dependency)
- src/layerlens/instrument/adapters/_base/: BaseAdapter, AdapterRegistry,
  AdapterStatus, AdapterHealth, AdapterCapability, ReplayableTrace,
  CaptureConfig, EventSink, TraceStoreSink, IngestionPipelineSink,
  PydanticCompat
- src/layerlens/_compat/pydantic.py: model_dump/model_validate shim
  spanning pydantic v1 + v2
- scripts/{port_adapter,port_protocol,emit_adapter_manifest,
  regen_dep_baselines}.py: codegen helpers used to port the rest of M1
- tests/instrument/{test_base_layer,test_lazy_imports,
  test_default_install,test_resolved_dep_tree}.py + _baselines/
- .github/workflows/dep-tree-guard.yaml: CI gate that locks the default
  install footprint
- docs/adapters/: CONTRIBUTING, STATUS, pydantic-compatibility, testing,
  PERSONA_REVIEW

Blast radius
------------
- Pure additions. No public surface changes outside the new
  layerlens.instrument namespace.
- Default `pip install layerlens` install set is unchanged (verified by
  test_default_install.py against the new baseline).
- Lazy adapter discovery: importing layerlens.instrument MUST NOT pull
  in any optional adapter dep (verified by test_lazy_imports.py).

Test plan
---------
- uv run pytest tests/instrument/test_base_layer.py
  tests/instrument/test_lazy_imports.py -x  -> 45 passed
- The dep-tree-guard workflow exercises test_default_install.py and
  test_resolved_dep_tree.py against the new baselines on every PR.

LAY-3400 umbrella: this PR is the prerequisite for the M1.B/M1.C/M1.D
adapter ports, M7 protocol certification, and M8 Cohere/Mistral.
Ports the twelve agent-tier framework adapters from the ateam
reference implementation onto the new layerlens.instrument base layer:

  Semantic Kernel, LlamaIndex, OpenAI Agents, Pydantic-AI, Agno,
  Strands, SmolAgents, MS Agent Framework, Google ADK,
  Bedrock Agents, Embedding (vector store hooks), Benchmark Import

Pairs with feat/instrument-frameworks-orchestration (M1.C part 1)
which lands LangChain, LangGraph, CrewAI, AutoGen, Langfuse, and
Agentforce. Together they complete M1.C.

Scope
-----
- src/layerlens/instrument/adapters/frameworks/{semantic_kernel,
  llama_index,openai_agents,pydantic_ai,agno,strands,smolagents,
  ms_agent_framework,google_adk,bedrock_agents,embedding,
  benchmark_import}/: per-framework packages
- tests/instrument/adapters/frameworks/test_*_adapter.py + the
  test_bulk_ported_smoke.py harness (which exercises every ported
  adapter against canned trace fixtures so partial framework SDKs
  on a given runner don't drop coverage to zero)
- samples/instrument/<framework>/: runnable per-framework samples
- docs/adapters/frameworks-<framework>.md: per-framework integration
  guide
- pyproject.toml: twelve new optional extras
  (semantic-kernel, llama-index, openai-agents, pydantic-ai, agno,
  strands, smolagents, ms-agent-framework, google-adk,
  bedrock-agents, embedding, benchmark-import) with python_version
  markers; pyright/ruff exclusions for the dynamic monkey-patching
  framework code

Blast radius
------------
- Default `pip install layerlens` install set is unchanged. Each
  framework's heavy deps are gated behind their own extra.
- No changes to existing public API surface.
- Importing layerlens.instrument still does NOT pull in any framework
  module (lazy registry lookup).

Test plan
---------
- uv run pytest tests/instrument/adapters/frameworks/ -x  ->
  184 passed, 1 skipped (test_bulk_ported_smoke.py covers all 12
  agent-tier adapters plus the orchestration-tier ones from part 1
  via the same harness)

Stacks on
---------
- feat/instrument-base-foundation (M1.A) — required for the
  BaseAdapter surface this PR consumes.

Sibling of
----------
- feat/instrument-frameworks-orchestration (M1.C part 1) — both
  branches stack on the base foundation independently and don't
  conflict; they can land in either order.

LAY-3400 umbrella (M1.C part 2).
Cross-pollination item #9 (audit at A:/tmp/adapter-cross-pollination-audit.md
§2.10 / §3.1) — extract Server-Sent Events handling into a single shared
utility and wire it into agno + ms_agent_framework streaming code paths so
each chunk emits a discrete model.stream.chunk event instead of being
collapsed into one accumulated emission.

Shared parser
-------------
* New module: src/layerlens/instrument/adapters/_base/sse_parser.py
  - SSEEvent: pydantic model for one dispatched event (data/event/id/retry
    + done convenience flag for the OpenAI [DONE] sentinel).
  - SSEParser: incremental, stateful parser. Handles partial events at
    chunk boundaries via a line-remainder buffer, and UTF-8 multi-byte
    codepoint splits via codecs.getincrementaldecoder. Mixed CR / LF /
    CRLF terminators all parse correctly per W3C HTML SSE spec.
  - parse_event(text): convenience for a fully-framed single block.
  - parse_stream(byte_iter): async helper for httpx.aiter_bytes-style
    sources. Drains a final flush() so an unterminated trailing event
    surfaces correctly.
  - 1 MiB MAX_LINE_BYTES guard against runaway buffers.

agno port
---------
* AgnoAdapter._create_traced_run / _create_traced_run_sync detect
  ``stream=True`` calls; if the result is iterable, the adapter wraps it
  in a generator that emits one ``model.stream.chunk`` per yielded
  object — or, when chunks are bytes/str, runs them through SSEParser
  so multi-event chunks expand into per-event emissions and partial
  events across chunks are reassembled.
* Final on_run_end fires from the wrapper's finally block on stream
  exhaustion so duration measurement remains accurate for the streaming
  path.

ms_agent_framework port
-----------------------
* MSAgentAdapter._create_traced_invoke_stream now emits a discrete
  ``model.stream.chunk`` event per StreamingChatMessageContent (or per
  parsed SSE event when chunks arrive as raw bytes/str), rather than
  silently accumulating last_message and emitting only run start/end.

Capture-config plumbing
-----------------------
* CaptureConfig.is_layer_enabled gains a ``model.stream.chunk`` →
  ``l3_model_metadata`` mapping so the new event type is gated by the
  same layer flag as ``model.invoke`` (additive only — no existing
  event type or default changes).

Tests
-----
* tests/instrument/adapters/_base/test_sse_parser.py — 56 tests across
  W3C field handling, mixed line terminators, partial chunks, UTF-8
  multi-byte boundaries, [DONE] sentinel, async parse_stream, malformed
  input tolerance, buffer overflow protection, and OpenAI / Agentforce
  realistic stream end-to-end.
* tests/instrument/adapters/frameworks/test_agno_adapter.py — 5 new
  streaming tests cover object chunks, multi-event SSE bytes, partial
  SSE across chunks, passthrough fidelity, and non-stream path
  invariance.
* tests/instrument/adapters/frameworks/test_ms_agent_framework_adapter.py
  — 4 new streaming tests mirror the agno coverage for invoke_stream.

LangChain refactor: not applicable on this branch — LangChain adapter
does not currently implement its own SSE parser (it consumes upstream
LangChain callbacks, not raw HTTP). Audit item §2.10 lists langchain as
"none" for SSE parsing; only agentforce had bespoke SSE wire-format
handling, which now has a shared replacement available for future
adoption.

Acceptance gates (all green):
* pytest tests/instrument/adapters/_base/test_sse_parser.py -x → 56 / 56
* pytest tests/instrument/adapters/frameworks/test_{agno,ms_agent_framework}_adapter.py -x → 27 / 27 (no regression)
* mypy --strict on every modified .py file → clean
* ruff check on every modified .py file → clean
@mmercuri mmercuri requested a review from m-peko April 26, 2026 23:34
@m-peko m-peko closed this May 21, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants