From 8a232b712f934782364374d729233885fb41ba88 Mon Sep 17 00:00:00 2001 From: CocoRoF Date: Tue, 19 May 2026 11:45:34 +0900 Subject: [PATCH] fix(llm_client/claude_code): parse message-form stream-json + raise on auth-failed (2.0.3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two follow-up bugs from the user's prod run after 2.0.2 unblocked the streaming control flow: 1. ``output_len=0`` despite the pipeline reporting SUCCESS. Claude Code 2.x's default stream-json output emits the full assistant message in one envelope (``{"type":"assistant","message":{"content": [{"type":"text","text":"..."}]}}``) — not the per-token delta shape the parser was written for. The accumulator returned an empty APIResponse for every session because its ``assistant.delta.text_delta`` branch never matched. 2. ``"Not logged in · Please run /login"`` came back as the assistant's reply when the CLI's credential cache was empty. The CLI annotates that frame with ``error=authentication_failed``, but the frame otherwise looks like a normal assistant message, so the parser accepted it as legitimate output. Fix: - Introduce ``StreamJsonAccumulator`` — one shared parser used by ``create_message_stream`` (streaming caller) and ``assemble_response_from_stream_json`` (non-streaming caller). Handles both stream-json shapes: * delta form (``--include-partial-messages`` on, true streaming) * full-message form (Claude Code 2.x default) Synthesises per-block ``text_delta`` / ``thinking_delta`` / ``tool_use`` events when the message form arrives so UI consumers see the same canonical shape they'd see with true streaming. - Raise ``APIError(CLI_AUTH_FAILED)`` from ``create_message_stream`` (the streaming path) when the CLI tags a frame with ``error="authentication_failed"``. The host then surfaces the auth problem to the user instead of running a successful pipeline that silently returned a placeholder. 
- Update the single-event translator ``stream_json_line_to_canonical_event`` to collapse a full message's text blocks into one ``text_delta`` so legacy consumers that don't use the accumulator still see content. ### Tests - ``test_create_message_stream_message_form_collects_text`` - ``test_create_message_stream_authentication_failed_raises`` - ``test_send_streaming_message_form_text`` - New ``ok_message_form`` / ``message_form_auth_failed`` scenarios in the fake binary mirror the real CLI 2.1.144 output. Full ``tests/llm_client/`` — 187/187 pass. --- CHANGELOG.md | 32 ++ pyproject.toml | 2 +- src/geny_executor/__init__.py | 2 +- src/geny_executor/llm_client/claude_code.py | 151 ++----- .../llm_client/translators/__init__.py | 2 + .../llm_client/translators/_cli.py | 367 +++++++++++++----- tests/_fixtures/fake_claude.py | 64 +++ tests/llm_client/unit/test_claude_code.py | 55 +++ 8 files changed, 456 insertions(+), 219 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3805243..e2c1fa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,38 @@ All notable changes to `geny-executor` are recorded here. The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/). +## [2.0.3] — 2026-05-19 + +Patch release. Fixes empty assistant output (`output_len=0`) when +Claude Code (CLI) 2.x is the Stage 6 provider, and surfaces +authentication failures as ``APIError`` instead of silently +returning a "Not logged in" placeholder. + +### Fixed + +- ``ClaudeCodeCLIClient.create_message_stream`` / + ``assemble_response_from_stream_json`` now accumulate text from the + **full-message** stream-json shape Claude Code 2.x emits by + default (``{"type":"assistant","message":{"content":[...]}}``) in + addition to the **delta** shape (``--include-partial-messages`` + on). 
The 2.0.2 fix unblocked the streaming control flow but only + parsed delta-form text, so every session came back with + ``output_len=0`` even though the CLI did real work for ~6s. +- The CLI's ``assistant`` envelope occasionally carries + ``error="authentication_failed"`` with a placeholder ``"Not logged + in"`` text block. The streaming path now raises + ``APIError(category=CLI_AUTH_FAILED)`` so the host surfaces the + problem instead of returning the placeholder as the assistant's + reply. +- Both parser paths now share one ``StreamJsonAccumulator`` so the + streaming + non-streaming consumers never drift apart again. + +### Added + +- ``StreamJsonAccumulator`` exported from + ``geny_executor.llm_client.translators`` for hosts that want to + pipe a custom stream-json source into the canonical response shape. + ## [2.0.2] — 2026-05-19 Patch release. Fixes streaming Stage 6 calls failing with diff --git a/pyproject.toml b/pyproject.toml index 9b51373..09fa21f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "geny-executor" -version = "2.0.2" +version = "2.0.3" description = "Harness-engineered agent pipeline library with 21-stage dual-abstraction architecture, built on the Anthropic API" readme = "README.md" license = "MIT" diff --git a/src/geny_executor/__init__.py b/src/geny_executor/__init__.py index 30017ed..d2ca80e 100644 --- a/src/geny_executor/__init__.py +++ b/src/geny_executor/__init__.py @@ -95,7 +95,7 @@ ProviderDrivenStrategy, ) -__version__ = "2.0.2" +__version__ = "2.0.3" __all__ = [ # Core diff --git a/src/geny_executor/llm_client/claude_code.py b/src/geny_executor/llm_client/claude_code.py index 3a767c1..73f08ae 100644 --- a/src/geny_executor/llm_client/claude_code.py +++ b/src/geny_executor/llm_client/claude_code.py @@ -42,17 +42,13 @@ ) from geny_executor.llm_client.base import BaseClient, ClientCapabilities from geny_executor.llm_client.translators._cli import ( + 
StreamJsonAccumulator, assemble_response_from_stream_json, build_stream_json_stdin, claude_code_argv, parse_json_output_to_response, ) -from geny_executor.llm_client.types import ( - APIRequest, - APIResponse, - ContentBlock, - TokenUsage, -) +from geny_executor.llm_client.types import APIRequest, APIResponse __all__ = ["ClaudeCodeCLIClient"] @@ -272,127 +268,50 @@ async def create_message_stream( stdin = build_stream_json_stdin(messages) from geny_executor.llm_client._cli_runtime import parse_stream_json_line - from geny_executor.llm_client.translators._cli import ( - stream_json_line_to_canonical_event, - ) - - # Accumulator state — mirrors ``assemble_response_from_stream_json`` - # so the final message_complete envelope carries the same - # APIResponse the non-streaming path produces. - import json as _json - text_buf: List[str] = [] - thinking_buf: List[str] = [] - tool_uses: List[Dict[str, Any]] = [] - current_tool: Optional[Dict[str, Any]] = None - final_obj: Optional[Dict[str, Any]] = None - message_id = "" - stop_reason = "end_turn" - resolved_model = model_config.model + # Shared accumulator handles both stream-json shapes: + # - delta form (``--include-partial-messages`` on, true streaming) + # - full-message form (Claude Code 2.x default — content[] + # arrives in one ``assistant`` envelope). + # Without the message-form branch, every assistant frame yielded + # zero text and the terminal APIResponse came back empty — + # exactly the symptom the user reported (``output_len=0``). 
+ accum = StreamJsonAccumulator(model=model_config.model) try: async for raw in runner.stream(argv, stdin_iter=aiter_bytes(stdin)): line_obj = parse_stream_json_line(raw) if line_obj is None: continue - - # ── Accumulate for the terminal APIResponse ── - ltype = str(line_obj.get("type", "")) - if ltype == "system": - message_id = str( - line_obj.get("session_id") - or line_obj.get("message_id") - or message_id - ) - resolved_model = str(line_obj.get("model") or resolved_model) - elif ltype == "assistant": - delta = line_obj.get("delta") or {} - dtype = str(delta.get("type", "")) - if dtype == "text_delta": - text_buf.append(str(delta.get("text", ""))) - elif dtype == "thinking_delta": - thinking_buf.append(str(delta.get("text", ""))) - elif dtype == "input_json_delta": - if current_tool is not None: - current_tool.setdefault("_partial_json", "") - current_tool["_partial_json"] += str( - delta.get("partial_json", "") - ) - else: - cb = line_obj.get("content_block") - if isinstance(cb, dict) and cb.get("type") == "tool_use": - current_tool = { - "id": cb.get("id"), - "name": cb.get("name"), - "input": cb.get("input") or {}, - } - elif ltype == "content_block_stop": - if current_tool is not None: - partial = current_tool.pop("_partial_json", "") - if partial and not current_tool.get("input"): - try: - current_tool["input"] = _json.loads(partial) - except _json.JSONDecodeError: - current_tool["input"] = {"_raw": partial} - tool_uses.append(current_tool) - current_tool = None - elif ltype == "result": - final_obj = line_obj - stop_reason = str(line_obj.get("stop_reason", stop_reason)) - - # ── Yield the per-line canonical event ── - # Suppress the translator's bare ``message_complete`` - # (it carries no response field) — we emit the - # populated version after the loop. Everything else - # passes through unchanged. 
- event = stream_json_line_to_canonical_event(line_obj) - if event is None: - continue - if event.get("type") == "message_complete": + if "__malformed__" in line_obj: continue - yield event - - # ── Assemble + emit the terminal message_complete ── - blocks: List[ContentBlock] = [] - if thinking_buf: - blocks.append( - ContentBlock(type="thinking", thinking_text="".join(thinking_buf)) - ) - if text_buf: - blocks.append(ContentBlock(type="text", text="".join(text_buf))) - for tu in tool_uses: - blocks.append( - ContentBlock( - type="tool_use", - tool_use_id=tu.get("id"), - tool_name=tu.get("name"), - tool_input=tu.get("input") or {}, + # Surface CLI-side errors as APIError so the stage's + # retry/escalate path runs instead of silently producing + # an empty response. + if str(line_obj.get("type", "")) == "error": + raise APIError( + f"Claude Code CLI reported error: " + f"{line_obj.get('message') or line_obj!r}", + category=ErrorCategory.CLI_PROTOCOL_ERROR, + ) + # Surface the authentication_failed annotation that the + # CLI emits on the assistant frame when no credential + # is available — without this we'd swallow the + # "Not logged in" placeholder text as the assistant's + # answer and call the session "successful". + if str(line_obj.get("error", "")) == "authentication_failed": + raise APIError( + "Claude Code CLI is not authenticated (claude --print " + "returned error=authentication_failed). 
Sign in via " + "Settings → LLM Backends → Claude Code (CLI).", + category=ErrorCategory.CLI_AUTH_FAILED, ) - ) - usage_in: Dict[str, Any] = (final_obj or {}).get("usage", {}) or {} - usage = TokenUsage( - input_tokens=int(usage_in.get("input_tokens", 0) or 0), - output_tokens=int(usage_in.get("output_tokens", 0) or 0), - cache_creation_input_tokens=int( - usage_in.get("cache_creation_input_tokens", 0) or 0 - ), - cache_read_input_tokens=int( - usage_in.get("cache_read_input_tokens", 0) or 0 - ), - cost_usd=usage_in.get("cost_usd"), - duration_ms=(final_obj or {}).get("duration_ms"), - ) + # Feed accumulator + stream canonical events to consumer. + for event in accum.feed(line_obj): + yield event - response = APIResponse( - content=blocks, - stop_reason=stop_reason, - usage=usage, - model=resolved_model, - message_id=message_id, - raw=final_obj or {}, - ) - yield {"type": "message_complete", "response": response} + yield {"type": "message_complete", "response": accum.finalize()} except CLIBinaryNotFound as e: raise APIError(str(e), category=ErrorCategory.CLI_NOT_FOUND) from e except CLITimeout as e: diff --git a/src/geny_executor/llm_client/translators/__init__.py b/src/geny_executor/llm_client/translators/__init__.py index 381ba65..5a16548 100644 --- a/src/geny_executor/llm_client/translators/__init__.py +++ b/src/geny_executor/llm_client/translators/__init__.py @@ -29,6 +29,7 @@ split_tool_uses, ) from geny_executor.llm_client.translators._cli import ( + StreamJsonAccumulator, assemble_response_from_stream_json, build_stream_json_stdin, claude_code_argv, @@ -62,5 +63,6 @@ "split_tool_results", "split_tool_uses", "stream_json_line_to_canonical_event", + "StreamJsonAccumulator", "thinking_to_effort", ] diff --git a/src/geny_executor/llm_client/translators/_cli.py b/src/geny_executor/llm_client/translators/_cli.py index 878eded..1c5702c 100644 --- a/src/geny_executor/llm_client/translators/_cli.py +++ b/src/geny_executor/llm_client/translators/_cli.py @@ -250,6 
+250,21 @@ def stream_json_line_to_canonical_event(line_obj: Dict[str, Any]) -> Optional[Di "name": cb.get("name"), "input": cb.get("input") or {}, } + # Full-message form (Claude Code 2.x default): collapse the + # entire content array to a single concatenated text_delta so + # legacy single-event consumers see SOME text. Callers that + # need per-block fidelity should use ``StreamJsonAccumulator`` + # directly. + msg = line_obj.get("message") or {} + if isinstance(msg, dict): + parts: List[str] = [] + for block in (msg.get("content") or []): + if isinstance(block, dict) and block.get("type") == "text": + text = str(block.get("text", "")) + if text: + parts.append(text) + if parts: + return {"type": "text_delta", "text": "".join(parts)} return None if ltype == "content_block_stop": @@ -339,6 +354,245 @@ def parse_json_output_to_response(stdout: bytes, *, model: str) -> APIResponse: # --------------------------------------------------------------------------- +class StreamJsonAccumulator: + """Walk Claude Code stream-json lines and accumulate the final response. + + Handles both shapes the CLI emits (the shape varies by version + by + ``--include-partial-messages``): + + 1. **Delta form** (true streaming, ``--include-partial-messages`` on): + ``{"type":"assistant","delta":{"type":"text_delta","text":"..."}}`` + — one delta per token-ish chunk; ``content_block_stop`` terminates a + block. + 2. **Message form** (default + observed on claude_code 2.1.144): + ``{"type":"assistant","message":{"content":[ + {"type":"text","text":"..."}, + {"type":"thinking","thinking":"..."}, + {"type":"tool_use","id":"...","name":"...","input":{...}}, + ],"stop_reason":"...","usage":{...}}}`` + — the full assistant message arrives in one envelope. + + The accumulator's ``feed(line)`` returns a list of canonical UI events + ({"type":"text_delta", ...} etc.) 
that callers stream to consumers, + while internally bookkeeping the state needed to call ``finalize()`` + for the terminal :class:`APIResponse`. + """ + + def __init__(self, model: str) -> None: + self._text_buf: List[str] = [] + self._thinking_buf: List[str] = [] + self._tool_uses: List[Dict[str, Any]] = [] + self._current_tool: Optional[Dict[str, Any]] = None + self._final_obj: Optional[Dict[str, Any]] = None + self._message_id = "" + self._stop_reason = "end_turn" + self._resolved_model = model + + # ── Public ──────────────────────────────────────────────── + + def feed(self, line: Dict[str, Any]) -> List[Dict[str, Any]]: + """Update state from one stream-json line. + + Returns the list of canonical UI events the line produced + (``text_delta`` / ``thinking_delta`` / ``tool_use`` / ...). + Caller is responsible for yielding them to its own consumer. + Empty list when the line is bookkeeping-only. + """ + if not isinstance(line, dict) or "__malformed__" in line: + return [] + ltype = str(line.get("type", "")) + + if ltype == "system": + self._message_id = str( + line.get("session_id") or line.get("message_id") or self._message_id + ) + self._resolved_model = str(line.get("model") or self._resolved_model) + return [] + + if ltype == "assistant": + return self._feed_assistant(line) + + if ltype == "content_block_stop": + self._close_current_tool() + return [{"type": "content_block_stop"}] + + if ltype == "message_stop": + # Suppressed at this layer — the streaming caller emits one + # populated ``message_complete`` after ``finalize()``. + return [] + + if ltype == "result": + self._final_obj = line + self._stop_reason = str(line.get("stop_reason", self._stop_reason)) + # ``message`` form puts stop_reason on the assistant envelope + # too; keep whichever non-empty value won. 
+ return [{"type": "result", "raw": line}] + + if ltype == "error": + return [{"type": "error", "raw": line}] + + return [{"type": "cli_unknown", "raw": line}] + + def finalize(self) -> APIResponse: + """Build the canonical :class:`APIResponse` from accumulated state.""" + # Flush any unclosed tool — the message form often skips + # ``content_block_stop`` entirely. + self._close_current_tool() + + blocks: List[ContentBlock] = [] + if self._thinking_buf: + blocks.append( + ContentBlock(type="thinking", thinking_text="".join(self._thinking_buf)) + ) + if self._text_buf: + blocks.append(ContentBlock(type="text", text="".join(self._text_buf))) + for tu in self._tool_uses: + blocks.append( + ContentBlock( + type="tool_use", + tool_use_id=tu.get("id"), + tool_name=tu.get("name"), + tool_input=tu.get("input") or {}, + ) + ) + + usage_in: Dict[str, Any] = (self._final_obj or {}).get("usage", {}) or {} + usage = TokenUsage( + input_tokens=int(usage_in.get("input_tokens", 0) or 0), + output_tokens=int(usage_in.get("output_tokens", 0) or 0), + cache_creation_input_tokens=int( + usage_in.get("cache_creation_input_tokens", 0) or 0 + ), + cache_read_input_tokens=int(usage_in.get("cache_read_input_tokens", 0) or 0), + cost_usd=usage_in.get("cost_usd") + or (self._final_obj or {}).get("total_cost_usd"), + duration_ms=(self._final_obj or {}).get("duration_ms"), + ) + + return APIResponse( + content=blocks, + stop_reason=self._stop_reason, + usage=usage, + model=self._resolved_model, + message_id=self._message_id, + raw=self._final_obj or {}, + ) + + # ── Internals ───────────────────────────────────────────── + + def _feed_assistant(self, line: Dict[str, Any]) -> List[Dict[str, Any]]: + # Form 1 — delta (true streaming). 
+ delta = line.get("delta") or {} + dtype = str(delta.get("type", "")) + if dtype == "text_delta": + text = str(delta.get("text", "")) + self._text_buf.append(text) + return [{"type": "text_delta", "text": text}] if text else [] + if dtype == "thinking_delta": + text = str(delta.get("text", "")) + self._thinking_buf.append(text) + return [{"type": "thinking_delta", "text": text}] if text else [] + if dtype == "input_json_delta": + partial = str(delta.get("partial_json", "")) + if self._current_tool is not None: + self._current_tool.setdefault("_partial_json", "") + self._current_tool["_partial_json"] += partial + return [{"type": "input_json_delta", "delta": partial}] + cb = line.get("content_block") + if isinstance(cb, dict) and cb.get("type") == "tool_use": + self._current_tool = { + "id": cb.get("id"), + "name": cb.get("name"), + "input": cb.get("input") or {}, + } + return [ + { + "type": "tool_use", + "id": cb.get("id"), + "name": cb.get("name"), + "input": cb.get("input") or {}, + } + ] + + # Form 2 — full message (default Claude Code 2.x output). + message = line.get("message") or {} + if isinstance(message, dict) and message.get("content"): + return self._feed_message(message) + + return [] + + def _feed_message(self, message: Dict[str, Any]) -> List[Dict[str, Any]]: + """Process a full assistant message envelope's content array. + + Emits synthetic per-block delta events so UI consumers see the + same canonical shape they would with true streaming, then + records the blocks for the eventual :class:`APIResponse`. + """ + # Capture stop_reason / usage off the envelope if present — + # the ``message`` form lets the assistant frame carry these + # instead of waiting for the final ``result`` line. 
+ sr = message.get("stop_reason") + if sr: + self._stop_reason = str(sr) + usage = message.get("usage") + if isinstance(usage, dict) and self._final_obj is None: + self._final_obj = {"usage": usage} + # Skip synthetic "Not logged in" messages — Claude Code emits + # them with ``error=authentication_failed`` and a placeholder + # text block. Surface as an APIError-friendly error event so + # callers raise instead of returning empty output. + # (Detected on the outer ``line``, but ``message`` is the + # carrier so we pass it through unchanged here.) + + events: List[Dict[str, Any]] = [] + content = message.get("content") or [] + if not isinstance(content, list): + return events + for block in content: + if not isinstance(block, dict): + continue + btype = str(block.get("type", "")) + if btype == "text": + text = str(block.get("text", "")) + if text: + self._text_buf.append(text) + events.append({"type": "text_delta", "text": text}) + elif btype == "thinking": + # Anthropic uses ``thinking`` field; some shims use ``text``. 
+ text = str(block.get("thinking") or block.get("text") or "") + if text: + self._thinking_buf.append(text) + events.append({"type": "thinking_delta", "text": text}) + elif btype == "tool_use": + tu = { + "id": block.get("id"), + "name": block.get("name"), + "input": block.get("input") or {}, + } + self._tool_uses.append(tu) + events.append( + { + "type": "tool_use", + "id": tu["id"], + "name": tu["name"], + "input": tu["input"], + } + ) + return events + + def _close_current_tool(self) -> None: + if self._current_tool is None: + return + partial = self._current_tool.pop("_partial_json", "") + if partial and not self._current_tool.get("input"): + try: + self._current_tool["input"] = json.loads(partial) + except json.JSONDecodeError: + self._current_tool["input"] = {"_raw": partial} + self._tool_uses.append(self._current_tool) + self._current_tool = None + + async def assemble_response_from_stream_json( stream: AsyncIterator[bytes], *, @@ -346,120 +600,31 @@ async def assemble_response_from_stream_json( ) -> APIResponse: """Drain a stream-json output and return a canonical APIResponse. - Used by ``ClaudeCodeCLIClient._send`` when ``request.stream=True``. The - final ``result`` envelope carries usage + stop_reason; the accumulated - text / thinking / tool_use blocks come from intermediate ``assistant`` - deltas. + Used by ``ClaudeCodeCLIClient._send`` when ``request.stream=True``. + Thin wrapper around :class:`StreamJsonAccumulator` so the + streaming + non-streaming consumer paths share one parser — Claude + Code's stream-json shape (delta vs full-message) varies by CLI + version and ``--include-partial-messages``, and we never want the + two paths to drift again. 
""" - text_buf: List[str] = [] - thinking_buf: List[str] = [] - tool_uses: List[Dict[str, Any]] = [] - current_tool: Optional[Dict[str, Any]] = None - final_obj: Optional[Dict[str, Any]] = None - message_id = "" - stop_reason = "end_turn" - resolved_model = model - from geny_executor.llm_client._cli_runtime import parse_stream_json_line + accum = StreamJsonAccumulator(model=model) async for raw in stream: line = parse_stream_json_line(raw) if line is None: continue if "__malformed__" in line: - # Skip malformed lines — caller's CLIProtocolError path - # already runs on non-zero exits. - continue - ltype = str(line.get("type", "")) - - if ltype == "system": - # The first system envelope carries session metadata. - message_id = str(line.get("session_id") or line.get("message_id") or message_id) - resolved_model = str(line.get("model") or resolved_model) - continue - - if ltype == "assistant": - delta = line.get("delta") or {} - dtype = str(delta.get("type", "")) - if dtype == "text_delta": - text_buf.append(str(delta.get("text", ""))) - continue - if dtype == "thinking_delta": - thinking_buf.append(str(delta.get("text", ""))) - continue - if dtype == "input_json_delta": - if current_tool is not None: - current_tool.setdefault("_partial_json", "") - current_tool["_partial_json"] += str(delta.get("partial_json", "")) - continue - cb = line.get("content_block") - if isinstance(cb, dict) and cb.get("type") == "tool_use": - current_tool = { - "id": cb.get("id"), - "name": cb.get("name"), - "input": cb.get("input") or {}, - } - continue - - if ltype == "content_block_stop": - if current_tool is not None: - # Finalise the in-flight tool block. If we accumulated a - # partial_json buffer, try parsing it as the input. 
- partial = current_tool.pop("_partial_json", "") - if partial and not current_tool.get("input"): - try: - current_tool["input"] = json.loads(partial) - except json.JSONDecodeError: - current_tool["input"] = {"_raw": partial} - tool_uses.append(current_tool) - current_tool = None - continue - - if ltype == "result": - final_obj = line - stop_reason = str(line.get("stop_reason", stop_reason)) continue - - if ltype == "error": + # ``error`` envelopes from the CLI need to raise so the caller's + # CLIProtocolError path runs — match the prior behaviour exactly. + if str(line.get("type", "")) == "error": raise RuntimeError( f"Claude Code CLI reported error: {line.get('message') or line!r}" ) + accum.feed(line) - blocks: List[ContentBlock] = [] - if thinking_buf: - blocks.append(ContentBlock(type="thinking", thinking_text="".join(thinking_buf))) - if text_buf: - blocks.append(ContentBlock(type="text", text="".join(text_buf))) - for tu in tool_uses: - blocks.append( - ContentBlock( - type="tool_use", - tool_use_id=tu.get("id"), - tool_name=tu.get("name"), - tool_input=tu.get("input") or {}, - ) - ) - - usage_in: Dict[str, Any] = {} - if final_obj: - usage_in = final_obj.get("usage", {}) or {} - usage = TokenUsage( - input_tokens=int(usage_in.get("input_tokens", 0) or 0), - output_tokens=int(usage_in.get("output_tokens", 0) or 0), - cache_creation_input_tokens=int(usage_in.get("cache_creation_input_tokens", 0) or 0), - cache_read_input_tokens=int(usage_in.get("cache_read_input_tokens", 0) or 0), - cost_usd=usage_in.get("cost_usd"), - duration_ms=(final_obj or {}).get("duration_ms"), - ) - - return APIResponse( - content=blocks, - stop_reason=stop_reason, - usage=usage, - model=resolved_model, - message_id=message_id, - raw=final_obj, - ) + return accum.finalize() # --------------------------------------------------------------------------- diff --git a/tests/_fixtures/fake_claude.py b/tests/_fixtures/fake_claude.py index 78de2a9..d477936 100755 --- 
a/tests/_fixtures/fake_claude.py +++ b/tests/_fixtures/fake_claude.py @@ -105,6 +105,68 @@ def _auth_fail(argv: List[str]) -> int: return 1 +def _ok_message_form(argv: List[str]) -> int: + """Real Claude Code 2.x stream-json shape (no ``--include-partial-messages``). + + The CLI puts the entire assistant message inside one ``assistant`` + envelope's ``message.content[]`` instead of streaming deltas. The + parser must accumulate text from this shape just as well as from + the delta variant. + """ + text = os.environ.get("FAKE_CLAUDE_TEXT", "hello world") + _emit_line({ + "type": "system", "subtype": "init", + "session_id": "fake-msg-1", "model": "claude-sonnet-4-6", + }) + _emit_line({ + "type": "assistant", + "message": { + "id": "msg_fake_form_2", + "role": "assistant", + "type": "message", + "stop_reason": "end_turn", + "usage": {"input_tokens": 7, "output_tokens": len(text)}, + "content": [{"type": "text", "text": text}], + }, + }) + _emit_line({ + "type": "result", "subtype": "success", "is_error": False, + "stop_reason": "end_turn", + "total_cost_usd": 0.0001, + "duration_ms": 250, + "usage": {"input_tokens": 7, "output_tokens": len(text)}, + }) + return 0 + + +def _message_form_auth_failed(argv: List[str]) -> int: + """Reproduce the "Not logged in" path the user hit on prod. + + Claude Code emits the synthetic placeholder text inside a normal + ``assistant.message.content[]`` envelope BUT annotates the outer + line with ``error=authentication_failed``. The parser must raise + APIError(CLI_AUTH_FAILED) instead of returning the placeholder + text as the assistant's reply. 
+ """ + _emit_line({"type": "system", "subtype": "init", "session_id": "fake-na-1", "model": "claude-opus-4-7"}) + _emit_line({ + "type": "assistant", + "error": "authentication_failed", + "message": { + "id": "msg_na", + "role": "assistant", + "type": "message", + "stop_reason": "stop_sequence", + "content": [{"type": "text", "text": "Not logged in · Please run /login"}], + }, + }) + _emit_line({ + "type": "result", "subtype": "success", "is_error": True, + "result": "Not logged in · Please run /login", + }) + return 0 + + def _permission_fail(argv: List[str]) -> int: sys.stderr.write("permission denied: tool Bash blocked by policy\n") return 1 @@ -135,6 +197,8 @@ def _echo_argv(argv: List[str]) -> int: "ok_text": _ok_text, "ok_tool_use": _ok_tool_use, "ok_thinking": _ok_thinking, + "ok_message_form": _ok_message_form, + "message_form_auth_failed": _message_form_auth_failed, "auth_fail": _auth_fail, "permission_fail": _permission_fail, "crash": _crash, diff --git a/tests/llm_client/unit/test_claude_code.py b/tests/llm_client/unit/test_claude_code.py index 7c92c44..856fcdc 100644 --- a/tests/llm_client/unit/test_claude_code.py +++ b/tests/llm_client/unit/test_claude_code.py @@ -262,6 +262,61 @@ async def test_create_message_stream_message_complete_carries_response() -> None assert resp.model # resolved from the system envelope or model_config +@pytest.mark.asyncio +async def test_create_message_stream_message_form_collects_text() -> None: + """Regression: when Claude Code emits the ``assistant.message.content[]`` + shape (the 2.x default, no ``--include-partial-messages``), text + blocks must be accumulated into the terminal APIResponse. The + earlier accumulator only handled the delta shape so every session + came back with ``output_len=0`` even though the CLI did real work. 
+ """ + c = _client(scenario="ok_message_form", text="안녕하세요") + events = [] + async for evt in c.create_message_stream( + model_config=ModelConfig(model="sonnet"), + messages=[{"role": "user", "content": "ㅎㅇ"}], + ): + events.append(evt) + + text_deltas = [e for e in events if e.get("type") == "text_delta"] + assert text_deltas, "message form must produce at least one text_delta" + assert "".join(d["text"] for d in text_deltas) == "안녕하세요" + + completes = [e for e in events if e.get("type") == "message_complete"] + assert len(completes) == 1 + resp = completes[0]["response"] + assert resp.text == "안녕하세요" + assert resp.stop_reason == "end_turn" + + +@pytest.mark.asyncio +async def test_create_message_stream_authentication_failed_raises() -> None: + """Regression: the CLI emits an ``assistant`` envelope with + ``error=authentication_failed`` + placeholder text "Not logged + in" when no credential is available. The placeholder must not + be returned as the assistant's reply — raise APIError so the + pipeline surfaces the auth problem to the user.""" + c = _client(scenario="message_form_auth_failed") + with pytest.raises(APIError) as exc_info: + async for _ in c.create_message_stream( + model_config=ModelConfig(model="sonnet"), + messages=[{"role": "user", "content": "hi"}], + ): + pass + assert exc_info.value.category == ErrorCategory.CLI_AUTH_FAILED + + +@pytest.mark.asyncio +async def test_send_streaming_message_form_text() -> None: + """Non-streaming caller via ``_send(stream=True)`` must also + collect text from the message form. 
Mirrors the streaming-from- + consumer-POV test above for the assembler path.""" + c = _client(scenario="ok_message_form", text="배포 완료") + resp = await c._send(_make_request(stream=True)) + assert resp.text == "배포 완료" + assert resp.stop_reason == "end_turn" + + # --------------------------------------------------------------------------- # Argv shape verification via the echo scenario # ---------------------------------------------------------------------------