feat(instrument): Extract SSE parser to shared utility, port to agno + ms_agent_framework (cross-poll #9) #113
Closed
mmercuri wants to merge 3 commits into
Bootstraps the LayerLens instrument layer with the abstract base classes,
adapter registry, capture configuration, event sinks, vendored event
schemas, and pydantic v1/v2 compatibility shim that every concrete
adapter (frameworks, protocols, providers) will depend on.
Scope
-----
- src/layerlens/instrument/__init__.py: lean re-export surface
- src/layerlens/instrument/_vendored/: frozen ateam event schemas (no
runtime ateam dependency)
- src/layerlens/instrument/adapters/_base/: BaseAdapter, AdapterRegistry,
AdapterStatus, AdapterHealth, AdapterCapability, ReplayableTrace,
CaptureConfig, EventSink, TraceStoreSink, IngestionPipelineSink,
PydanticCompat
- src/layerlens/_compat/pydantic.py: model_dump/model_validate shim
spanning pydantic v1 + v2
- scripts/{port_adapter,port_protocol,emit_adapter_manifest,
regen_dep_baselines}.py: codegen helpers used to port the rest of M1
- tests/instrument/{test_base_layer,test_lazy_imports,
test_default_install,test_resolved_dep_tree}.py + _baselines/
- .github/workflows/dep-tree-guard.yaml: CI gate that locks the default
install footprint
- docs/adapters/: CONTRIBUTING, STATUS, pydantic-compatibility, testing,
PERSONA_REVIEW
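The pydantic v1/v2 shim listed above is not shown in this PR, so the following is only a minimal sketch of how a `model_dump` / `model_validate` pair could span both major versions. It assumes simple feature detection on the model object; the real `src/layerlens/_compat/pydantic.py` may be structured differently.

```python
from typing import Any, Dict


def model_dump(model: Any) -> Dict[str, Any]:
    """Serialize a pydantic model to a dict on either major version."""
    if hasattr(model, "model_dump"):  # pydantic v2 method name
        return model.model_dump()
    return model.dict()  # pydantic v1 method name


def model_validate(cls: Any, data: Dict[str, Any]) -> Any:
    """Build a model instance from a dict on either major version."""
    if hasattr(cls, "model_validate"):  # pydantic v2 classmethod
        return cls.model_validate(data)
    return cls.parse_obj(data)  # pydantic v1 classmethod
```

Callers would then write `model_dump(event)` instead of calling `event.dict()` or `event.model_dump()` directly, which keeps version checks out of adapter code.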
Blast radius
------------
- Pure additions. No public surface changes outside the new
layerlens.instrument namespace.
- Default `pip install layerlens` install set is unchanged (verified by
test_default_install.py against the new baseline).
- Lazy adapter discovery: importing layerlens.instrument MUST NOT pull
in any optional adapter dep (verified by test_lazy_imports.py).
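The lazy-discovery guarantee above can be illustrated with a registry that stores import paths as strings and only imports on lookup. This is a sketch under assumptions: `LazyAdapterRegistry`, `register`, and `get` are hypothetical names, not the actual `AdapterRegistry` API.

```python
import importlib
from typing import Any, Dict


class LazyAdapterRegistry:
    """Hypothetical registry: adapter names map to import paths, resolved on demand."""

    def __init__(self) -> None:
        self._targets: Dict[str, str] = {}
        self._loaded: Dict[str, Any] = {}

    def register(self, name: str, target: str) -> None:
        # target has the form "package.module:attribute".
        # Nothing is imported here, so registration stays cheap and
        # never touches an optional dependency.
        self._targets[name] = target

    def get(self, name: str) -> Any:
        # The first lookup pays the import cost; later lookups hit the cache.
        if name not in self._loaded:
            module_path, _, attr = self._targets[name].partition(":")
            module = importlib.import_module(module_path)
            self._loaded[name] = getattr(module, attr)
        return self._loaded[name]
```

With this shape, registering an adapter whose framework is not installed is harmless; the `ModuleNotFoundError` only surfaces when that specific adapter is requested.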
Test plan
---------
- uv run pytest tests/instrument/test_base_layer.py
tests/instrument/test_lazy_imports.py -x -> 45 passed
- The dep-tree-guard workflow exercises test_default_install.py and
test_resolved_dep_tree.py against the new baselines on every PR.
LAY-3400 umbrella: this PR is the prerequisite for the M1.B/M1.C/M1.D
adapter ports, M7 protocol certification, and M8 Cohere/Mistral.
Ports the twelve agent-tier framework adapters from the ateam
reference implementation onto the new layerlens.instrument base layer:
Semantic Kernel, LlamaIndex, OpenAI Agents, Pydantic-AI, Agno,
Strands, SmolAgents, MS Agent Framework, Google ADK,
Bedrock Agents, Embedding (vector store hooks), Benchmark Import
Pairs with feat/instrument-frameworks-orchestration (M1.C part 1)
which lands LangChain, LangGraph, CrewAI, AutoGen, Langfuse, and
Agentforce. Together they complete M1.C.
Scope
-----
- src/layerlens/instrument/adapters/frameworks/{semantic_kernel,
llama_index,openai_agents,pydantic_ai,agno,strands,smolagents,
ms_agent_framework,google_adk,bedrock_agents,embedding,
benchmark_import}/: per-framework packages
- tests/instrument/adapters/frameworks/test_*_adapter.py + the
test_bulk_ported_smoke.py harness (which exercises every ported
adapter against canned trace fixtures so partial framework SDKs
on a given runner don't drop coverage to zero)
- samples/instrument/<framework>/: runnable per-framework samples
- docs/adapters/frameworks-<framework>.md: per-framework integration
guide
- pyproject.toml: twelve new optional extras
(semantic-kernel, llama-index, openai-agents, pydantic-ai, agno,
strands, smolagents, ms-agent-framework, google-adk,
bedrock-agents, embedding, benchmark-import) with python_version
markers; pyright/ruff exclusions for the dynamic monkey-patching
framework code
Blast radius
------------
- Default `pip install layerlens` install set is unchanged. Each
framework's heavy deps are gated behind their own extra.
- No changes to existing public API surface.
- Importing layerlens.instrument still does NOT pull in any framework
module (lazy registry lookup).
Test plan
---------
- uv run pytest tests/instrument/adapters/frameworks/ -x ->
184 passed, 1 skipped (test_bulk_ported_smoke.py covers all 12
agent-tier adapters plus the orchestration-tier ones from part 1
via the same harness)
Stacks on
---------
- feat/instrument-base-foundation (M1.A) — required for the
BaseAdapter surface this PR consumes.
Sibling of
----------
- feat/instrument-frameworks-orchestration (M1.C part 1) — both
branches stack on the base foundation independently and don't
conflict; they can land in either order.
LAY-3400 umbrella (M1.C part 2).
Cross-pollination item #9 (audit at
A:/tmp/adapter-cross-pollination-audit.md §2.10 / §3.1): extract
Server-Sent Events handling into a single shared utility and wire it
into agno + ms_agent_framework streaming code paths so each chunk emits
a discrete model.stream.chunk event instead of being collapsed into one
accumulated emission.
Shared parser
-------------
* New module: src/layerlens/instrument/adapters/_base/sse_parser.py
  - SSEEvent: pydantic model for one dispatched event
    (data/event/id/retry + done convenience flag for the OpenAI [DONE]
    sentinel).
  - SSEParser: incremental, stateful parser. Handles partial events at
    chunk boundaries via a line-remainder buffer, and UTF-8 multi-byte
    codepoint splits via codecs.getincrementaldecoder. Mixed CR / LF /
    CRLF terminators all parse correctly per W3C HTML SSE spec.
  - parse_event(text): convenience for a fully-framed single block.
  - parse_stream(byte_iter): async helper for httpx.aiter_bytes-style
    sources. Drains a final flush() so an unterminated trailing event
    surfaces correctly.
  - 1 MiB MAX_LINE_BYTES guard against runaway buffers.
agno port
---------
* AgnoAdapter._create_traced_run / _create_traced_run_sync detect
  ``stream=True`` calls; if the result is iterable, the adapter wraps
  it in a generator that emits one ``model.stream.chunk`` per yielded
  object or, when chunks are bytes/str, runs them through SSEParser so
  multi-event chunks expand into per-event emissions and partial
  events across chunks are reassembled.
* Final on_run_end fires from the wrapper's finally block on stream
  exhaustion so duration measurement remains accurate for the
  streaming path.
ms_agent_framework port
-----------------------
* MSAgentAdapter._create_traced_invoke_stream now emits a discrete
  ``model.stream.chunk`` event per StreamingChatMessageContent (or per
  parsed SSE event when chunks arrive as raw bytes/str), rather than
  silently accumulating last_message and emitting only run start/end.
Capture-config plumbing
-----------------------
* CaptureConfig.is_layer_enabled gains a ``model.stream.chunk`` →
  ``l3_model_metadata`` mapping so the new event type is gated by the
  same layer flag as ``model.invoke`` (additive only: no existing
  event type or default changes).
Tests
-----
* tests/instrument/adapters/_base/test_sse_parser.py: 56 tests across
  W3C field handling, mixed line terminators, partial chunks, UTF-8
  multi-byte boundaries, [DONE] sentinel, async parse_stream,
  malformed input tolerance, buffer overflow protection, and OpenAI /
  Agentforce realistic stream end-to-end.
* tests/instrument/adapters/frameworks/test_agno_adapter.py: 5 new
  streaming tests cover object chunks, multi-event SSE bytes, partial
  SSE across chunks, passthrough fidelity, and non-stream path
  invariance.
* tests/instrument/adapters/frameworks/test_ms_agent_framework_adapter.py:
  4 new streaming tests mirror the agno coverage for invoke_stream.
LangChain refactor: not applicable on this branch. The LangChain
adapter does not currently implement its own SSE parser (it consumes
upstream LangChain callbacks, not raw HTTP). Audit item §2.10 lists
langchain as "none" for SSE parsing; only agentforce had bespoke SSE
wire-format handling, which now has a shared replacement available for
future adoption.
Acceptance gates (all green):
* pytest tests/instrument/adapters/_base/test_sse_parser.py -x
  → 56 / 56
* pytest tests/instrument/adapters/frameworks/test_{agno,ms_agent_framework}_adapter.py -x
  → 27 / 27 (no regression)
* mypy --strict on every modified .py file → clean
* ruff check on every modified .py file → clean
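The incremental parsing described in the commit message above can be sketched in a few dozen lines. This is an illustration only, not the contents of `sse_parser.py`: `MiniSSEParser` and `Event` are hypothetical names, the `id:` and `retry:` fields and the line-length guard are omitted, and `flush()` surfaces an unterminated trailing event because the commit message says the shared parser does so.

```python
import codecs
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Event:
    data: str
    event: Optional[str] = None

    @property
    def done(self) -> bool:
        # OpenAI-style end-of-stream sentinel.
        return self.data == "[DONE]"


class MiniSSEParser:
    """Hypothetical, simplified stand-in for an incremental SSE parser."""

    def __init__(self) -> None:
        # The incremental decoder buffers a split multi-byte codepoint
        # until the rest of its bytes arrive in a later chunk.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        self._remainder = ""        # text after the last complete line
        self._data: List[str] = []  # data: values of the event being built
        self._event: Optional[str] = None

    def feed(self, chunk: bytes) -> List[Event]:
        text = self._remainder + self._decoder.decode(chunk)
        events: List[Event] = []
        start = i = 0
        while i < len(text):
            ch = text[i]
            if ch not in "\r\n":
                i += 1
                continue
            if ch == "\r" and i + 1 == len(text):
                break  # may be the first half of CRLF; wait for more input
            line = text[start:i]
            if ch == "\r" and text[i + 1] == "\n":
                i += 1  # treat CRLF as a single terminator
            i += 1
            start = i
            self._process_line(line, events)
        self._remainder = text[start:]
        return events

    def flush(self) -> List[Event]:
        # End of stream: treat any leftover text as a final line and
        # dispatch whatever event is pending.
        tail = self._remainder + self._decoder.decode(b"", final=True)
        self._remainder = ""
        events: List[Event] = []
        if tail.endswith("\r"):
            tail = tail[:-1]
        if tail:
            self._process_line(tail, events)
        self._dispatch(events)
        return events

    def _process_line(self, line: str, events: List[Event]) -> None:
        if line == "":
            self._dispatch(events)  # a blank line ends the event
            return
        if line.startswith(":"):
            return  # comment line
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # strip exactly one leading space
        if name == "data":
            self._data.append(value)
        elif name == "event":
            self._event = value
        # Other fields are ignored in this sketch.

    def _dispatch(self, events: List[Event]) -> None:
        data = "\n".join(self._data)
        if data:
            events.append(Event(data, self._event))
        self._data = []
        self._event = None
```

Feeding `b"data: he"` and then `b"llo\n\n"` yields no event from the first call and a single `hello` event from the second, which is the partial-chunk reassembly the adapters rely on.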
Summary
-------
Cross-pollination item #9 from `A:/tmp/adapter-cross-pollination-audit.md` §2.10 / §3.1: extract Server-Sent Events handling from per-adapter ad-hoc code into a single shared utility, then wire it into the agno and ms_agent_framework streaming code paths so each chunk emits a discrete `model.stream.chunk` event instead of being collapsed into one accumulated emission.
What changed
------------
Shared parser (`src/layerlens/instrument/adapters/_base/sse_parser.py`)
-----------------------------------------------------------------------
- `SSEEvent` (pydantic model): one dispatched event with `data`, `event`, `id`, `retry`, plus a `done` convenience flag for the OpenAI / Agentforce `[DONE]` sentinel.
- `SSEParser`: incremental, stateful parser. Spec-compliant per the WHATWG HTML "Server-sent events" section.
  - Partial events at chunk boundaries are reassembled via a line-remainder buffer.
  - UTF-8 multi-byte codepoints split across chunks are decoded via `codecs.getincrementaldecoder`, so a 3-byte `★` or 4-byte `🌍` is reassembled correctly.
  - Mixed `\n` / `\r` / `\r\n` line terminators all parse correctly.
  - Multiple `data:` lines are joined with `\n` per spec; one leading space is stripped after `:` per spec.
  - Comments (`:` lines), unknown fields, NUL in `id:`, and non-digit `retry:` are all tolerated.
  - 1 MiB `MAX_LINE_BYTES` guard against runaway buffers from malformed upstream.
- `parse_event(text)`: convenience for a fully-framed single block.
- `parse_stream(byte_iter)`: async helper for `httpx.aiter_bytes`-style sources. Drains a final `flush()` so an unterminated trailing event surfaces correctly.
agno port
---------
- `AgnoAdapter._create_traced_run` / `_create_traced_run_sync` now detect `stream=True` calls. When the result is iterable, the adapter wraps it in a generator that emits one `model.stream.chunk` per yielded object or, when chunks are bytes/str, runs them through `SSEParser` so multi-event chunks expand into per-event emissions and partial events across chunks are reassembled.
- `on_run_end` fires from the wrapper's `finally` block on stream exhaustion so duration measurement remains accurate for the streaming path.
ms_agent_framework port
-----------------------
- `MSAgentAdapter._create_traced_invoke_stream` now emits a discrete `model.stream.chunk` event per `StreamingChatMessageContent` (or per parsed SSE event when chunks arrive as raw bytes/str), rather than silently accumulating `last_message` and emitting only run start/end.
Capture-config plumbing
-----------------------
- `CaptureConfig.is_layer_enabled` gains a `model.stream.chunk` → `l3_model_metadata` mapping so the new event type is gated by the same layer flag as `model.invoke`. Strictly additive: no existing event-type or default behavior changes.
LangChain refactor
------------------
Not applicable on this branch: the LangChain adapter does not currently implement its own SSE parser (it consumes upstream LangChain callbacks rather than raw HTTP). Audit item §2.10 lists `langchain` as "none" for SSE parsing; only `agentforce` had bespoke SSE wire-format handling, which now has a shared replacement available for future adoption.
Test plan
---------
- `uv run pytest tests/instrument/adapters/_base/test_sse_parser.py -x` → 56 / 56 passed
  - `data:` concatenation per spec
  - `\n` / `\r` / `\r\n` terminators
  - `[DONE]` sentinel surfaced via `done` flag
  - `parse_stream()` over an async byte iterator
  - `parse_event()`
- `uv run pytest tests/instrument/adapters/frameworks/test_agno_adapter.py -x` → 17 / 17 passed (12 pre-existing + 5 new streaming tests)
- `uv run pytest tests/instrument/adapters/frameworks/test_ms_agent_framework_adapter.py -x` → 16 / 16 passed (12 pre-existing + 4 new streaming tests)
- `uv run pytest tests/instrument/ --ignore=...known-pre-existing` → 232 passed, 1 skipped (no regressions)
- `uv run pytest tests/ --ignore=...known-pre-existing` → 834 passed, 1 skipped (no regressions in resources / models / clients)
- `uv run mypy --strict` on every modified `.py` file → clean
- `uv run ruff check` on every modified `.py` file → clean
Files modified
--------------
- `src/layerlens/instrument/adapters/_base/sse_parser.py` (new, 372 lines)
- `src/layerlens/instrument/adapters/_base/__init__.py` (export new symbols)
- `src/layerlens/instrument/adapters/_base/capture.py` (one-line additive map entry)
- `src/layerlens/instrument/adapters/frameworks/agno/lifecycle.py` (streaming wrapper)
- `src/layerlens/instrument/adapters/frameworks/ms_agent_framework/lifecycle.py` (streaming wrapper)
- `tests/instrument/adapters/_base/__init__.py` (new package)
- `tests/instrument/adapters/_base/test_sse_parser.py` (new, 56 tests)
- `tests/instrument/adapters/frameworks/test_agno_adapter.py` (+5 streaming tests)
- `tests/instrument/adapters/frameworks/test_ms_agent_framework_adapter.py` (+4 streaming tests)
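The streaming wrapper pattern used in both lifecycle modules can be sketched as a pass-through generator. This is a hedged illustration, not the adapters' code: `traced_stream`, `emit`, and `on_run_end` are hypothetical names and signatures, and the SSE branch for bytes/str chunks is left out.

```python
from typing import Any, Callable, Dict, Iterable, Iterator


def traced_stream(
    chunks: Iterable[Any],
    emit: Callable[[str, Dict[str, Any]], None],
    on_run_end: Callable[[int], None],
) -> Iterator[Any]:
    """Yield chunks unchanged while emitting one event per chunk."""
    count = 0
    try:
        for chunk in chunks:
            # One discrete event per chunk instead of a single
            # accumulated emission at the end of the run.
            emit("model.stream.chunk", {"index": count, "chunk": chunk})
            count += 1
            yield chunk  # passthrough: the caller sees the original object
    finally:
        # Runs on normal exhaustion, on an exception from the source,
        # and when the consumer closes the generator early.
        on_run_end(count)
```

Because the end-of-run hook sits in `finally`, the run is closed out even if the consumer stops iterating partway through, which is one way to keep duration measurement meaningful on the streaming path.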