From c61c53f33ae70f3a19d778e8ecac180dec29ac1c Mon Sep 17 00:00:00 2001 From: CocoRoF Date: Tue, 19 May 2026 10:26:29 +0900 Subject: [PATCH] fix(llm_client/claude_code): emit populated message_complete envelope (2.0.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Streaming Stage 6 calls with provider=claude_code_cli failed with ``APIError("Stream ended without message_complete")`` for every session. Symptom in a Geny pipeline run: → s06_api ✗ s06_api: Stream ended without message_complete Pipeline error: Stream ended without message_complete Root cause: ``ClaudeCodeCLIClient.create_message_stream`` passed the translator's bare ``{"type": "message_complete"}`` straight through. The s06_api default stage's ``_call_streaming`` reads ``chunk["response"]`` to build the assistant message; with no ``response`` field on the envelope, ``response`` stayed ``None`` and the post-loop guard raised. Anthropic / OpenAI / Google clients all emit ``{"type": "message_complete", "response": APIResponse}`` — the CLI client was the lone outlier. Fix: accumulate text / thinking / tool_use blocks + the final ``result`` envelope's usage as canonical events flow, then yield one terminal ``message_complete`` carrying an assembled APIResponse. Mirrors the contract of every SDK client and reuses the same parsing logic ``assemble_response_from_stream_json`` uses for the non-streaming path. Per-line ``text_delta`` / ``content_block_stop`` / ``result`` events still pass through unchanged. Suppresses the translator's bare ``message_complete`` so callers don't see a half-populated event before the real one — there's only ever one terminal envelope. ### Test ``test_create_message_stream_message_complete_carries_response`` asserts a single terminal envelope with the assembled response (text / stop_reason / usage / model). The existing ``test_create_message_stream_yields_text_deltas`` continues to pass (per-line deltas + final result + complete all still surface). --- CHANGELOG.md | 30 +++++ pyproject.toml | 2 +- src/geny_executor/__init__.py | 2 +- src/geny_executor/llm_client/claude_code.py | 133 +++++++++++++++++++- tests/llm_client/unit/test_claude_code.py | 28 +++++ 5 files changed, 188 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a949a6..3805243 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,36 @@ All notable changes to `geny-executor` are recorded here. The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/). +## [2.0.2] — 2026-05-19 + +Patch release. Fixes streaming Stage 6 calls failing with +``Stream ended without message_complete`` when ``claude_code_cli`` +was the selected provider. + +### Fixed + +- ``ClaudeCodeCLIClient.create_message_stream`` now emits a populated + ``{"type": "message_complete", "response": APIResponse}`` envelope + after the CLI exits. The previous implementation passed the + translator's bare ``{"type": "message_complete"}`` straight through, + with no ``response`` field — and the s06_api default stage's + ``_call_streaming`` reads exactly that field to build the assistant + message, so the streaming path raised + ``APIError("Stream ended without message_complete")`` for every + Claude Code (CLI) session. The streaming client now accumulates + text / thinking / tool_use blocks + the final ``result`` envelope's + usage as events flow, then yields one terminal envelope mirroring + the contract every SDK client (anthropic / openai / google) already + honours. Per-line ``text_delta`` / ``content_block_stop`` / ``result`` + events still flow as before so downstream consumers that watch the + partial stream behave unchanged. + +### Migration + +None. Behaviour change is strictly additive — code that ignored +``message_complete`` (or never selected ``claude_code_cli`` for s06) +sees no observable difference. + ## [2.0.1] — 2026-05-18 Patch release. Fixes a crash when a manifest names ``"subagent_type"`` diff --git a/pyproject.toml b/pyproject.toml index 3d85b9f..9b51373 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "geny-executor" -version = "2.0.1" +version = "2.0.2" description = "Harness-engineered agent pipeline library with 21-stage dual-abstraction architecture, built on the Anthropic API" readme = "README.md" license = "MIT" diff --git a/src/geny_executor/__init__.py b/src/geny_executor/__init__.py index 7df39d9..30017ed 100644 --- a/src/geny_executor/__init__.py +++ b/src/geny_executor/__init__.py @@ -95,7 +95,7 @@ ProviderDrivenStrategy, ) -__version__ = "2.0.1" +__version__ = "2.0.2" __all__ = [ # Core diff --git a/src/geny_executor/llm_client/claude_code.py b/src/geny_executor/llm_client/claude_code.py index 92e454d..3a767c1 100644 --- a/src/geny_executor/llm_client/claude_code.py +++ b/src/geny_executor/llm_client/claude_code.py @@ -47,7 +47,12 @@ claude_code_argv, parse_json_output_to_response, ) -from geny_executor.llm_client.types import APIRequest, APIResponse +from geny_executor.llm_client.types import ( + APIRequest, + APIResponse, + ContentBlock, + TokenUsage, +) __all__ = ["ClaudeCodeCLIClient"] @@ -237,8 +242,17 @@ async def create_message_stream( Events match the format documented in ``translators._cli.stream_json_line_to_canonical_event``: ``text_delta``, ``thinking_delta``, ``input_json_delta``, - ``tool_use``, ``content_block_stop``, ``message_complete``, - ``result``, ``error``. + ``tool_use``, ``content_block_stop``, ``result``, ``error``. + + After the CLI exits we emit one final + ``{"type": "message_complete", "response": APIResponse}`` + event with the fully assembled response (text + thinking + + tool_use blocks, stop_reason, usage). Without this terminal + envelope the s06_api stage's streaming consumer raises + ``Stream ended without message_complete`` — it builds the + assistant message from ``chunk["response"]`` and the previous + implementation never populated that field. (Mirrors the + ``anthropic`` / ``openai`` / ``google`` SDK clients' contract.) """ request = self._build_request( model_config=model_config, @@ -262,14 +276,123 @@ async def create_message_stream( stream_json_line_to_canonical_event, ) + # Accumulator state — mirrors ``assemble_response_from_stream_json`` + # so the final message_complete envelope carries the same + # APIResponse the non-streaming path produces. + import json as _json + + text_buf: List[str] = [] + thinking_buf: List[str] = [] + tool_uses: List[Dict[str, Any]] = [] + current_tool: Optional[Dict[str, Any]] = None + final_obj: Optional[Dict[str, Any]] = None + message_id = "" + stop_reason = "end_turn" + resolved_model = model_config.model + try: async for raw in runner.stream(argv, stdin_iter=aiter_bytes(stdin)): line_obj = parse_stream_json_line(raw) if line_obj is None: continue + + # ── Accumulate for the terminal APIResponse ── + ltype = str(line_obj.get("type", "")) + if ltype == "system": + message_id = str( + line_obj.get("session_id") + or line_obj.get("message_id") + or message_id + ) + resolved_model = str(line_obj.get("model") or resolved_model) + elif ltype == "assistant": + delta = line_obj.get("delta") or {} + dtype = str(delta.get("type", "")) + if dtype == "text_delta": + text_buf.append(str(delta.get("text", ""))) + elif dtype == "thinking_delta": + thinking_buf.append(str(delta.get("text", ""))) + elif dtype == "input_json_delta": + if current_tool is not None: + current_tool.setdefault("_partial_json", "") + current_tool["_partial_json"] += str( + delta.get("partial_json", "") + ) + else: + cb = line_obj.get("content_block") + if isinstance(cb, dict) and cb.get("type") == "tool_use": + current_tool = { + "id": cb.get("id"), + "name": cb.get("name"), + "input": cb.get("input") or {}, + } + elif ltype == "content_block_stop": + if current_tool is not None: + partial = current_tool.pop("_partial_json", "") + if partial and not current_tool.get("input"): + try: + current_tool["input"] = _json.loads(partial) + except _json.JSONDecodeError: + current_tool["input"] = {"_raw": partial} + tool_uses.append(current_tool) + current_tool = None + elif ltype == "result": + final_obj = line_obj + stop_reason = str(line_obj.get("stop_reason", stop_reason)) + + # ── Yield the per-line canonical event ── + # Suppress the translator's bare ``message_complete`` + # (it carries no response field) — we emit the + # populated version after the loop. Everything else + # passes through unchanged. event = stream_json_line_to_canonical_event(line_obj) - if event is not None: - yield event + if event is None: + continue + if event.get("type") == "message_complete": + continue + yield event + + # ── Assemble + emit the terminal message_complete ── + blocks: List[ContentBlock] = [] + if thinking_buf: + blocks.append( + ContentBlock(type="thinking", thinking_text="".join(thinking_buf)) + ) + if text_buf: + blocks.append(ContentBlock(type="text", text="".join(text_buf))) + for tu in tool_uses: + blocks.append( + ContentBlock( + type="tool_use", + tool_use_id=tu.get("id"), + tool_name=tu.get("name"), + tool_input=tu.get("input") or {}, + ) + ) + + usage_in: Dict[str, Any] = (final_obj or {}).get("usage", {}) or {} + usage = TokenUsage( + input_tokens=int(usage_in.get("input_tokens", 0) or 0), + output_tokens=int(usage_in.get("output_tokens", 0) or 0), + cache_creation_input_tokens=int( + usage_in.get("cache_creation_input_tokens", 0) or 0 + ), + cache_read_input_tokens=int( + usage_in.get("cache_read_input_tokens", 0) or 0 + ), + cost_usd=usage_in.get("cost_usd"), + duration_ms=(final_obj or {}).get("duration_ms"), + ) + + response = APIResponse( + content=blocks, + stop_reason=stop_reason, + usage=usage, + model=resolved_model, + message_id=message_id, + raw=final_obj or {}, + ) + yield {"type": "message_complete", "response": response} except CLIBinaryNotFound as e: raise APIError(str(e), category=ErrorCategory.CLI_NOT_FOUND) from e except CLITimeout as e: diff --git a/tests/llm_client/unit/test_claude_code.py b/tests/llm_client/unit/test_claude_code.py index 0de637d..7c92c44 100644 --- a/tests/llm_client/unit/test_claude_code.py +++ b/tests/llm_client/unit/test_claude_code.py @@ -234,6 +234,34 @@ async def test_create_message_stream_yields_text_deltas() -> None: assert any(e.get("type") == "result" for e in events) +@pytest.mark.asyncio +async def test_create_message_stream_message_complete_carries_response() -> None: + """Regression: the terminal ``message_complete`` event must carry an + assembled ``APIResponse`` in ``chunk["response"]``. The s06_api + stage's streaming consumer raises ``Stream ended without + message_complete`` when this field is missing — that was the + Claude-Code-as-Stage-6 outage symptom. + """ + c = _client(text="hello world") + completes = [] + async for evt in c.create_message_stream( + model_config=ModelConfig(model="sonnet"), + messages=[{"role": "user", "content": "go"}], + ): + if evt.get("type") == "message_complete": + completes.append(evt) + + # Exactly one terminal envelope, populated. + assert len(completes) == 1 + final = completes[0] + assert "response" in final, "message_complete must include the response" + resp = final["response"] + assert resp.text == "hello world" + assert resp.stop_reason == "end_turn" + assert resp.usage.cost_usd is not None + assert resp.model # resolved from the system envelope or model_config + + # --------------------------------------------------------------------------- # Argv shape verification via the echo scenario # ---------------------------------------------------------------------------