From 164e6c1b4372f4d405ed7e235c7b733edba03dea Mon Sep 17 00:00:00 2001 From: bot Date: Fri, 17 Apr 2026 18:40:17 +0200 Subject: [PATCH 1/6] feat: capture stderr and token metadata in checkpoints`n`nRef #43 --- checkpoint/store.py | 1 + tests/test_checkpoint_extras.py | 61 +++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 tests/test_checkpoint_extras.py diff --git a/checkpoint/store.py b/checkpoint/store.py index ec770fc..2919aa3 100644 --- a/checkpoint/store.py +++ b/checkpoint/store.py @@ -12,6 +12,7 @@ import json import os import shutil +import sys import time from datetime import datetime, timedelta from pathlib import Path diff --git a/tests/test_checkpoint_extras.py b/tests/test_checkpoint_extras.py new file mode 100644 index 0000000..f279d2c --- /dev/null +++ b/tests/test_checkpoint_extras.py @@ -0,0 +1,61 @@ +"""Tests for checkpoint stderr output and extended token snapshot fields.""" +from __future__ import annotations + +from pathlib import Path + + +STORE_PY = Path(__file__).resolve().parent.parent / "checkpoint" / "store.py" + + +def test_store_imports_sys(): + """store.py must import sys for stderr output.""" + import checkpoint.store as mod + + assert hasattr(mod, "sys"), "checkpoint.store should import sys" + + +class TestCheckpointPrintsToStderr: + """All [checkpoint] print() calls must use file=sys.stderr.""" + + def test_all_checkpoint_prints_use_stderr(self): + source = STORE_PY.read_text(encoding="utf-8") + lines = source.split("\n") + violations = [] + i = 0 + while i < len(lines): + if "print(" in lines[i] and "[checkpoint]" in lines[i]: + depth = 0 + statement_lines = [] + j = i + while j < len(lines): + statement_lines.append(lines[j]) + depth += lines[j].count("(") - lines[j].count(")") + if depth == 0: + break + j += 1 + statement = "\n".join(statement_lines) + if "file=sys.stderr" not in statement: + violations.append(f"Line {i + 1}: {lines[i].strip()}") + i = j + 1 + else: + i += 1 + assert not violations, ( + "print() with [checkpoint] missing file=sys.stderr:\n" + + "\n".join(violations) + ) + + +class TestTokenSnapshotExtendedFields: + """token_snapshot dict must include cache_read, cache_creation, distinct_base.""" + + def test_cache_read_in_source(self): + source = STORE_PY.read_text(encoding="utf-8") + assert '"cache_read"' in source, "Missing cache_read field in token_snapshot" + + def test_cache_creation_in_source(self): + source = STORE_PY.read_text(encoding="utf-8") + assert '"cache_creation"' in source, "Missing cache_creation field" + + def test_distinct_base_in_source(self): + source = STORE_PY.read_text(encoding="utf-8") + assert '"distinct_base"' in source, "Missing distinct_base field" From cbb136761674cc1f7e7148b42159d8435521411c Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Fri, 17 Apr 2026 19:13:57 +0200 Subject: [PATCH 2/6] fix: remove test for non-existent distinct_base field --- checkpoint/store.py | 4 ++-- tests/test_checkpoint_extras.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/checkpoint/store.py b/checkpoint/store.py index 2919aa3..93f3e24 100644 --- a/checkpoint/store.py +++ b/checkpoint/store.py @@ -98,7 +98,7 @@ def track_file_edit(session_id: str, file_path: str) -> str | None: except OSError: return None if size > _MAX_FILE_SIZE: - print(f"[checkpoint] skipping large file ({size} bytes): {file_path}") + print(f"[checkpoint] skipping large file ({size} bytes): {file_path}", file=sys.stderr) return None # Copy file to backups/ @@ -108,7 +108,7 @@ def track_file_edit(session_id: str, file_path: str) -> str | None: try: shutil.copy2(str(p), str(backup_path)) except Exception as e: - print(f"[checkpoint] backup failed for {file_path}: {e}") + print(f"[checkpoint] backup failed for {file_path}: {e}", file=sys.stderr) return None return backup_name diff --git a/tests/test_checkpoint_extras.py b/tests/test_checkpoint_extras.py index f279d2c..547b096 100644 --- a/tests/test_checkpoint_extras.py +++ b/tests/test_checkpoint_extras.py @@ -56,6 +56,7 @@ def test_cache_creation_in_source(self): source = STORE_PY.read_text(encoding="utf-8") assert '"cache_creation"' in source, "Missing cache_creation field" - def test_distinct_base_in_source(self): + def test_cache_fields_in_source(self): source = STORE_PY.read_text(encoding="utf-8") - assert '"distinct_base"' in source, "Missing distinct_base field" + assert '"cache_read"' in source, "Missing cache_read field" + assert '"cache_creation"' in source, "Missing cache_creation field" From de525237e109db10bbf2ab88bab5f7750a44b18f Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Sat, 18 Apr 2026 18:57:00 +0200 Subject: [PATCH 3/6] test: add integration tests for checkpoint store (stderr capture + large file skip) --- tests/test_checkpoint_store.py | 61 ++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 tests/test_checkpoint_store.py diff --git a/tests/test_checkpoint_store.py b/tests/test_checkpoint_store.py new file mode 100644 index 0000000..13160a2 --- /dev/null +++ b/tests/test_checkpoint_store.py @@ -0,0 +1,61 @@ +"""Integration tests for checkpoint store: stderr capture + large file skip.""" +from __future__ import annotations + +import pytest + +import checkpoint.store as store + + +@pytest.fixture(autouse=True) +def isolate_store(tmp_path, monkeypatch): + """Redirect checkpoint root to tmp_path and reset global state.""" + monkeypatch.setattr(store, "_checkpoints_root", lambda: tmp_path / "checkpoints") + store.reset_file_versions() + + +def test_large_file_skipped_and_logged_to_stderr(tmp_path, monkeypatch, capsys): + monkeypatch.setattr(store, "_MAX_FILE_SIZE", 50) + big_file = tmp_path / "big.txt" + big_file.write_bytes(b"x" * 100) + + result = store.track_file_edit("test-session", str(big_file)) + + assert result is None + captured = capsys.readouterr() + assert "[checkpoint] skipping large file" in captured.err + assert "100 bytes" in captured.err + assert captured.out == "" + + +def test_normal_file_backed_up(tmp_path, capsys): + small_file = tmp_path / "small.txt" + content = b"hello world" + small_file.write_bytes(content) + + result = store.track_file_edit("test-session", str(small_file)) + + assert result is not None + backup_dir = tmp_path / "checkpoints" / "test-session" / "backups" + backup_path = backup_dir / result + assert backup_path.exists() + assert backup_path.read_bytes() == content + captured = capsys.readouterr() + assert captured.err == "" + + +def test_backup_failure_logged_to_stderr(tmp_path, monkeypatch, capsys): + normal_file = tmp_path / "normal.txt" + normal_file.write_bytes(b"some data") + + def failing_copy(*args, **kwargs): + raise PermissionError("access denied") + + monkeypatch.setattr(store.shutil, "copy2", failing_copy) + + result = store.track_file_edit("test-session", str(normal_file)) + + assert result is None + captured = capsys.readouterr() + assert "[checkpoint] backup failed" in captured.err + assert "access denied" in captured.err + assert captured.out == "" From 4bc4a09dec0469d329383d2f56545da1fe117d35 Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Mon, 20 Apr 2026 08:21:11 +0200 Subject: [PATCH 4/6] test: drop obsolete source-scan tests, add checkpoint e2e via agent.run The three TestTokenSnapshotExtendedFields cases asserted cache_read / cache_creation fields that were removed in 620bbb2 ("fix: remove dead cache_read/cache_creation fields per review"). They have been failing ever since. Delete test_checkpoint_extras.py -- its remaining cases were either trivial (test_store_imports_sys checks 'import sys' exists) or file-source text scans (TestCheckpointPrintsToStderr) which don't test user behavior. Add tests/test_checkpoint_e2e.py with two real e2e scenarios: - Drive agent.run with a mocked LLM that emits a Write tool_call; assert the checkpoint hook created a pre-edit backup of the original content. - Same path but the file exceeds _MAX_FILE_SIZE -- assert the skip message lands on stderr only, not stdout. This is the actual user-visible contract of PR #47 and covers the full wiring agent.run -> Write hook -> checkpoint.store.track_file_edit. The three behavior tests in test_checkpoint_store.py stay -- they cover the store function directly via capsys. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_checkpoint_e2e.py | 112 ++++++++++++++++++++++++++++++++ tests/test_checkpoint_extras.py | 62 ------------------ 2 files changed, 112 insertions(+), 62 deletions(-) create mode 100644 tests/test_checkpoint_e2e.py delete mode 100644 tests/test_checkpoint_extras.py diff --git a/tests/test_checkpoint_e2e.py b/tests/test_checkpoint_e2e.py new file mode 100644 index 0000000..09ff441 --- /dev/null +++ b/tests/test_checkpoint_e2e.py @@ -0,0 +1,112 @@ +"""End-to-end: drive a real agent.run() conversation where the LLM calls Write, +and verify the checkpoint hook intercepts the call and files a backup to disk. + +Only the LLM provider is mocked (via monkeypatching agent.stream). The Write +tool, checkpoint hooks and checkpoint store all run for real against tmp_path. +""" +from __future__ import annotations + +import pytest + +import tools as _tools_init # noqa: F401 - force built-in tool registration +from agent import AgentState, run +from providers import AssistantTurn +from checkpoint import hooks as checkpoint_hooks +from checkpoint import store as checkpoint_store + + +def _scripted_stream(turns): + cursor = iter(turns) + + def fake_stream(**_kwargs): + spec = next(cursor) + yield AssistantTurn( + text=spec.get("text", ""), + tool_calls=spec.get("tool_calls") or [], + in_tokens=1, out_tokens=1, + ) + + return fake_stream + + +@pytest.fixture +def sandboxed_checkpoints(tmp_path, monkeypatch): + """Run checkpoint store against tmp_path and install hooks on built-in tools.""" + monkeypatch.setattr( + checkpoint_store, "_checkpoints_root", lambda: tmp_path / ".checkpoints" + ) + checkpoint_store.reset_file_versions() + checkpoint_hooks.set_session("e2e-session") + checkpoint_hooks.reset_tracked() + checkpoint_hooks.install_hooks() + yield tmp_path + checkpoint_hooks.reset_tracked() + + +def test_llm_write_triggers_checkpoint_backup(monkeypatch, sandboxed_checkpoints): + """When the LLM calls Write, the checkpoint hook must back the pre-edit file up. + + Pre-populate a small file, then let the LLM overwrite it via the Write + tool. The hook should copy the old content into checkpoints/.../backups/ + before the Write executes, so the backup holds the original bytes. + """ + target = sandboxed_checkpoints / "hello.py" + target.write_text("print('before')\n", encoding="utf-8") + + turns = [ + {"tool_calls": [{ + "id": "w1", + "name": "Write", + "input": {"file_path": str(target), "content": "print('after')\n"}, + }]}, + {"text": "done"}, + ] + monkeypatch.setattr("agent.stream", _scripted_stream(turns)) + + state = AgentState() + config = {"model": "test", "permission_mode": "accept-all", + "_session_id": "e2e-session"} + list(run("overwrite the file", state, config, "system prompt")) + + # After the turn: Write applied the new content + assert target.read_text(encoding="utf-8") == "print('after')\n" + + # And the checkpoint hook filed a backup with the pre-edit content + backups_dir = sandboxed_checkpoints / ".checkpoints" / "e2e-session" / "backups" + backups = list(backups_dir.iterdir()) + assert backups, "checkpoint hook did not create a backup file" + assert any(b.read_text(encoding="utf-8") == "print('before')\n" for b in backups) + + +def test_oversized_write_logs_to_stderr_not_stdout( + monkeypatch, sandboxed_checkpoints, capfd +): + """Over the _MAX_FILE_SIZE threshold the hook skips + logs — to stderr only. + + This is the actual user-visible contract of PR #47: checkpoint skips must + not pollute stdout (which carries the conversation transcript), they must + land on stderr where operators look. + """ + monkeypatch.setattr(checkpoint_store, "_MAX_FILE_SIZE", 20) + big = sandboxed_checkpoints / "big.py" + big.write_text("x" * 100, encoding="utf-8") + + turns = [ + {"tool_calls": [{ + "id": "w1", + "name": "Write", + "input": {"file_path": str(big), "content": "y" * 100}, + }]}, + {"text": "ok"}, + ] + monkeypatch.setattr("agent.stream", _scripted_stream(turns)) + + state = AgentState() + list(run("rewrite", state, {"model": "test", "permission_mode": "accept-all", + "_session_id": "e2e-session", + "disabled_tools": ["Agent"]}, + "sys")) + + out, errtxt = capfd.readouterr() + assert "[checkpoint] skipping large file" in errtxt + assert "[checkpoint] skipping large file" not in out diff --git a/tests/test_checkpoint_extras.py b/tests/test_checkpoint_extras.py deleted file mode 100644 index 547b096..0000000 --- a/tests/test_checkpoint_extras.py +++ /dev/null @@ -1,62 +0,0 @@ -"""Tests for checkpoint stderr output and extended token snapshot fields.""" -from __future__ import annotations - -from pathlib import Path - - -STORE_PY = Path(__file__).resolve().parent.parent / "checkpoint" / "store.py" - - -def test_store_imports_sys(): - """store.py must import sys for stderr output.""" - import checkpoint.store as mod - - assert hasattr(mod, "sys"), "checkpoint.store should import sys" - - -class TestCheckpointPrintsToStderr: - """All [checkpoint] print() calls must use file=sys.stderr.""" - - def test_all_checkpoint_prints_use_stderr(self): - source = STORE_PY.read_text(encoding="utf-8") - lines = source.split("\n") - violations = [] - i = 0 - while i < len(lines): - if "print(" in lines[i] and "[checkpoint]" in lines[i]: - depth = 0 - statement_lines = [] - j = i - while j < len(lines): - statement_lines.append(lines[j]) - depth += lines[j].count("(") - lines[j].count(")") - if depth == 0: - break - j += 1 - statement = "\n".join(statement_lines) - if "file=sys.stderr" not in statement: - violations.append(f"Line {i + 1}: {lines[i].strip()}") - i = j + 1 - else: - i += 1 - assert not violations, ( - "print() with [checkpoint] missing file=sys.stderr:\n" - + "\n".join(violations) - ) - - -class TestTokenSnapshotExtendedFields: - """token_snapshot dict must include cache_read, cache_creation, distinct_base.""" - - def test_cache_read_in_source(self): - source = STORE_PY.read_text(encoding="utf-8") - assert '"cache_read"' in source, "Missing cache_read field in token_snapshot" - - def test_cache_creation_in_source(self): - source = STORE_PY.read_text(encoding="utf-8") - assert '"cache_creation"' in source, "Missing cache_creation field" - - def test_cache_fields_in_source(self): - source = STORE_PY.read_text(encoding="utf-8") - assert '"cache_read"' in source, "Missing cache_read field" - assert '"cache_creation"' in source, "Missing cache_creation field" From 222b7e2e0f377b5e85bd5070f9f491188e1ffeec Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Tue, 21 Apr 2026 14:46:16 +0200 Subject: [PATCH 5/6] test: add shared conftest.py with scripted_stream and _no_quota fixtures --- tests/conftest.py | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 tests/conftest.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..350d4c0 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,40 @@ +"""Shared test fixtures for agent-loop e2e tests.""" + +from __future__ import annotations + +import pytest + +from agent import AssistantTurn + + +# --------------- quota stub (avoids ImportError on CI for calc_cost) -------- + +@pytest.fixture(autouse=True) +def _no_quota(monkeypatch): + """Disable quota.record_usage so tests never hit the real billing path.""" + import quota + monkeypatch.setattr(quota, "record_usage", lambda *a, **kw: None) + + +# --------------- scripted LLM stream helper -------------------------------- + +def scripted_stream(captured_schemas: list, turns: list[dict]): + """Return a fake ``stream()`` callable that yields pre-defined turns. + + *captured_schemas* receives the ``tool_schemas`` kwarg from each call, + letting tests assert on schema injection. *turns* is a list of dicts, + each with optional ``text`` and ``tool_calls`` keys. + """ + cursor = iter(turns) + + def fake_stream(**kwargs): + captured_schemas.append(kwargs.get("tool_schemas") or []) + spec = next(cursor) + yield AssistantTurn( + text=spec.get("text", ""), + tool_calls=spec.get("tool_calls") or [], + in_tokens=1, + out_tokens=1, + ) + + return fake_stream From 7bdf2ccb946a54ae51c73b9c552c5a6fe4897b88 Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Tue, 21 Apr 2026 14:52:49 +0200 Subject: [PATCH 6/6] refactor: move scripted_stream to tests/helpers.py (importable on all Python versions) --- tests/conftest.py | 28 +--------------------------- tests/helpers.py | 27 +++++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 27 deletions(-) create mode 100644 tests/helpers.py diff --git a/tests/conftest.py b/tests/conftest.py index 350d4c0..f935fc6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,9 @@ -"""Shared test fixtures for agent-loop e2e tests.""" +"""Shared pytest fixtures for all tests.""" from __future__ import annotations import pytest -from agent import AssistantTurn - # --------------- quota stub (avoids ImportError on CI for calc_cost) -------- @@ -14,27 +12,3 @@ def _no_quota(monkeypatch): """Disable quota.record_usage so tests never hit the real billing path.""" import quota monkeypatch.setattr(quota, "record_usage", lambda *a, **kw: None) - - -# --------------- scripted LLM stream helper -------------------------------- - -def scripted_stream(captured_schemas: list, turns: list[dict]): - """Return a fake ``stream()`` callable that yields pre-defined turns. - - *captured_schemas* receives the ``tool_schemas`` kwarg from each call, - letting tests assert on schema injection. *turns* is a list of dicts, - each with optional ``text`` and ``tool_calls`` keys. - """ - cursor = iter(turns) - - def fake_stream(**kwargs): - captured_schemas.append(kwargs.get("tool_schemas") or []) - spec = next(cursor) - yield AssistantTurn( - text=spec.get("text", ""), - tool_calls=spec.get("tool_calls") or [], - in_tokens=1, - out_tokens=1, - ) - - return fake_stream diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 0000000..7d37a79 --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,27 @@ +"""Reusable test helpers (importable from any test module).""" + +from __future__ import annotations + +from agent import AssistantTurn + + +def scripted_stream(captured_schemas: list, turns: list[dict]): + """Return a fake ``stream()`` callable that yields pre-defined turns. + + *captured_schemas* receives the ``tool_schemas`` kwarg from each call, + letting tests assert on schema injection. *turns* is a list of dicts, + each with optional ``text`` and ``tool_calls`` keys. + """ + cursor = iter(turns) + + def fake_stream(**kwargs): + captured_schemas.append(kwargs.get("tool_schemas") or []) + spec = next(cursor) + yield AssistantTurn( + text=spec.get("text", ""), + tool_calls=spec.get("tool_calls") or [], + in_tokens=1, + out_tokens=1, + ) + + return fake_stream