Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,36 @@ All notable changes to `geny-executor` are recorded here. The format
follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and
this project adheres to [Semantic Versioning](https://semver.org/).

## [2.0.2] — 2026-05-19

Patch release. Fixes streaming Stage 6 calls failing with
``Stream ended without message_complete`` when ``claude_code_cli``
was the selected provider.

### Fixed

- ``ClaudeCodeCLIClient.create_message_stream`` now emits a populated
``{"type": "message_complete", "response": APIResponse}`` envelope
after the CLI exits. The previous implementation passed the
translator's bare ``{"type": "message_complete"}`` straight through,
with no ``response`` field — and the s06_api default stage's
``_call_streaming`` reads exactly that field to build the assistant
message, so the streaming path raised
``APIError("Stream ended without message_complete")`` for every
Claude Code (CLI) session. The streaming client now accumulates
text / thinking / tool_use blocks + the final ``result`` envelope's
usage as events flow, then yields one terminal envelope mirroring
the contract every SDK client (anthropic / openai / google) already
honours. Per-line ``text_delta`` / ``content_block_stop`` / ``result``
events still flow as before so downstream consumers that watch the
partial stream behave unchanged.

### Migration

None. Behaviour change is strictly additive — code that ignored
``message_complete`` (or never selected ``claude_code_cli`` for s06)
sees no observable difference.

## [2.0.1] — 2026-05-18

Patch release. Fixes a crash when a manifest names ``"subagent_type"``
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "geny-executor"
version = "2.0.1"
version = "2.0.2"
description = "Harness-engineered agent pipeline library with 21-stage dual-abstraction architecture, built on the Anthropic API"
readme = "README.md"
license = "MIT"
Expand Down
2 changes: 1 addition & 1 deletion src/geny_executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
ProviderDrivenStrategy,
)

__version__ = "2.0.1"
__version__ = "2.0.2"

__all__ = [
# Core
Expand Down
133 changes: 128 additions & 5 deletions src/geny_executor/llm_client/claude_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@
claude_code_argv,
parse_json_output_to_response,
)
from geny_executor.llm_client.types import APIRequest, APIResponse
from geny_executor.llm_client.types import (
APIRequest,
APIResponse,
ContentBlock,
TokenUsage,
)


__all__ = ["ClaudeCodeCLIClient"]
Expand Down Expand Up @@ -237,8 +242,17 @@ async def create_message_stream(
Events match the format documented in
``translators._cli.stream_json_line_to_canonical_event``:
``text_delta``, ``thinking_delta``, ``input_json_delta``,
``tool_use``, ``content_block_stop``, ``message_complete``,
``result``, ``error``.
``tool_use``, ``content_block_stop``, ``result``, ``error``.

After the CLI exits we emit one final
``{"type": "message_complete", "response": APIResponse}``
event with the fully assembled response (text + thinking +
tool_use blocks, stop_reason, usage). Without this terminal
envelope the s06_api stage's streaming consumer raises
``Stream ended without message_complete`` — it builds the
assistant message from ``chunk["response"]`` and the previous
implementation never populated that field. (Mirrors the
``anthropic`` / ``openai`` / ``google`` SDK clients' contract.)
"""
request = self._build_request(
model_config=model_config,
Expand All @@ -262,14 +276,123 @@ async def create_message_stream(
stream_json_line_to_canonical_event,
)

# Accumulator state — mirrors ``assemble_response_from_stream_json``
# so the final message_complete envelope carries the same
# APIResponse the non-streaming path produces.
import json as _json

text_buf: List[str] = []
thinking_buf: List[str] = []
tool_uses: List[Dict[str, Any]] = []
current_tool: Optional[Dict[str, Any]] = None
final_obj: Optional[Dict[str, Any]] = None
message_id = ""
stop_reason = "end_turn"
resolved_model = model_config.model

try:
async for raw in runner.stream(argv, stdin_iter=aiter_bytes(stdin)):
line_obj = parse_stream_json_line(raw)
if line_obj is None:
continue

# ── Accumulate for the terminal APIResponse ──
ltype = str(line_obj.get("type", ""))
if ltype == "system":
message_id = str(
line_obj.get("session_id")
or line_obj.get("message_id")
or message_id
)
resolved_model = str(line_obj.get("model") or resolved_model)
elif ltype == "assistant":
delta = line_obj.get("delta") or {}
dtype = str(delta.get("type", ""))
if dtype == "text_delta":
text_buf.append(str(delta.get("text", "")))
elif dtype == "thinking_delta":
thinking_buf.append(str(delta.get("text", "")))
elif dtype == "input_json_delta":
if current_tool is not None:
current_tool.setdefault("_partial_json", "")
current_tool["_partial_json"] += str(
delta.get("partial_json", "")
)
else:
cb = line_obj.get("content_block")
if isinstance(cb, dict) and cb.get("type") == "tool_use":
current_tool = {
"id": cb.get("id"),
"name": cb.get("name"),
"input": cb.get("input") or {},
}
elif ltype == "content_block_stop":
if current_tool is not None:
partial = current_tool.pop("_partial_json", "")
if partial and not current_tool.get("input"):
try:
current_tool["input"] = _json.loads(partial)
except _json.JSONDecodeError:
current_tool["input"] = {"_raw": partial}
tool_uses.append(current_tool)
current_tool = None
elif ltype == "result":
final_obj = line_obj
stop_reason = str(line_obj.get("stop_reason", stop_reason))

# ── Yield the per-line canonical event ──
# Suppress the translator's bare ``message_complete``
# (it carries no response field) — we emit the
# populated version after the loop. Everything else
# passes through unchanged.
event = stream_json_line_to_canonical_event(line_obj)
if event is not None:
yield event
if event is None:
continue
if event.get("type") == "message_complete":
continue
yield event

# ── Assemble + emit the terminal message_complete ──
blocks: List[ContentBlock] = []
if thinking_buf:
blocks.append(
ContentBlock(type="thinking", thinking_text="".join(thinking_buf))
)
if text_buf:
blocks.append(ContentBlock(type="text", text="".join(text_buf)))
for tu in tool_uses:
blocks.append(
ContentBlock(
type="tool_use",
tool_use_id=tu.get("id"),
tool_name=tu.get("name"),
tool_input=tu.get("input") or {},
)
)

usage_in: Dict[str, Any] = (final_obj or {}).get("usage", {}) or {}
usage = TokenUsage(
input_tokens=int(usage_in.get("input_tokens", 0) or 0),
output_tokens=int(usage_in.get("output_tokens", 0) or 0),
cache_creation_input_tokens=int(
usage_in.get("cache_creation_input_tokens", 0) or 0
),
cache_read_input_tokens=int(
usage_in.get("cache_read_input_tokens", 0) or 0
),
cost_usd=usage_in.get("cost_usd"),
duration_ms=(final_obj or {}).get("duration_ms"),
)

response = APIResponse(
content=blocks,
stop_reason=stop_reason,
usage=usage,
model=resolved_model,
message_id=message_id,
raw=final_obj or {},
)
yield {"type": "message_complete", "response": response}
except CLIBinaryNotFound as e:
raise APIError(str(e), category=ErrorCategory.CLI_NOT_FOUND) from e
except CLITimeout as e:
Expand Down
28 changes: 28 additions & 0 deletions tests/llm_client/unit/test_claude_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,34 @@ async def test_create_message_stream_yields_text_deltas() -> None:
assert any(e.get("type") == "result" for e in events)


@pytest.mark.asyncio
async def test_create_message_stream_message_complete_carries_response() -> None:
"""Regression: the terminal ``message_complete`` event must carry an
assembled ``APIResponse`` in ``chunk["response"]``. The s06_api
stage's streaming consumer raises ``Stream ended without
message_complete`` when this field is missing — that was the
Claude-Code-as-Stage-6 outage symptom.
"""
c = _client(text="hello world")
completes = []
async for evt in c.create_message_stream(
model_config=ModelConfig(model="sonnet"),
messages=[{"role": "user", "content": "go"}],
):
if evt.get("type") == "message_complete":
completes.append(evt)

# Exactly one terminal envelope, populated.
assert len(completes) == 1
final = completes[0]
assert "response" in final, "message_complete must include the response"
resp = final["response"]
assert resp.text == "hello world"
assert resp.stop_reason == "end_turn"
assert resp.usage.cost_usd is not None
assert resp.model # resolved from the system envelope or model_config


# ---------------------------------------------------------------------------
# Argv shape verification via the echo scenario
# ---------------------------------------------------------------------------
Expand Down
Loading