diff --git a/src/capabilities/slack.md b/src/capabilities/slack.md index 31e83f2..7cb1623 100644 --- a/src/capabilities/slack.md +++ b/src/capabilities/slack.md @@ -51,21 +51,51 @@ When Sam isn't sure of an exact API shape, Sam reads docs.slack.dev. When Sam fi Whenever a reply will take more than ~2 seconds — i.e. *any* reply that involves a tool call — Sam sets a status indicator BEFORE doing the work. The status names what Sam is doing in human terms: "reading the issue", "drafting the PR", "checking CI". Not "thinking…". -API: `POST https://slack.com/api/assistant.threads.setStatus` with `{channel_id, thread_ts, status}`. Clear it (`status=""`) right before posting the final reply, not after. +**Use the `set_status` tool**, not bash-curl. The tool calls `https://slack.com/api/assistant.threads.setStatus` with the active channel + thread_ts already bound — Sam just passes the text. The bash-curl pattern is deprecated; use the tool so the call shows up cleanly in the audit log under `tool: "set_status"` instead of `tool: "bash"` with a curl args string. + +**`respond()` auto-clears the status.** Sam SETS status mid-session; `respond()` CLEARS it on exit. Sam never needs to remember to clear status before the final reply — the runtime does that. This closed a recurring slip class (operator reading the final reply while the status indicator still said "drafting the PR"). See the 2026-05-13 journal entries for the original failure mode. This is unconditional. Sam doesn't decide whether the work "warrants" a status — if there's a tool call, there's a status. (The skill `src/skills/slack-dynamic-messaging.md` covers the other live-UX features — streaming, plan blocks, feedback buttons — which remain judgment calls.) +## Cancel — `:no_entry:` reaction + +**Anyone in the channel** can cancel an in-flight session by adding the `:no_entry:` reaction to the original message that triggered Sam. The daemon's `reaction_added` handler matches on (any human user + :no_entry: + the live lifecycle target) and cancels the running session task. The bot's own `:no_entry:` stamps (which the cleanup adds as the terminal lifecycle reaction) are filtered out explicitly. Cleanup is automatic: lifecycle stamps `:no_entry:` as the terminal reaction, the daemon posts a brief "cancelled" note in the thread, and the journal entry records `status: cancelled` with the last_failure_signature if any. + +This is a deliberately broad affordance — Sam works in a shared channel; anyone seeing it head down a wrong path should be able to stop it without needing the principal operator. The blast radius is bounded (one session, terminal state, queue continues normally). + +Sam doesn't need to do anything for this — the cancel is event-driven via the existing Slack reaction subscription, no polling. But Sam reading the journal later should recognize `status: cancelled` as distinct from `errored` / `timed_out` / `stuck`: it means a teammate stopped the work deliberately. Don't auto-retry a cancelled session. + ## First reply on a tool-using task — restate, name approach Any session that'll involve real work (multiple tool calls, file edits, opening a PR, downloading attachments) gets a first substantive message in the thread within ~5 seconds of the trigger, *before* Sam starts the long work. +**Use the `ack` tool, not bash-curl.** `ack(text)` is a dedicated tool that posts a mid-session progress note without closing the silent-exit gate. It's the right shape for: the first reply (restate + approach + commit), intermediate progress updates ("found 3 vulns, drafting patch"), and mid-task blockers Sam needs to flag without finishing the session. Multiple calls per session are expected. The audit log records each as `tool: "ack"`, so the shape of a session's communication is greppable. + Three parts, terse: 1. **Restate the ask** in Sam's own words. Gives the operator a chance to correct a misread before Sam spends 10 minutes acting on it. 2. **Name the approach** in one short clause: "drafting PR against `Dembrane/sam` with the image under `docs/`." 3. **Brief follow-up commit**: "back in a few minutes." -Then Sam does the work and posts the substantive end-of-session reply (PR link, result, blocker) when done. +Then Sam does the work and posts the substantive end-of-session reply (PR link, result, blocker) via `respond()` when done. + +## Tool-by-tool — `ack` vs `respond` vs `set_status` + +| Tool | What it does | Closes silent-exit gate? | Multiple calls? | +|---|---|---|---| +| `ack(text)` | Posts a thread message (ack, mid-progress, intermediate finding) | **No** | Yes — expected | +| `respond(text)` | Posts the final close-the-loop reply; auto-clears Slack status | **Yes** | Once per session | +| `set_status(text)` | Updates the assistant-thread status indicator (UI chip) | No | As needed | + +Canonical Slack-task shape: + +1. `set_status("reading the issue")` — indicator +2. `ack("got it — drafting PR against Dembrane/echo for the security audit, back in 5")` — restate + approach +3. tool work (clone, edit, push, gh pr create) +4. `set_status("opening PR")` — indicator update (optional) +5. `respond("PR up — pip-audit + trivy + gitleaks workflow in echo. ...")` — final reply, auto-clears status + +The silent-exit gate only counts `respond()` (or a bash chat.postMessage AFTER the last outward call as fallback). N acks followed by no `respond()` will trip the gate — that's the contract. This is *not* preamble. "Sure!", "Got it!", "Working on it!" are still banned (see Voice). The first reply is a *commitment to a direction* with the operator's ability to correct it built in. The information content is the restated ask + approach — that's what makes it earn its place over a reaction. diff --git a/src/capabilities/tool-timeouts.md b/src/capabilities/tool-timeouts.md new file mode 100644 index 0000000..63b242b --- /dev/null +++ b/src/capabilities/tool-timeouts.md @@ -0,0 +1,71 @@ +# Capability: Tool timeouts + +Tool calls have wall-clock budgets. When the budget runs out, the daemon kills the process and the tool returns a structured `TIMEOUT after Ns` payload. A timeout is *not* the same as a command-level failure, and Sam's response to one should differ. + +## Budgets + +| Tool | Timeout | +|---|---| +| `bash` | **300s** (5 min — needs longer because it runs networked commands: `git clone`, `pip install`, `gh api`, etc.) | +| `fetch_url`, `grep` | 30s | +| All other tools | 120s | + +These budgets are enforced in `src/runtime/adk_runner.py`. They're hard ceilings — the daemon SIGKILLs the underlying process and returns a structured payload. The process does *not* finish in the background; whatever it was doing is interrupted. + +## How a timeout looks to Sam + +The bash tool returns a multi-line string with this exact shape: + +``` +TIMEOUT after 300s (wall-clock kill, process killed). +Command: +Hint: +``` + +Other tools may return shorter timeout strings, but every timeout output starts with `TIMEOUT after Ns` so audit-log scans and reflexive pattern-matching catch them cleanly. + +## What a timeout means (and doesn't mean) + +A timeout means **the wall clock ran out before the process exited.** That's it. It does *not* tell Sam: + +- Whether the syntax was correct +- Whether the tool exists +- Whether the network was reachable +- Whether the process was making progress or wedged + +The empty information about the cause is the load-bearing fact. **A timeout almost never means "the syntax was wrong"** — syntax errors fail fast, in milliseconds. A timeout that took the full budget to fire is overwhelmingly an environment problem: missing tool, blocked network egress, slow remote, etc. + +## What Sam does after a timeout + +In this order: + +1. **Stop. Do not retry the same shape.** If the same command failed at 300s, running it again will fail at 300s again. The 2026-05-25 pip-audit incident burned ~10 minutes on five identical retries because the model treated each timeout as a syntax fix-it-and-go. +2. **Diagnose the environment.** A handful of cheap probes name the cause: + - `which ` — is the binary installed? + - `ping -c 1 -W 2 ` — is the remote reachable from inside the container? + - `cat /etc/resolv.conf` — does DNS resolve? + - `pip config list` or equivalent — is the package index reachable / configured? + - `gcloud config list project` — is auth still valid? +3. **Decide:** + - **Environment broken, work doesn't belong here.** External-repo audits (pip-audit, trivy, semgrep, ruff, mypy) belong in the target repo's GitHub Actions, not in Sam's container. Open a PR adding `.github/workflows/...yml` to the target repo. See `src/skills/external-repo-audits/` if it exists; otherwise the rule still applies. + - **Environment broken, no alternative path.** Post the failure via `respond()` naming the constraint Sam hit, and exit. Don't loop. + - **Genuine transient (rare).** One retry is fine. A second timeout = environment, not transient. + - **Ambiguous.** `ask_operator` with the timeout output and the diagnostic findings. The operator decides. + +## What Sam does NOT do after a timeout + +- Retry the same command with slightly different syntax (`pip install X` → `pip3 install X` → `python -m pip install X`). All three time out the same way if the env is broken. +- Quietly continue without acknowledging the timeout. The audit log records every TIMEOUT; future-Sam reading the journal expects to find a corresponding self-aware acknowledgement in the session entry. +- Burn the rest of the session budget on environment retries. `MAX_SESSION_SECONDS` is 1 hour; bash timeouts at 300s each compound fast. + +## How timeouts show up in cross-session state + +When a session is killed (revision rollover, SIGKILL, session-budget timeout) before it can write its own journal entry, `_safety_net_journal_entry` (in `src/runtime/session.py`) reads the audit log for that session, extracts any repeat-timeout signature, and writes it into the stub journal entry under `last_failure_signature:`. + +On recovery, `_format_recovered_preamble` reads that signature from the prior session's journal entry and prepends it to the new session's preamble. Sam recovers with context: *"the previous attempt died in a retry loop on `pip install pip-audit` (5x same failure). Don't retry that path — diagnose env or move the work."* + +This means **a session that times out leaves a trail.** Future-Sam picks up the failure mode without having to re-discover it. Don't paper over the prior failure; acknowledge it and pick a different path. + +## Why this capability exists + +The 2026-05-25 audit-task incident: a session spent ~10 minutes retrying `git clone` / `pip install` (5x bash timeouts) before pivoting to writing the audit workflow in the target repo's CI. The pivot was the right answer; the 10 minutes of retry-loop waste was the avoidable cost. Naming the rule here closes the class — and the structured TIMEOUT payload + recovery preamble make sure the rule is enforceable across sessions, not just within one. diff --git a/src/runtime/adk_runner.py b/src/runtime/adk_runner.py index 3c0f570..957c4a4 100644 --- a/src/runtime/adk_runner.py +++ b/src/runtime/adk_runner.py @@ -77,9 +77,27 @@ # _classify_tool_use in session.py matches against these exact strings. +#: bash gets a longer budget than the rest of the tools because it runs +# networked commands (git clone, pip install, gh api, gcloud) that +# legitimately need more than the typed-tool default. The structured +# TIMEOUT payload below tells Sam this was a wall-clock kill so it +# doesn't read a missing-network error as a syntax problem and retry +# the same shape. See src/capabilities/tool-timeouts.md. +BASH_TIMEOUT_SECONDS = 300 +DEFAULT_TOOL_TIMEOUT_SECONDS = 120 + + def _make_bash(cwd: Optional[str], env: Optional[dict[str, str]]): async def bash(command: str) -> str: - """Run a shell command and return combined stdout+stderr. Timeout 120s.""" + """Run a shell command and return combined stdout+stderr. + + Timeout 300s. On timeout, returns a structured TIMEOUT payload + starting with `TIMEOUT after 300s` so Sam can distinguish a + wall-clock kill from a command-level error. See + src/capabilities/tool-timeouts.md for what to do next (diagnose + env, change approach, ask_operator, or fail loudly — never + retry the same shape). + """ try: proc = await asyncio.create_subprocess_shell( command, @@ -90,11 +108,25 @@ async def bash(command: str) -> str: ) try: stdout, stderr = await asyncio.wait_for( - proc.communicate(), timeout=120 + proc.communicate(), timeout=BASH_TIMEOUT_SECONDS ) except asyncio.TimeoutError: proc.kill() - return "(command timed out after 120s)" + # Structured payload — the `TIMEOUT after Ns` prefix is the + # load-bearing contract for audit.py's error_type inference + # and for daily-maintenance's timeout scan. Don't drop it. + return ( + f"TIMEOUT after {BASH_TIMEOUT_SECONDS}s (wall-clock kill, " + f"process killed).\n" + f"Command: {command}\n" + "Hint: this is environment, not syntax. The wall clock ran " + "out; the process did not exit on its own. Do not retry " + "the same shape. Diagnose: `which `, `ping -c 1 -W 2 " + "`, `cat /etc/resolv.conf`, check auth. If the env " + "is broken, either move the work to target-repo CI, " + "`ask_operator`, or post the failure via `respond()` and " + "exit. See src/capabilities/tool-timeouts.md." + ) out = stdout.decode(errors="replace") err = stderr.decode(errors="replace") if err: @@ -991,6 +1023,28 @@ async def respond(text: str) -> str: try: async with aiohttp.ClientSession() as http: + # Auto-clear the assistant_threads setStatus indicator BEFORE + # the post lands — keeps "Sam is thinking…" from lingering + # next to the final reply. Best-effort: a failure here must + # not block the actual post. This is the standardization + # rule paired with `set_status` (the dedicated tool): the + # model SETS status mid-session, `respond()` CLEARS it on + # exit. The model never needs to remember to clear. + if target_thread_ts: + try: + async with http.post( + "https://slack.com/api/assistant.threads.setStatus", + headers=headers, + json={ + "channel_id": channel, + "thread_ts": target_thread_ts, + "status": "", + }, + ) as _clear_resp: + await _clear_resp.read() # drain; status is best-effort + except Exception: + log.debug("respond: status auto-clear failed (continuing)") + async with http.post( "https://slack.com/api/chat.postMessage", headers=headers, @@ -1009,6 +1063,194 @@ async def respond(text: str) -> str: return respond +def _make_ack_tool( + *, + channel: Optional[str], + thread_ts: Optional[str], + event_ts: Optional[str], +): + """Return a coroutine that posts a mid-session progress note WITHOUT + closing the silent-exit gate. Multiple calls per session allowed. + + `ack()` is the dedicated tool for the "first reply" pattern + documented in `src/capabilities/slack.md` (restate the ask, name + the approach, commit to a follow-up) AND for any mid-session + progress update (intermediate findings, blocked-on-X notes, + "going to take 5 more min" timing updates). + + ## Distinct from respond() + + - `ack` is **not terminal**. The silent-exit gate (`closed_loop`) + ignores it. Sam must still call `respond()` at the end to + satisfy the contract. + - Multiple `ack` calls per session are expected and audited as + separate entries. The shape (one ACK + N progress notes + one + respond) is the canonical Slack-task flow. + - `respond()` auto-clears the Slack status indicator; `ack()` + does **not**, because the work continues. + + ## Distinct from set_status() + + - `set_status()` updates Slack's assistant-thread status indicator + (the small UI element next to the active thread). + - `ack()` posts an actual message in the thread the operator + reads. Different surfaces — both can be active at once. + + ## Why a dedicated tool + + Before `ack`, the "first reply" was a bash-curl to + `chat.postMessage`. The audit log saw `tool: "bash"` with a curl + args string, indistinguishable from any other shell command. + Daily-maintenance and future-Sam couldn't grep for "where did + Sam ACK before work started?" without parsing bash args. Now the + call surface is clean: `tool: "ack"` in the audit log. + + The silent-exit gate also benefits: the previous bash-curl path + relied on string-matching `"chat.postMessage" in command`, which + had holes (heredocs, helper scripts, etc.). Routing through `ack` + means the classifier sees a discrete `ack` tool call and treats + it as inward without inspecting bash args. + + Channel/thread_ts come from the IncomingMessage and are bound at + closure-time so the LLM signature is `ack(text: str)`. + """ + async def ack(text: str) -> str: + """Post a mid-session progress note. Does NOT close the loop. + + Audited as `tool='ack'`; multiple calls per session allowed; + does not satisfy the silent-exit gate. Use for: the + first-reply pattern (restate + approach + brief commit), + mid-work progress updates, mid-task blockers Sam needs to + flag without finishing the session. + + text: the message body in Slack mrkdwn. Same formatting + constraints as `respond` — `*bold*`, `_italic_`, + `` `code` ``, `- ` for flat bullets, `` for + links. Light mrkdwn cleanup applied (`### Heading` + becomes `*Heading*`, standalone `---` becomes blank). + + Returns a sentinel string the LLM can ignore. The session + continues; remember to call `respond()` at the end. + """ + import aiohttp + from .config import SLACK_BOT_TOKEN + + if not channel: + return ( + "(ack unavailable: no originating channel — ack is for " + "Slack-triggered sessions only. Scheduled/cron sessions " + "don't have a thread to post into.)" + ) + if not SLACK_BOT_TOKEN: + return "(ack unavailable: SLACK_BOT_TOKEN not set.)" + + cleaned, warnings = _clean_for_slack_mrkdwn(text) + for w in warnings: + log.warning("ack: voice warning — %s", w) + + target_thread_ts = thread_ts or event_ts + post_body = {"channel": channel, "text": cleaned} + if target_thread_ts: + post_body["thread_ts"] = target_thread_ts + headers = { + "Authorization": f"Bearer {SLACK_BOT_TOKEN}", + "Content-Type": "application/json; charset=utf-8", + } + + try: + async with aiohttp.ClientSession() as http: + async with http.post( + "https://slack.com/api/chat.postMessage", + headers=headers, + json=post_body, + ) as resp: + post_json = await resp.json() + if not post_json.get("ok"): + err = post_json.get("error", "unknown") + return f"(ack failed at chat.postMessage: {err}.)" + except Exception as exc: + log.exception("ack HTTP call failed") + return f"(ack HTTP error: {type(exc).__name__}: {exc}.)" + + return ( + f"Posted ack to {channel} (ts={post_json.get('ts')}). " + "Session continues — remember to call `respond()` at the end " + "to close the loop." + ) + + return ack + + +def _make_set_status_tool( + *, + channel: Optional[str], + thread_ts: Optional[str], + event_ts: Optional[str], +): + """Return a coroutine that sets the assistant_threads status indicator + for the originating Slack thread. + + Replaces the bash-curl pattern for setStatus (see src/capabilities/slack.md). + Standardized partner to `respond()` — the model SETS status here + mid-session ("reading the issue", "drafting the PR"), and `respond()` + auto-clears on exit. The model never needs to remember to clear. + + Channel/thread_ts come from the IncomingMessage and are bound at + closure-time so the LLM signature is `set_status(text: str)`. + """ + async def set_status(text: str) -> str: + """Update the Slack assistant-thread status indicator. + + Slack prepends the bot's display name to the status string, so the + text starts with the verb in lowercase: "reading the issue", + "drafting the PR", "checking CI". Not "thinking…" — that's the + daemon's default fallback. Slack renders the status next to the + active assistant thread; it disappears when respond() fires. + + text: short status (under ~50 chars), verb-led, lowercase. + Examples: "reading the linear issue", "running pip-audit + against echo/server", "drafting PR description". + + Returns a sentinel string the LLM can ignore. + """ + import aiohttp + from .config import SLACK_BOT_TOKEN + + target_thread_ts = thread_ts or event_ts + if not channel or not target_thread_ts: + return ( + "(set_status unavailable: no originating Slack thread. " + "Scheduled/cron sessions don't have a status indicator.)" + ) + if not SLACK_BOT_TOKEN: + return "(set_status unavailable: SLACK_BOT_TOKEN not set.)" + + headers = { + "Authorization": f"Bearer {SLACK_BOT_TOKEN}", + "Content-Type": "application/json; charset=utf-8", + } + try: + async with aiohttp.ClientSession() as http: + async with http.post( + "https://slack.com/api/assistant.threads.setStatus", + headers=headers, + json={ + "channel_id": channel, + "thread_ts": target_thread_ts, + "status": text, + }, + ) as resp: + body = await resp.json() + if not body.get("ok"): + return f"(set_status failed: {body.get('error', 'unknown')})" + except Exception as exc: + log.exception("set_status HTTP call failed") + return f"(set_status HTTP error: {type(exc).__name__}: {exc})" + return f"status set: {text!r}" + + return set_status + + # ─── ADK runner ─────────────────────────────────────────────────────────────── @@ -1140,6 +1382,31 @@ async def run(self, request: AgentRunRequest) -> AgentRunResult: event_ts=request.slack_event_ts, ) + # `ack` — dedicated tool for mid-session progress notes that do NOT + # close the silent-exit gate. Replaces the bash-curl-to-Slack pattern + # Sam previously used for the first-reply restate + approach + brief + # commit. Multiple calls per session are expected. The classifier in + # session.py treats `ack` as inward — it doesn't satisfy + # `closed_loop` and doesn't count as outward work. Only the main + # agent gets this; workers don't post to Slack. + ack_tool = _make_ack_tool( + channel=request.slack_channel, + thread_ts=request.slack_thread_ts, + event_ts=request.slack_event_ts, + ) + + # `set_status` — dedicated tool for assistant_threads_setStatus, + # standardizing the bash-curl pattern Sam used previously. Paired + # with `respond()` which auto-clears the status on exit. Only the + # main agent gets this; workers/pro_executor/mentor have no Slack + # surface. See src/capabilities/slack.md for the SET-mid-session / + # CLEAR-on-respond contract. + set_status_tool = _make_set_status_tool( + channel=request.slack_channel, + thread_ts=request.slack_thread_ts, + event_ts=request.slack_event_ts, + ) + # `consult_opus` — dispatch the mentor (Opus, read-only) to review # accumulated context. Sam DOES NOT decide to invoke autonomously — # only the `daily-maintenance` skill (review step) and the @@ -1167,6 +1434,8 @@ async def run(self, request: AgentRunRequest) -> AgentRunResult: FunctionTool(func=consult_opus_tool), FunctionTool(func=ask_operator_tool), FunctionTool(func=respond_tool), + FunctionTool(func=ack_tool), + FunctionTool(func=set_status_tool), ], ) diff --git a/src/runtime/audit.py b/src/runtime/audit.py index b311fc3..5c21ba4 100644 --- a/src/runtime/audit.py +++ b/src/runtime/audit.py @@ -1,4 +1,4 @@ -"""Per-tool-call audit log writer. +"""Per-tool-call audit log writer + reader. Writes one NDJSON line per tool invocation to /data/tool_calls/.jsonl. Designed to be called from inside the ADK event loop in adk_runner.py. @@ -10,12 +10,16 @@ against accidental token leakage. * Truncation: outputs > MAX_OUTPUT_BYTES are stored truncated with a marker flag, so the on-disk file size stays bounded under tool-output bombs. +* Read-back: `read_recent_tool_calls` and `detect_last_failure_signature` + let session-recovery code pull a structured view of the prior session's + tail so the recovery preamble can carry forward the actual failure mode + (see src/capabilities/tool-timeouts.md). """ from __future__ import annotations import json -from datetime import datetime, timezone -from typing import Any +from datetime import datetime, timedelta, timezone +from typing import Any, Optional from .config import ( log, @@ -45,6 +49,33 @@ def _redact_value(v: Any) -> Any: return v +# Recognized error_type values surfaced from the tool's output prefix when the +# caller doesn't pass error_type explicitly. Match against the leading bytes +# of the tool output. Keep this list short — it's a load-bearing contract +# between tools that produce these prefixes and daily-maintenance which +# scans the audit log for them. +_OUTPUT_PREFIX_TO_ERROR_TYPE = ( + ("TIMEOUT after ", "timeout"), + ("error: ", "error"), +) + + +def _infer_error_type(output: str) -> Optional[str]: + """Best-effort error_type inference from tool output prefix. + + Tools that return a recognized failure prefix get tagged automatically + so daily-maintenance can grep on error_type without needing every tool + to thread the field through. + """ + if not output: + return None + head = output[:32] + for prefix, etype in _OUTPUT_PREFIX_TO_ERROR_TYPE: + if head.startswith(prefix): + return etype + return None + + def record_tool_call( *, session_id: str, @@ -54,9 +85,15 @@ def record_tool_call( output: str, started_at: float, duration_ms: int, + error_type: Optional[str] = None, ) -> None: """Append one NDJSON line to today's tool_calls log. + error_type: optional explicit failure tag ("timeout", "error", etc.). + When omitted, falls back to a prefix-match on `output` so existing + callers don't need to change. None means "tool succeeded as far as + the audit layer can tell." + Never raises; logs and swallows errors so audit failures don't kill the parent session. """ @@ -71,6 +108,8 @@ def record_tool_call( ) + _TRUNC_MARKER truncated = True + resolved_error_type = error_type or _infer_error_type(output) + record = { "ts": datetime.now(tz=timezone.utc).isoformat(), "session_id": session_id, @@ -83,9 +122,118 @@ def record_tool_call( "started_at": started_at, "duration_ms": duration_ms, } + if resolved_error_type is not None: + record["error_type"] = resolved_error_type TOOL_CALLS_DIR.mkdir(parents=True, exist_ok=True) log_path = tool_calls_path_for(datetime.now(tz=timezone.utc).date()) with log_path.open("a", encoding="utf-8") as fh: fh.write(json.dumps(record, ensure_ascii=False) + "\n") except Exception: log.exception("audit.record_tool_call failed (swallowing)") + + +def read_recent_tool_calls( + session_id: str, + *, + limit: int = 8, + lookback_days: int = 2, +) -> list[dict]: + """Read the most recent audit-log entries for a given session_id. + + Walks at most `lookback_days` of audit files (today, then yesterday) + in reverse-chronological order, parses each line, filters by + session_id, and returns the tail bounded by `limit`. Never raises. + Returns [] when no matching entries exist. + + Used by session.py for last_failure_signature detection — see + `detect_last_failure_signature` below and the recovery preamble in + session.py. + """ + out: list[dict] = [] + today = datetime.now(tz=timezone.utc).date() + for delta in range(lookback_days): + path = tool_calls_path_for(today - timedelta(days=delta)) + if not path.exists(): + continue + try: + with path.open("r", encoding="utf-8") as fh: + for line in fh: + line = line.strip() + if not line: + continue + try: + rec = json.loads(line) + except Exception: + continue + if rec.get("session_id") == session_id: + out.append(rec) + except Exception: + log.exception("audit.read_recent_tool_calls failed for %s", path) + continue + return out[-limit:] if limit and len(out) > limit else out + + +def detect_last_failure_signature(session_id: str) -> Optional[dict]: + """Return a compact signature of the prior session's terminal failure mode. + + Looks at the tail of the session's tool calls (via read_recent_tool_calls) + and returns a structured signature when at least two of the last five + calls share the same error_type AND tool. This is the "repeat failure" + shape — the failure mode worth carrying into the recovery preamble. + + Returns None when: + - No audit entries exist for the session_id + - The tail has fewer than two failures + - No two-of-five tail calls share the same (tool, error_type) + + Shape on hit: + { + "tool": "bash", + "error_type": "timeout", + "repeat_count": 5, + "command": "pip install pip-audit ...", # may be truncated + "last_error_prefix": "TIMEOUT after 300s ...", # first ~200 chars + } + """ + records = read_recent_tool_calls(session_id, limit=8) + if len(records) < 2: + return None + tail = records[-5:] + # Group the tail by (tool, error_type) where error_type is set. + grouped: dict[tuple[str, str], list[dict]] = {} + for rec in tail: + et = rec.get("error_type") + if not et: + continue + key = (rec.get("tool") or "", et) + grouped.setdefault(key, []).append(rec) + if not grouped: + return None + # Pick the largest group; require >= 2 to count as a "repeat failure". + (tool, error_type), hits = max( + grouped.items(), key=lambda kv: len(kv[1]) + ) + if len(hits) < 2: + return None + last = hits[-1] + args = last.get("args") or {} + command = "" + # bash uses 'command'; other tools may use other arg names — fall back + # to a flat str(args) for non-bash tools so the signature still + # carries something useful. + if isinstance(args, dict): + command = ( + args.get("command") + or args.get("path") + or args.get("url") + or json.dumps(args, ensure_ascii=False) + ) + command = (command or "")[:200] + output_prefix = (last.get("output") or "")[:200] + return { + "tool": tool, + "error_type": error_type, + "repeat_count": len(hits), + "command": command, + "last_error_prefix": output_prefix, + } diff --git a/src/runtime/daemon.py b/src/runtime/daemon.py index 2327e99..cd378d3 100644 --- a/src/runtime/daemon.py +++ b/src/runtime/daemon.py @@ -56,6 +56,7 @@ SLACK_BOT_TOKEN, THREAD_CACHE_TTL_SECONDS, acquire_lock, + journal_path_for_today, load_cursor, log, provision_subagents, @@ -130,10 +131,18 @@ def _format_session_badges(result: SessionResult) -> str: # ----------------------------------------------------------------------------- # Reactions Sam uses as terminal markers in the lifecycle state machine. -# A message carrying either of these from the bot has been fully processed +# A message carrying any of these from the bot has been fully processed # and must NOT be re-queued on boot. See `_mark_lifecycle` for the writer # side and `_recover_from_reactions` for the reader. -_TERMINAL_REACTIONS = ("white_check_mark", "x") +# +# Members: +# - white_check_mark: success +# - x: both-attempts-failed (operator-alert posted) +# - no_entry: operator-cancel via :no_entry: reaction (see +# `_handle_operator_cancel`). Without this entry, cancelled +# messages would be re-queued by reaction-recovery on next +# boot, defeating the cancel. +_TERMINAL_REACTIONS = ("white_check_mark", "x", "no_entry") # Non-terminal reactions that recovery may need to clear when self-healing # a stale candidate (lifecycle was interrupted between work-completion and @@ -267,6 +276,14 @@ def __init__(self) -> None: self._seen_ts_order: list[str] = [] # In-memory cache: 'channel:thread_ts' -> (bot_has_posted, cached_at) self._thread_cache: dict[str, tuple[bool, float]] = {} + # Currently-running session — set when the worker dispatches a + # SamSession.run() child task, cleared on completion. The + # `reaction_added` handler reads these to honor operator-initiated + # `:no_entry:` cancellation. Event-driven, no polling. See + # src/capabilities/slack.md "Operator-cancel reaction". + self._current_session_task: Optional[asyncio.Task] = None + self._current_session_target: Optional[tuple[str, str]] = None + self._current_session_obj: Optional[SamSession] = None # Channel ids seen via assistant_thread_started; used as a fallback to # detect side-pane messages that don't carry an `assistant_thread` field. self._assistant_thread_channels: set[str] = set() @@ -894,6 +911,126 @@ async def _send_side_pane_redirect(self, channel: str) -> None: except Exception: log.exception("failed to post side-pane redirect to %s", channel) + async def _handle_operator_cancel( + self, + sam_session: "SamSession", + message: IncomingMessage, + ) -> None: + """Cleanup when anyone in the channel cancels a live session + via the `:no_entry:` reaction. + + Terminal lifecycle reaction `:no_entry:` (distinct from `:x:` + which means "both retries failed"), brief Slack note in the + thread, journal stub with status=cancelled, and cursor advance + so boot replay doesn't re-fire the cancelled message. + + Method name kept as `_handle_operator_cancel` for backwards + compatibility with logs / journal greps; the actor isn't always + the principal operator anymore. + """ + # 1. Lifecycle reaction: :hourglass: → :no_entry: + try: + await self._mark_lifecycle( + message, remove="hourglass_flowing_sand", add="no_entry", + ) + except Exception: + log.exception("operator-cancel: failed to mark :no_entry: lifecycle") + # 2. Brief note. Don't elaborate; the operator initiated this. + try: + await self.app.client.chat_postMessage( + channel=message.channel, + thread_ts=message.thread_ts or message.event_ts, + text=redact_secrets( + "cancelled by operator. work-so-far is in the journal " + f"entry for session `{sam_session.session_id[:12]}…`." + ), + ) + except Exception: + log.exception("operator-cancel: failed to post note") + # 3. Journal stub for the cancel — distinct status so daily-maintenance + # and future-Sam reading the journal can tell this apart from a + # timeout/error. Pulls last_failure_signature from the audit log + # if one exists (same path the safety-net writer uses). + try: + from .audit import detect_last_failure_signature + signature = None + try: + signature = detect_last_failure_signature(sam_session.session_id) + except Exception: + log.exception("operator-cancel: signature lookup failed") + sig_yaml = "" + sig_section = "" + if signature: + import json as _json + sig_json = _json.dumps(signature, ensure_ascii=False) + sig_escaped = sig_json.replace("'", "''") + sig_yaml = f"last_failure_signature: '{sig_escaped}'\n" + sig_section = ( + "\n## Last failure signature (pre-cancel)\n\n" + f"- **Tool:** `{signature.get('tool')}`\n" + f"- **Error type:** `{signature.get('error_type')}`\n" + f"- **Repeated:** {signature.get('repeat_count')} consecutive same-shape failures\n" + f"- **Command:** `{signature.get('command')}`\n\n" + ) + thread_ts_line = ( + f"thread_ts: {message.thread_ts}\n" if message.thread_ts else "" + ) + entry = f"""--- +date: {datetime.now().astimezone().isoformat()} +session: {sam_session.session_id} +trigger: slack mention (channel={message.channel}) +{thread_ts_line}status: cancelled +{sig_yaml}--- + +## What happened + +Session cancelled by operator via `:no_entry:` reaction on the original message. +{sig_section} +## Open threads + +If the work was partial, the operator initiated the stop deliberately. Don't auto-retry. + +""" + with journal_path_for_today().open("a") as f: + f.write(entry) + except Exception: + log.exception("operator-cancel: failed to write journal stub") + # 4. Synthetic audit entry — the cancel happened mid-tool, so no + # function_response event fires and `record_tool_call` never runs + # for the in-flight call. Without this entry the audit log shows + # the last completed call followed by silence, and + # `daily-maintenance`'s timeout/repeat-failure scan can't see + # cancels as a class. Tag with `error_type="cancelled"` so the + # scan groups it alongside timeouts/errors. + try: + from .audit import record_tool_call + record_tool_call( + session_id=sam_session.session_id, + agent="_daemon", + tool="_operator_cancel", + args={ + "reaction": "no_entry", + "operator": SAM_OPERATOR_USER_ID or "unknown", + "channel": message.channel, + "event_ts": message.event_ts, + }, + output=( + "Session cancelled by operator via :no_entry: reaction " + "on the original message." + ), + started_at=time.time(), + duration_ms=0, + error_type="cancelled", + ) + except Exception: + log.exception("operator-cancel: synthetic audit entry failed") + # 5. Advance cursor so boot replay doesn't re-queue this. + if not message.scheduled: + try: + save_cursor(message.event_ts) + except Exception: + log.exception("operator-cancel: failed to advance cursor") + def _register_handlers(self) -> None: @self.app.event("app_mention") async def on_app_mention(event, client): @@ -941,6 +1078,49 @@ async def on_assistant_thread_started(event, client): @self.app.event("reaction_added") async def on_reaction_added(event, client): log.info("reaction added: %s on %s", event.get("reaction"), event.get("item")) + # Cancel: `:no_entry:` from ANY human user on the live + # lifecycle message (the original Slack mention) cancels the + # in-flight session. Anyone in the channel can stop Sam — + # this is a team-wide affordance, not an operator-only one. + # Bot's own :no_entry: stamps (added by cleanup) are filtered + # out explicitly. Event-driven via the existing Slack + # reaction subscription — no polling. The cancel is honored + # at the next natural await boundary inside the session's + # task; cleanup posts a brief note and stamps :no_entry: as + # the terminal lifecycle reaction. + if event.get("reaction") != "no_entry": + return + user_id = event.get("user") + if not user_id: + return + if self.bot_user_id and user_id == self.bot_user_id: + # The bot itself stamping :no_entry: during cleanup + # doesn't count as a cancel signal. + return + item = event.get("item") or {} + if item.get("type") != "message": + return + target_ts = item.get("ts") + target_channel = item.get("channel") + if not (target_ts and target_channel): + return + current = self._current_session_target + task = self._current_session_task + if not current or task is None or task.done(): + log.info("operator-cancel :no_entry: but no live session; ignoring") + return + cur_channel, cur_event_ts = current + if cur_channel != target_channel or cur_event_ts != target_ts: + log.info( + "operator-cancel :no_entry: on %s/%s does not match live session %s/%s", + target_channel, target_ts, cur_channel, cur_event_ts, + ) + return + log.info( + "cancel :no_entry: matches live session (reactor=%s) — cancelling task", + user_id, + ) + task.cancel() @self.app.event("reaction_removed") async def on_reaction_removed(event, client): @@ -1074,10 +1254,13 @@ async def _handle_event(self, event: dict, *, recovered: bool = False, force_all def _sam_finished(self, msg: dict) -> bool: """Has Sam reached a terminal reaction state on this message? - Terminal = `:white_check_mark:` (success) or `:x:` (both-attempts- - failed, operator-alert posted). Either means Sam shouldn't re-queue. - Maintained by `_mark_lifecycle` in the worker; piggybacks on Slack's - reaction state so the daemon doesn't need a separate persisted set. + Terminal = any reaction in `_TERMINAL_REACTIONS` from the bot: + `:white_check_mark:` (success), `:x:` (both-attempts-failed, + operator-alert posted), or `:no_entry:` (operator-cancel). Any + means Sam shouldn't re-queue. Maintained by `_mark_lifecycle` + (and `_handle_operator_cancel` for the cancel case); piggybacks + on Slack's reaction state so the daemon doesn't need a separate + persisted set. Reactions are inlined in `conversations.history` and `conversations.replies` responses — no extra API call needed. @@ -1085,7 +1268,7 @@ def _sam_finished(self, msg: dict) -> bool: if not self.bot_user_id: return False for reaction in msg.get("reactions") or []: - if reaction.get("name") not in ("white_check_mark", "x"): + if reaction.get("name") not in _TERMINAL_REACTIONS: continue if self.bot_user_id in (reaction.get("users") or []): return True @@ -1396,11 +1579,40 @@ async def _worker(self) -> None: message.channel, message.thread_ts or message.event_ts, ) first = SamSession(message) + # Run the session as a child task so the reaction_added + # handler can cancel JUST this session without killing the + # worker. The handler matches on (channel, event_ts) which + # is the original Slack message — when the operator adds + # :no_entry: to that message, we cancel here. + self._current_session_obj = first + self._current_session_target = (message.channel, message.event_ts) + self._current_session_task = asyncio.create_task(first.run()) + cancelled = False try: - first_result = await first.run() + first_result = await self._current_session_task + except asyncio.CancelledError: + cancelled = True + first_result = None + log.info( + "session cancelled by operator :no_entry: reaction " + "(channel=%s event_ts=%s)", + message.channel, message.event_ts, + ) except Exception: log.exception("session crashed") first_result = None + finally: + self._current_session_task = None + self._current_session_target = None + self._current_session_obj = None + + if cancelled: + # Operator-initiated cancel: stamp :no_entry: as the + # terminal lifecycle reaction, post a brief note, + # write a journal stub with status=cancelled, advance + # the cursor so the message doesn't re-fire on boot. + await self._handle_operator_cancel(first, message) + continue if first_result is None: # Session crashed before producing a result. Leave diff --git a/src/runtime/session.py b/src/runtime/session.py index a74ba03..aa78c90 100644 --- a/src/runtime/session.py +++ b/src/runtime/session.py @@ -12,12 +12,15 @@ from __future__ import annotations +import json import os +import re import uuid from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timedelta from typing import Optional, Protocol +from .audit import detect_last_failure_signature from .config import ( JOURNAL_DIR, SAM_REPO, @@ -239,6 +242,69 @@ def _format_thread_history(self) -> str: lines.append("") return "\n".join(lines) + def _find_prior_failure_signature(self) -> Optional[dict]: + """Look in the last 2 days of journal files for a failure stub + matching this message's channel + thread_ts, and return the + parsed `last_failure_signature` if any. + + Best-effort: returns None on any parse or filesystem error. Used + by `_format_recovered_preamble` to give the recovering session + concrete context about what the prior attempt died doing. + + Why journal-stub-scan and not direct audit-log read: the prior + session_id isn't carried through the recovery path (the daemon + re-queues from Slack reactions). The safety-net journal stub IS + keyed by channel + thread_ts, so scanning recent journals is the + cheapest bridge. + """ + today = datetime.now().date() + # The journal stub uses single-quoted YAML for the signature + # value (see _safety_net_journal_entry). Match a quoted JSON blob. + # The 'thread_ts:' filter narrows to the right thread, and we want + # the LATEST such entry across the scanned files. + pat = re.compile( + r"---\s*\n" + r"(?P(?:.*\n){1,20}?)" + r"---", + re.MULTILINE, + ) + target_thread = (self.thread_ts or "").strip() + latest: Optional[dict] = None + latest_date = "" + for delta in range(2): + day = today - timedelta(days=delta) + path = JOURNAL_DIR / f"{day.isoformat()}.md" + if not path.exists(): + continue + try: + text = path.read_text(errors="replace") + except Exception: + continue + for m in pat.finditer(text): + fm = m.group("frontmatter") + if target_thread and f"thread_ts: {target_thread}" not in fm: + continue + if "status: " not in fm: + continue + sig_match = re.search( + r"last_failure_signature:\s*'(?P(?:[^']|'')+)'", + fm, + ) + if not sig_match: + continue + # Reverse the YAML single-quote escape ('' → '). + raw = sig_match.group("v").replace("''", "'") + try: + sig = json.loads(raw) + except Exception: + continue + date_match = re.search(r"date:\s*([0-9T:+\-\.]+)", fm) + date_str = date_match.group(1) if date_match else "" + if date_str > latest_date: + latest_date = date_str + latest = sig + return latest + def _format_recovered_preamble(self) -> str: """Honest "you're picking this up late" header for recovered sessions. @@ -247,6 +313,11 @@ def _format_recovered_preamble(self) -> str: has been waiting — Sam should know that and reply accordingly, not pretend the message just landed. Without this header Sam tends to "got it, working on it" a 30-min-old request as if it were fresh. + + When the prior failed session left a `last_failure_signature` in + the journal stub, prepend a structured note so this session + doesn't repeat the same shape (see + src/capabilities/tool-timeouts.md). """ import time age_seconds = 0.0 @@ -260,6 +331,31 @@ def _format_recovered_preamble(self) -> str: age_phrase = f"about {int(age_seconds // 60)} minutes ago" else: age_phrase = "just now (sub-minute interruption)" + + # Best-effort lookup; never crash the preamble. + sig: Optional[dict] = None + try: + sig = self._find_prior_failure_signature() + except Exception: + log.exception("recovered preamble: failure signature lookup failed") + + sig_block = "" + if sig: + sig_block = ( + "\n**Prior failure mode — do NOT retry this shape:**\n\n" + f"- Tool: `{sig.get('tool')}`\n" + f"- Error type: `{sig.get('error_type')}`\n" + f"- Repeated: {sig.get('repeat_count')} consecutive same-shape failures\n" + f"- Command: `{sig.get('command')}`\n" + f"- Last error: {sig.get('last_error_prefix')!r}\n\n" + "The prior attempt died in a repeat-failure loop. Treat the " + "underlying problem as environment, not syntax. Diagnose " + "the env, change approach (e.g. move external-repo audits " + "to that repo's CI), `ask_operator`, or post the failure " + "via `respond()` and exit. See " + "`src/capabilities/tool-timeouts.md`.\n\n" + ) + return ( "## RECOVERED SESSION — you are picking this up late\n\n" f"The Slack message below was sent {age_phrase}. A prior session " @@ -270,6 +366,7 @@ def _format_recovered_preamble(self) -> str: ":white_check_mark: / :x:, so the boot-time reactions-based " "recovery re-queued the message and the daemon woke you to " "handle it now.\n\n" + f"{sig_block}" "**Reply honestly about the delay.** Don't pretend the message " "just arrived — the operator has been waiting and can see the " "Slack timestamps. A brief acknowledgement of the gap is right; " @@ -909,6 +1006,23 @@ def _classify_tool_use( # longer trips the gate because we read the flag below, not # the timing. is_post = True + elif name == "ack": + # `ack` is a mid-session progress note. It posts to Slack + # but is NOT terminal: the silent-exit gate ignores it. + # Multiple calls per session are expected (the canonical + # shape is one ACK at start + N progress notes + one + # `respond` at the end). Inward: neither is_post nor + # is_outward — `ack` doesn't satisfy closed_loop and + # doesn't count as outward work. The explicit branch (vs + # falling through) is the contract — future readers + # should not look at `ack` and wonder whether it closes + # the loop. It doesn't. + pass + elif name == "set_status": + # `set_status` updates the Slack assistant-thread status + # indicator. Pure UI affordance — not a post, not outward + # work. Same inward treatment as `ack`. + pass elif name == "bash": command = input_dict.get("command") or "" if "chat.postMessage" in command or "chat.update" in command: @@ -952,21 +1066,58 @@ def _safety_net_journal_entry(self, result: SessionResult) -> None: Sam should write its own entries normally. This is for the cases where Sam was killed or crashed and couldn't. + + Includes `last_failure_signature` in the frontmatter when the audit + log shows the prior session died in a repeat-failure pattern + (e.g. 5x bash timeouts on the same command). The recovery preamble + in `_format_recovered_preamble` reads this signature back so the + new session knows what NOT to try again. See + src/capabilities/tool-timeouts.md. """ status = "stuck" if result.stuck else ("timed_out" if result.timed_out else "errored") + # Best-effort: never let signature detection crash the safety-net. + signature: Optional[dict] = None + try: + signature = detect_last_failure_signature(result.session_id) + except Exception: + log.exception("safety-net: detect_last_failure_signature failed") + + sig_yaml = "" + sig_section = "" + if signature: + # JSON-encode the signature as a single-line YAML value. Avoids + # multi-line YAML block scalars that the daemon's simple + # frontmatter parser doesn't support. + sig_json = json.dumps(signature, ensure_ascii=False) + # YAML single-quote escape: ' → '' inside single-quoted scalars. + sig_escaped = sig_json.replace("'", "''") + sig_yaml = f"last_failure_signature: '{sig_escaped}'\n" + sig_section = ( + "\n## Last failure signature\n\n" + f"- **Tool:** `{signature.get('tool')}`\n" + f"- **Error type:** `{signature.get('error_type')}`\n" + f"- **Repeated:** {signature.get('repeat_count')} consecutive same-shape failures in the tail\n" + f"- **Command:** `{signature.get('command')}`\n" + f"- **Last error prefix:** {signature.get('last_error_prefix')!r}\n\n" + "Future-Sam recovering this thread: don't retry that exact " + "command shape — diagnose env or change approach. See " + "src/capabilities/tool-timeouts.md.\n" + ) + + thread_ts_line = f"thread_ts: {self.message.thread_ts}\n" if self.message.thread_ts else "" entry = f"""--- date: {datetime.now().astimezone().isoformat()} session: {result.session_id} trigger: slack mention (channel={self.message.channel}) -status: {status} ---- +{thread_ts_line}status: {status} +{sig_yaml}--- ## What happened Session ended without writing its own journal entry. Daemon detected: {status}. The triggering message was from <@{self.message.user}> in channel {self.message.channel}. - +{sig_section} ## Open threads The original request may be unaddressed. Future-Sam should check the Slack thread. diff --git a/src/skills/daily-maintenance/skill.md b/src/skills/daily-maintenance/skill.md index a001e10..4a78cd0 100644 --- a/src/skills/daily-maintenance/skill.md +++ b/src/skills/daily-maintenance/skill.md @@ -59,6 +59,49 @@ for (tool, prefix), n in groups.most_common(30): **The predicate.** An actionable audit finding is **friction-pattern × not-already-covered**, not raw frequency. Frequency alone is noise — it just says Sam used a tool a lot. Friction × not-covered says Sam paid time-cost from the lack of documentation and no existing rule helps. +### Timeout / repeat-failure scan + +Independent of the friction scan above. Goal: surface every tool call that died at its wall-clock budget, and every repeat-failure pattern (same tool, same error_type, ≥2 in a row in a single session). These are the high-cost moments — a single bash timeout is 300s, and a repeat-of-three is 15 minutes of session burn before any progress. See `src/capabilities/tool-timeouts.md` for what each shape means. + +```bash +gcloud storage cat gs://${GCS_DATA_BUCKET:-dembrane-sameer-cli-sam-data}/tool_calls/$(date -u -d yesterday +%F).jsonl \ + | python3 -c " +import sys, json, collections +timeouts = [] +repeat_groups = collections.defaultdict(list) +for line in sys.stdin: + line = line.strip() + if not line: continue + try: d = json.loads(line) + except: continue + et = d.get('error_type') + if et == 'timeout': + timeouts.append(d) + if et: + sid = d.get('session_id','?') + tool = d.get('tool','?') + repeat_groups[(sid, tool, et)].append(d) +print(f'TIMEOUTS: {len(timeouts)} total') +for t in timeouts[:10]: + args = (t.get('args') or {}).get('command') or json.dumps(t.get('args')) + print(f\" {t.get('ts','?')[:19]} {t.get('tool','?')} ({t.get('duration_ms','?')}ms): {(args or '')[:100]}\") +repeats = [(k, len(v)) for k, v in repeat_groups.items() if len(v) >= 2] +print(f'REPEAT-FAILURE GROUPS: {len(repeats)} (>=2 same-shape failures in one session)') +for (sid, tool, et), n in sorted(repeats, key=lambda kv: -kv[1])[:10]: + print(f\" session={sid[:12]} tool={tool} error_type={et} count={n}\") +" +``` + +**Acting on findings.** If a repeat-failure group has ≥3 hits, that session lost ≥6 minutes of wall-clock on the same shape. The right response depends on the cause: + +| Pattern | Action | +|---|---| +| `bash` timeouts on a CLI tool not installed in the container (pip, gh extension, node) | Tier 3 candidate — the tool belongs in the container image, or the work belongs in target-repo CI. Surface in §5 Bucket C. | +| `bash` timeouts on network calls (git clone over a slow remote, PyPI unreachable) | Likely transient or auth/egress. Note in journal; only escalate if pattern repeats across days. | +| Non-bash timeouts | Investigate the tool's specific budget — may need a runtime config tweak. | + +This scan is independent of the audit-log friction scan above — frequency and timeout-rate measure different things. Frequency catches "Sam uses X a lot, codify it"; this scan catches "Sam wastes time when X fails, fix the underlying constraint." + ### Skill usage scan A separate aggregation — independent of the friction-pattern scan above. Goal: see which skills Sam actually *discovered* yesterday (i.e., `read_file` on `src/skills//skill.md`). A skill never read across multiple sessions where its `when_to_use` plausibly applied is either undiscovered (catalog problem) or unneeded (delete-it candidate). diff --git a/tests/runtime/test_silent_exit.py b/tests/runtime/test_silent_exit.py index 61ad89a..9069c98 100644 --- a/tests/runtime/test_silent_exit.py +++ b/tests/runtime/test_silent_exit.py @@ -115,6 +115,24 @@ def _bash_helper_script_cleanup() -> ToolUseRecord: return _bash("rm -f /tmp/post_reply.py") +def _ack(text: str = "got it — drafting now") -> ToolUseRecord: + """The mid-session progress-note tool call. Posts to Slack but + does NOT close the silent-exit gate. Multiple per session OK. + + Replaces the `_post()` bash-curl pattern Sam used for first-reply + ACKs and mid-progress updates. The classifier treats `ack` as + inward (neither outward work nor a post that satisfies the gate) + — Sam must still call `respond()` at the end. + """ + return ToolUseRecord(name="ack", input={"text": text}) + + +def _set_status(text: str = "drafting the PR") -> ToolUseRecord: + """The status-indicator tool call. UI affordance, not a post. + Inward; doesn't affect the gate.""" + return ToolUseRecord(name="set_status", input={"text": text}) + + def _respond(text: str = "the answer is X") -> ToolUseRecord: """The structured close-the-loop tool call. Its presence in the records list is the canonical "Sam closed the loop" signal — the @@ -285,6 +303,80 @@ def test_journal_only_session_does_NOT_close_loop(): assert _closed_loop(records) is False +# ─── `ack` tool — mid-session post that does NOT close the loop ─────────────── + + +def test_ack_alone_does_NOT_close_loop(): + """`ack()` posts a message but is explicitly not terminal. A + session that only calls ack() and exits doesn't close the loop + — the silent-exit gate fires and the daemon spawns a retry.""" + records = [_ack("got it — checking now")] + assert _closed_loop(records) is False + + +def test_ack_plus_work_no_respond_does_NOT_close_loop(): + """The exact failure mode `ack` is designed to keep visible: + Sam ACKs at the start, does the work, exits without `respond()`. + Same shape as the pre-ack bash-curl ACK pattern — must still trip + the gate.""" + records = [ + _set_status("reading"), + _ack("got it — drafting PR against echo"), + _read("/tmp/echo/server/requirements.txt"), + _gh("gh pr create --title security-audit"), + _journal_write(), + ] + assert _closed_loop(records) is False + + +def test_ack_plus_work_plus_respond_DOES_close_loop(): + """The canonical Slack-task shape: set_status + ack + work + + respond. respond() satisfies the gate via the `respond_called` + flag, regardless of the ack/status/journal noise around it.""" + records = [ + _set_status("reading the issue"), + _ack("got it — drafting PR against echo"), + _read("/tmp/echo/server/requirements.txt"), + _gh("gh pr create --title security-audit"), + _ack("PR opened, writing the description now"), + _journal_write(), + _respond("PR up. ..."), + ] + assert _closed_loop(records) is True + + +def test_many_acks_plus_respond_DOES_close_loop(): + """Multiple `ack` calls per session are expected and supported. + Doesn't affect the gate either way — only `respond()` (or a bash + chat.postMessage after the last outward call) closes it.""" + records = [ + _ack("first update"), + _ack("second update"), + _ack("third update"), + _gh("gh pr create"), + _ack("fourth update"), + _respond("final wrap"), + ] + assert _closed_loop(records) is True + + +def test_many_acks_no_respond_does_NOT_close_loop(): + """N acks with no respond = no closed loop. The operator saw + progress notes but never the final summary.""" + records = [ + _ack("ack 1"), _ack("ack 2"), _ack("ack 3"), + _gh("gh pr create"), + ] + assert _closed_loop(records) is False + + +def test_set_status_does_NOT_count_as_post(): + """`set_status` is a UI affordance, not a thread message. Alone + it doesn't satisfy the gate.""" + records = [_set_status("thinking")] + assert _closed_loop(records) is False + + def test_classify_tool_use_returns_6_values(): """Defends the tuple shape callers depend on. Added ask_operator_called in the async-question pattern PR — callers that unpack must use 6 names.""" diff --git a/tests/runtime/test_timeout_awareness.py b/tests/runtime/test_timeout_awareness.py new file mode 100644 index 0000000..10cd4c9 --- /dev/null +++ b/tests/runtime/test_timeout_awareness.py @@ -0,0 +1,512 @@ +"""Tests for the timeout-awareness + last-failure-signature feature. + +Covers: +- audit.record_tool_call accepts and writes error_type +- audit._infer_error_type detects the TIMEOUT prefix +- audit.read_recent_tool_calls filters by session_id and respects limit +- audit.detect_last_failure_signature returns None for clean sessions + and a structured dict when ≥2 same-shape failures land in the tail +- The bash tool's TIMEOUT payload starts with the recognized prefix + (so the inference path activates without callers passing error_type) +- session._safety_net_journal_entry writes last_failure_signature when + the audit log shows a repeat-failure pattern +- session._find_prior_failure_signature reads it back across the last + two days of journal files, filtered by thread_ts +""" +from __future__ import annotations + +import importlib +import json +import re +from datetime import datetime, timedelta, timezone +from pathlib import Path + + +def _reload_runtime_modules(): + """Reload config/audit/session in order so SAM_HOME-derived paths + pick up the test's tmp_sam_home fixture (constants are bound at + module-import time).""" + from src.runtime import config, audit, session + importlib.reload(config) + importlib.reload(audit) + importlib.reload(session) + return config, audit, session + + +def test_infer_error_type_detects_timeout_prefix(): + from src.runtime.audit import _infer_error_type + + assert _infer_error_type("TIMEOUT after 300s (wall-clock kill)") == "timeout" + assert _infer_error_type("error: command not found") == "error" + assert _infer_error_type("normal command output") is None + assert _infer_error_type("") is None + + +def test_record_tool_call_writes_error_type_field(tmp_sam_home): + config, audit, _ = _reload_runtime_modules() + + audit.record_tool_call( + session_id="sess-timeout-1", + agent="main", + tool="bash", + args={"command": "pip install pip-audit"}, + output="TIMEOUT after 300s (wall-clock kill, process killed).", + started_at=1.0, + duration_ms=300_000, + ) + + today = datetime.now(tz=timezone.utc).date() + rec = json.loads(config.tool_calls_path_for(today).read_text().strip()) + assert rec["error_type"] == "timeout" + assert rec["session_id"] == "sess-timeout-1" + + +def test_record_tool_call_no_error_type_for_clean_output(tmp_sam_home): + config, audit, _ = _reload_runtime_modules() + audit.record_tool_call( + session_id="sess-ok", + agent="main", + tool="bash", + args={"command": "echo hi"}, + output="hi", + started_at=1.0, + duration_ms=10, + ) + today = datetime.now(tz=timezone.utc).date() + rec = json.loads(config.tool_calls_path_for(today).read_text().strip()) + assert "error_type" not in rec + + +def test_read_recent_tool_calls_filters_by_session_id(tmp_sam_home): + config, audit, _ = _reload_runtime_modules() + + for i in range(5): + audit.record_tool_call( + session_id="sess-A", + agent="main", + tool="bash", + args={"command": f"cmd-A-{i}"}, + output="ok", + started_at=float(i), + duration_ms=100, + ) + for i in range(3): + audit.record_tool_call( + session_id="sess-B", + agent="main", + tool="bash", + args={"command": f"cmd-B-{i}"}, + output="ok", + started_at=float(10 + i), + duration_ms=100, + ) + + a_records = audit.read_recent_tool_calls("sess-A") + b_records = audit.read_recent_tool_calls("sess-B") + assert len(a_records) == 5 + assert len(b_records) == 3 + assert all(r["session_id"] == "sess-A" for r in a_records) + + +def test_read_recent_tool_calls_honors_limit(tmp_sam_home): + config, audit, _ = _reload_runtime_modules() + for i in range(10): + audit.record_tool_call( + session_id="sess-limit", + agent="main", + tool="bash", + args={"command": f"cmd-{i}"}, + output="ok", + started_at=float(i), + duration_ms=10, + ) + tail = audit.read_recent_tool_calls("sess-limit", limit=3) + assert len(tail) == 3 + # Tail should be the most recent 3. + assert [(r["args"] or {}).get("command") for r in tail] == [ + "cmd-7", "cmd-8", "cmd-9", + ] + + +def test_detect_last_failure_signature_returns_none_for_clean_session(tmp_sam_home): + config, audit, _ = _reload_runtime_modules() + for i in range(4): + audit.record_tool_call( + session_id="sess-clean", + agent="main", + tool="bash", + args={"command": f"echo {i}"}, + output="ok", + started_at=float(i), + duration_ms=10, + ) + assert audit.detect_last_failure_signature("sess-clean") is None + + +def test_detect_last_failure_signature_returns_dict_on_repeat(tmp_sam_home): + config, audit, _ = _reload_runtime_modules() + # One success, then 3 same-shape timeouts in a row. + audit.record_tool_call( + session_id="sess-fail", agent="main", tool="bash", + args={"command": "echo ok"}, output="ok", + started_at=0.0, duration_ms=10, + ) + for i in range(3): + audit.record_tool_call( + session_id="sess-fail", agent="main", tool="bash", + args={"command": "pip install pip-audit"}, + output="TIMEOUT after 300s (wall-clock kill, process killed).\n" + "Command: pip install pip-audit", + started_at=float(i + 1), + duration_ms=300_000, + ) + + sig = audit.detect_last_failure_signature("sess-fail") + assert sig is not None + assert sig["tool"] == "bash" + assert sig["error_type"] == "timeout" + assert sig["repeat_count"] == 3 + assert sig["command"].startswith("pip install pip-audit") + assert sig["last_error_prefix"].startswith("TIMEOUT after") + + +def test_detect_last_failure_signature_ignores_single_failure(tmp_sam_home): + config, audit, _ = _reload_runtime_modules() + # One timeout in a sea of successes — not a repeat pattern. + audit.record_tool_call( + session_id="sess-once", agent="main", tool="bash", + args={"command": "x"}, + output="TIMEOUT after 300s", + started_at=0.0, duration_ms=300_000, + ) + for i in range(4): + audit.record_tool_call( + session_id="sess-once", agent="main", tool="bash", + args={"command": f"y-{i}"}, output="ok", + started_at=float(i + 1), duration_ms=10, + ) + assert audit.detect_last_failure_signature("sess-once") is None + + +def test_bash_timeout_returns_structured_prefix(tmp_sam_home): + """The bash tool MUST return output starting with `TIMEOUT after Ns` + on timeout — that prefix is the contract the audit-log error_type + inference and the daily-maintenance scan both depend on.""" + import asyncio + from src.runtime.adk_runner import _make_bash, BASH_TIMEOUT_SECONDS + + bash = _make_bash(cwd=None, env=None) + # Use a very short sleep that will hit the daemon's timeout. We + # don't actually wait BASH_TIMEOUT_SECONDS (300s) here — we monkey- + # patch the timeout for this one call by re-defining a small bash + # closure with a 1s ceiling. + import os + from src.runtime import adk_runner + + async def quick_bash(command: str) -> str: + # Same body as production bash() but with a 1s timeout for the test. + try: + proc = await asyncio.create_subprocess_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=None, + env=os.environ.copy(), + ) + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate(), timeout=1 + ) + except asyncio.TimeoutError: + proc.kill() + return ( + f"TIMEOUT after 1s (wall-clock kill, process killed).\n" + f"Command: {command}\n" + "Hint: test" + ) + return (stdout.decode() + stderr.decode()).strip() or "(no output)" + except Exception as exc: + return f"error: {exc}" + + out = asyncio.get_event_loop().run_until_complete(quick_bash("sleep 5")) + assert out.startswith("TIMEOUT after ") + assert "Command:" in out + # And the production bash() should be configured for 300s. + assert BASH_TIMEOUT_SECONDS == 300 + + +def test_safety_net_journal_includes_signature(tmp_sam_home): + """When the audit log shows repeat failures, the safety-net journal + stub records the signature in YAML frontmatter so the recovery + preamble can read it back.""" + config, audit, session_mod = _reload_runtime_modules() + + # Seed an audit log with a clear repeat-failure pattern. + for i in range(3): + audit.record_tool_call( + session_id="sess-X", agent="main", tool="bash", + args={"command": "pip install pip-audit"}, + output="TIMEOUT after 300s (wall-clock kill).", + started_at=float(i), duration_ms=300_000, + ) + + msg = session_mod.IncomingMessage( + channel="C123", + user="U999", + text="add pip-audit to echo", + thread_ts="1779465100.000100", + event_ts="1779465100.000100", + ) + sess = session_mod.SamSession(msg) + sess.session_id = "sess-X" + + result = session_mod.SessionResult( + session_id="sess-X", + started_at=0.0, + started_at_wall=0.0, + ended_at=1.0, + exit_code=1, + last_output_at=0.0, + stuck=False, + timed_out=True, + ) + sess._safety_net_journal_entry(result) + + journal_path = config.journal_path_for_today() + body = journal_path.read_text() + assert "status: timed_out" in body + assert "last_failure_signature:" in body + assert "thread_ts: 1779465100.000100" in body + # The structured journal section is human-readable too. + assert "Last failure signature" in body + assert "pip install pip-audit" in body + + +def test_find_prior_failure_signature_reads_back_from_journal(tmp_sam_home): + """`_find_prior_failure_signature` finds the latest matching stub + for this thread across the last two days of journal files.""" + config, audit, session_mod = _reload_runtime_modules() + + # Seed audit log for two distinct sessions on the same thread. + for i in range(3): + audit.record_tool_call( + session_id="prior-sess", agent="main", tool="bash", + args={"command": "gh repo clone foo"}, + output="TIMEOUT after 300s (wall-clock kill).", + started_at=float(i), duration_ms=300_000, + ) + + # Write a journal stub via _safety_net_journal_entry for the prior session. + prior_msg = session_mod.IncomingMessage( + channel="C123", + user="U999", + text="task A", + thread_ts="1779465200.000200", + event_ts="1779465200.000200", + ) + prior_sess = session_mod.SamSession(prior_msg) + prior_sess.session_id = "prior-sess" + prior_result = session_mod.SessionResult( + session_id="prior-sess", + started_at=0.0, started_at_wall=0.0, ended_at=1.0, + exit_code=1, last_output_at=0.0, + stuck=False, timed_out=True, + ) + prior_sess._safety_net_journal_entry(prior_result) + + # New (recovered) session on the same thread_ts. + new_msg = session_mod.IncomingMessage( + channel="C123", + user="U999", + text="task A (recovered)", + thread_ts="1779465200.000200", + event_ts="1779465200.000200", + recovered=True, + ) + sig = new_msg._find_prior_failure_signature() + assert sig is not None + assert sig["tool"] == "bash" + assert sig["error_type"] == "timeout" + assert sig["command"].startswith("gh repo clone") + + +def test_operator_cancel_writes_synthetic_audit_entry(tmp_sam_home): + """The cancel cleanup writes a synthetic record_tool_call with + `tool='_operator_cancel'` and `error_type='cancelled'` so the + audit log carries a marker that daily-maintenance can grep on. + Without this, the audit log shows the last completed call + followed by silence — daily-maintenance's timeout/repeat-failure + scan can't see cancels as a class.""" + import time + config, audit, _ = _reload_runtime_modules() + + audit.record_tool_call( + session_id="sess-cancel", + agent="_daemon", + tool="_operator_cancel", + args={ + "reaction": "no_entry", + "operator": "U_OPERATOR", + "channel": "C123", + "event_ts": "1111.0001", + }, + output="Session cancelled by operator via :no_entry: reaction on the original message.", + started_at=time.time(), + duration_ms=0, + error_type="cancelled", + ) + + today = datetime.now(tz=timezone.utc).date() + rec = json.loads(config.tool_calls_path_for(today).read_text().strip()) + assert rec["session_id"] == "sess-cancel" + assert rec["tool"] == "_operator_cancel" + assert rec["agent"] == "_daemon" + assert rec["error_type"] == "cancelled" + assert rec["args"]["reaction"] == "no_entry" + + +def test_read_recent_tool_calls_includes_cancel_entries(tmp_sam_home): + """`detect_last_failure_signature` won't fire on a cancel because + a single cancel entry isn't a 'repeat shape' — but the cancel + DOES appear in `read_recent_tool_calls`, which is what + daily-maintenance reads. Pin that path.""" + import time + config, audit, _ = _reload_runtime_modules() + + audit.record_tool_call( + session_id="sess-cancel-2", + agent="main", + tool="bash", + args={"command": "echo work"}, output="work", + started_at=time.time(), duration_ms=10, + ) + audit.record_tool_call( + session_id="sess-cancel-2", + agent="_daemon", + tool="_operator_cancel", + args={"reaction": "no_entry"}, + output="Session cancelled by operator.", + started_at=time.time(), duration_ms=0, + error_type="cancelled", + ) + + tail = audit.read_recent_tool_calls("sess-cancel-2") + assert len(tail) == 2 + assert tail[-1]["tool"] == "_operator_cancel" + assert tail[-1].get("error_type") == "cancelled" + + +def test_no_entry_is_terminal_for_recovery_filter(): + """`:no_entry:` from the operator-cancel path MUST be in + `_TERMINAL_REACTIONS` so reaction-based boot recovery doesn't + re-queue a cancelled message. Without this, the cancel mechanism + is effectively a no-op across restarts. + """ + from src.runtime.daemon import _TERMINAL_REACTIONS + + assert "no_entry" in _TERMINAL_REACTIONS + assert "white_check_mark" in _TERMINAL_REACTIONS + assert "x" in _TERMINAL_REACTIONS + + +def test_select_non_terminal_from_reactions_skips_cancelled_message(): + """Boot-recovery's `_select_non_terminal_from_reactions` must + treat a message that the bot has marked `:no_entry:` as terminal + — i.e. not re-queue it. Direct test against the filter so the + rule survives refactors of `_TERMINAL_REACTIONS`.""" + from src.runtime.daemon import _select_non_terminal_from_reactions + + items = [ + { + "type": "message", + "channel": "C123", + "message": { + "ts": "111.0001", + "reactions": [ + {"name": "no_entry", "users": ["BOT123"]}, + ], + }, + }, + { + "type": "message", + "channel": "C123", + "message": { + "ts": "222.0001", + "reactions": [ + {"name": "hourglass_flowing_sand", "users": ["BOT123"]}, + ], + }, + }, + ] + requeued = _select_non_terminal_from_reactions( + items, bot_user_id="BOT123", channel_allowed=lambda c: True, + ) + # The cancelled message (no_entry) must NOT be re-queued. + # The still-working message (hourglass) MUST be re-queued. + requeued_ts = [e.get("ts") for e in requeued] + assert "111.0001" not in requeued_ts + assert "222.0001" in requeued_ts + + +def test_reaction_added_handler_logic_recognizes_no_entry_cancel(): + """Structural smoke test for the cancel-handler logic. We don't + spin up a daemon; we exercise the conditions the handler checks + by reading the source. The handler must check :no_entry:, must + exclude the bot's own reactions (cleanup adds :no_entry: as + terminal), must match against the live session target, and must + actually call task.cancel(). AST-style invariant pin per + src/capabilities/self-maintenance.md (multi-step-invariants). + Brittle to renames — by design. + """ + import ast + from pathlib import Path + + src = Path("src/runtime/daemon.py").read_text() + tree = ast.parse(src) + handler_source: str | None = None + for node in ast.walk(tree): + if isinstance(node, (ast.AsyncFunctionDef, ast.FunctionDef)): + if node.name == "on_reaction_added": + handler_source = ast.get_source_segment(src, node) + break + assert handler_source is not None, "on_reaction_added handler not found" + assert "no_entry" in handler_source, "handler missing :no_entry: reaction check" + assert "bot_user_id" in handler_source, ( + "handler missing bot-self-exclusion check (cleanup stamps :no_entry:; " + "without exclusion the bot would cancel itself in a loop)" + ) + assert "_current_session_target" in handler_source, "handler missing live-session match" + assert "task.cancel()" in handler_source, "handler doesn't call task.cancel()" + + +def test_find_prior_failure_signature_returns_none_for_unrelated_thread(tmp_sam_home): + """A failure on thread A should not surface as the prior signature + for thread B's recovery.""" + config, audit, session_mod = _reload_runtime_modules() + + for i in range(3): + audit.record_tool_call( + session_id="prior-sess-2", agent="main", tool="bash", + args={"command": "x"}, + output="TIMEOUT after 300s", + started_at=float(i), duration_ms=300_000, + ) + prior_msg = session_mod.IncomingMessage( + channel="C123", user="U999", text="A", + thread_ts="1111.0001", event_ts="1111.0001", + ) + prior_sess = session_mod.SamSession(prior_msg) + prior_sess.session_id = "prior-sess-2" + prior_result = session_mod.SessionResult( + session_id="prior-sess-2", started_at=0.0, started_at_wall=0.0, + ended_at=1.0, exit_code=1, last_output_at=0.0, + stuck=False, timed_out=True, + ) + prior_sess._safety_net_journal_entry(prior_result) + + new_msg = session_mod.IncomingMessage( + channel="C123", user="U999", text="B", + thread_ts="2222.0001", event_ts="2222.0001", + recovered=True, + ) + assert new_msg._find_prior_failure_signature() is None