From 46d3d5917b4df8e24aaefddec933d0d17d52f835 Mon Sep 17 00:00:00 2001 From: Oleksii Dolhov Date: Fri, 12 Jun 2026 13:49:46 +0300 Subject: [PATCH 1/3] feat(loops): loop-level wall-clock deadline (max_duration_seconds) (#1156) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an optional total-duration bound to sequential agent loops, the third hard stop alongside the max_runs iteration cap. A loop legally configured today (max_runs=100 × timeout_per_run up to 2h + delays) could run for days; max_duration_seconds caps total wall-clock time. - Schema: agent_loops.max_duration_seconds INTEGER (nullable) + idempotent migration (agent_loops_max_duration). - Runner (loop_service): deadline measured from started_at, checked only at iteration boundaries (before the next run + before/after the inter-run delay, which is capped to the remaining budget). An in-flight run is never killed mid-turn — overshoot is bounded by one timeout_per_run. Expiry stops with stop_reason="deadline_exceeded", terminal status "stopped". - Router: optional max_duration_seconds (1..604800); 400 when smaller than the effective per-run timeout (timeout_per_run, else agent execution_timeout). GET /api/loops/{id} returns max_duration_seconds + computed elapsed_seconds. - MCP run_agent_loop + UI Loops form/detail expose the parameter and deadline. - Tests: deadline stop at boundary, in-flight run completes (not killed), delay capped to remaining budget, no-deadline regression, and the four validation paths. Docs: architecture.md (feature + schema) and requirements.md §38.2. Closes #1156 Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/memory/architecture.md | 5 +- docs/memory/requirements.md | 30 ++++ src/backend/db/loops.py | 4 + src/backend/db/migrations.py | 18 +++ src/backend/db/schema.py | 1 + src/backend/db/tables.py | 1 + src/backend/routers/loops.py | 59 ++++++++ src/backend/services/loop_service.py | 34 ++++- src/frontend/src/components/LoopsPanel.vue | 32 +++++ src/mcp-server/src/client.ts | 1 + src/mcp-server/src/tools/loops.ts | 15 ++ tests/unit/test_loop_service.py | 155 +++++++++++++++++++++ tests/unit/test_loops_router_validation.py | 98 +++++++++++++ 13 files changed, 449 insertions(+), 4 deletions(-) create mode 100644 tests/unit/test_loops_router_validation.py diff --git a/docs/memory/architecture.md b/docs/memory/architecture.md index 25cd0115d..a9c1ee5e5 100644 --- a/docs/memory/architecture.md +++ b/docs/memory/architecture.md @@ -424,7 +424,7 @@ Backend orchestration in `services/subscription_auto_switch.py`: `_hot_reload_su ### Sequential Agent Loops (#740, UI #1106) -Bounded sequential task execution against one agent. Runner is an in-process `asyncio.Task` spawned by `loop_service.py`; each iteration dispatches through `task_execution_service.execute_task()` with `triggered_by="loop"` and the parent `loop_id` carried on the resulting `schedule_executions` row — iterations go through the standard `capacity_manager` admit/slot path, sharing the agent's `max_parallel_tasks` budget. Message template supports `{{run}}` and `{{previous_response}}`; `max_runs` 1–100 hard cap; optional `stop_signal` (until-mode), `delay_seconds`, `timeout_per_run`, `model`, `allowed_tools`. Stop is cooperative: `POST /api/loops/{id}/stop` flips an in-process `should_stop` flag; the current iteration finishes and the runner exits with `stop_reason="user_stopped"`. Restart recovery via the cleanup-service startup hook (above); no auto-resume. WS events `loop_run_completed`/`loop_completed`. +Bounded sequential task execution against one agent. Runner is an in-process `asyncio.Task` spawned by `loop_service.py`; each iteration dispatches through `task_execution_service.execute_task()` with `triggered_by="loop"` and the parent `loop_id` carried on the resulting `schedule_executions` row — iterations go through the standard `capacity_manager` admit/slot path, sharing the agent's `max_parallel_tasks` budget. Message template supports `{{run}}` and `{{previous_response}}`; `max_runs` 1–100 hard cap; optional `stop_signal` (until-mode), `delay_seconds`, `timeout_per_run`, `max_duration_seconds`, `model`, `allowed_tools`. Stop is cooperative: `POST /api/loops/{id}/stop` flips an in-process `should_stop` flag; the current iteration finishes and the runner exits with `stop_reason="user_stopped"`. **Wall-clock deadline (#1156):** optional `max_duration_seconds` (≤7 days) measured from `started_at`, checked only at iteration boundaries (before the next run and before/after the inter-run delay, which is capped to the remaining budget) — an in-flight run is never killed mid-turn, so overshoot is bounded by one `timeout_per_run`; expiry stops the loop with `stop_reason="deadline_exceeded"`. Rejected at create (400) when smaller than the effective per-run timeout (`timeout_per_run`, else the agent's `execution_timeout_seconds`). `GET /api/loops/{id}` returns `max_duration_seconds` + computed `elapsed_seconds`. Restart recovery via the cleanup-service startup hook (above); no auto-resume. WS events `loop_run_completed`/`loop_completed`. **Web UI (#1106):** a **Loops** tab on Agent Detail (`components/LoopsPanel.vue` + agent-scoped `stores/loops.js`; `setAgent(name)` on mount, `clear()` on unmount). The global WS handler routes the fleet-wide loop events to the store, which filters by mounted agent and targeted-refreshes only the affected loop; a 12s backstop poll runs while any loop is `queued`/`running` to recover a missed terminal event. Last full response rendered via `utils/markdown.js` (DOMPurify). @@ -1007,11 +1007,12 @@ CREATE TABLE agent_loops ( stop_signal TEXT, -- NULL = fixed mode; set = until mode delay_seconds INTEGER NOT NULL DEFAULT 0, timeout_per_run INTEGER, -- NULL = agent's execution_timeout_seconds + max_duration_seconds INTEGER, -- #1156: NULL = no wall-clock deadline (≤7d when set) model TEXT, allowed_tools TEXT, -- JSON array status TEXT NOT NULL, -- queued | running | completed | stopped | failed | interrupted runs_completed INTEGER NOT NULL DEFAULT 0, - stop_reason TEXT, -- max_runs_reached | stop_signal_matched | user_stopped | error | interrupted + stop_reason TEXT, -- max_runs_reached | stop_signal_matched | user_stopped | deadline_exceeded | error | interrupted last_response TEXT, error TEXT, started_by_user_id INTEGER, diff --git a/docs/memory/requirements.md b/docs/memory/requirements.md index fda77e0ae..89790858d 100644 --- a/docs/memory/requirements.md +++ b/docs/memory/requirements.md @@ -2755,6 +2755,36 @@ Standalone mobile-friendly admin page for managing agents on the go. Designed as auto-resume after restart; cross-agent loops (`agent` parameter is `"self"` only for v1, matching `fan_out`). +### 38.2 Loop-level wall-clock deadline (#1156) +- **Status**: ✅ Implemented +- **Implements**: Issue #1156 +- **Description**: A third hard stop alongside the `max_runs` iteration + cap and the (separately tracked) cost budget: an optional total + wall-clock deadline so a loop legally configured today (`max_runs=100` + × `timeout_per_run` up to 2h + `delay_seconds`) cannot run for days. +- **Parameter**: optional `max_duration_seconds` (int, 1 – 604800 = 7d; + NULL/omitted disables). Accepted on `POST /api/agents/{name}/loops`, + persisted on `agent_loops.max_duration_seconds`, exposed via the + `run_agent_loop` MCP tool. +- **Enforcement**: deadline measured from `started_at`; checked only at + iteration boundaries — before starting the next run and before/after + the inter-run delay (the `delay_seconds` sleep is capped to the + remaining budget, never sleeping past the deadline). An in-flight run + is never killed mid-turn, so actual overshoot is bounded by one + `timeout_per_run`. +- **Terminal state**: expiry stops the loop with terminal status + `stopped` and `stop_reason="deadline_exceeded"`. +- **Validation**: reject (400) `max_duration_seconds` smaller than the + effective per-run timeout (`timeout_per_run`, else the agent's + `execution_timeout_seconds`) — otherwise no iteration could finish + before the deadline. +- **Observability**: `GET /api/loops/{loop_id}` returns + `max_duration_seconds` and a computed `elapsed_seconds` (from + `started_at` to `completed_at` or now); the Loops UI shows the + deadline + elapsed when set. +- **Out of scope**: interrupting an in-flight run mid-turn; persisting + elapsed across a backend restart (loops do not auto-resume). + --- ## 39. VoIP Telephony (VOIP-001) diff --git a/src/backend/db/loops.py b/src/backend/db/loops.py index b4aa5d885..dc20d4534 100644 --- a/src/backend/db/loops.py +++ b/src/backend/db/loops.py @@ -30,6 +30,7 @@ def _loop_row_to_dict(row) -> dict: "stop_signal": row["stop_signal"], "delay_seconds": row["delay_seconds"], "timeout_per_run": row["timeout_per_run"], + "max_duration_seconds": row["max_duration_seconds"], "model": row["model"], "allowed_tools": json.loads(row["allowed_tools"]) if row["allowed_tools"] else None, "status": row["status"], @@ -78,6 +79,7 @@ def create_loop( stop_signal: Optional[str] = None, delay_seconds: int = 0, timeout_per_run: Optional[int] = None, + max_duration_seconds: Optional[int] = None, model: Optional[str] = None, allowed_tools: Optional[List[str]] = None, started_by_user_id: Optional[int] = None, @@ -99,6 +101,7 @@ def create_loop( stop_signal=stop_signal, delay_seconds=delay_seconds, timeout_per_run=timeout_per_run, + max_duration_seconds=max_duration_seconds, model=model, allowed_tools=allowed_tools_json, status="queued", @@ -126,6 +129,7 @@ def create_loop( "stop_signal": stop_signal, "delay_seconds": delay_seconds, "timeout_per_run": timeout_per_run, + "max_duration_seconds": max_duration_seconds, "model": model, "allowed_tools": allowed_tools, "status": "queued", diff --git a/src/backend/db/migrations.py b/src/backend/db/migrations.py index 55b3ee714..18e0ba509 100644 --- a/src/backend/db/migrations.py +++ b/src/backend/db/migrations.py @@ -2505,6 +2505,23 @@ def _migrate_agent_compatibility_results_table(cursor, conn): print("Created agent_compatibility_results table (#668)") +def _migrate_agent_loops_max_duration(cursor, conn): + """#1156 — loop-level wall-clock deadline. + + Adds `max_duration_seconds INTEGER` (NULL = no deadline) to `agent_loops`. + The runner stops the loop at the next iteration boundary once the deadline + measured from `started_at` is exceeded (stop_reason='deadline_exceeded'), + bounding total loop duration alongside the existing `max_runs` cap. + """ + _safe_add_column( + cursor, + "agent_loops", + "max_duration_seconds", + "ALTER TABLE agent_loops ADD COLUMN max_duration_seconds INTEGER", + ) + conn.commit() + + MIGRATIONS = [ ("agent_sharing", _migrate_agent_sharing_table), ("schedule_executions_observability", _migrate_schedule_executions_observability), @@ -2581,4 +2598,5 @@ def _migrate_agent_compatibility_results_table(cursor, conn): ("activities_created_index", _migrate_activities_created_index), ("agent_compatibility_results_table", _migrate_agent_compatibility_results_table), ("agent_ownership_voice_name", _migrate_agent_ownership_voice_name), + ("agent_loops_max_duration", _migrate_agent_loops_max_duration), ] diff --git a/src/backend/db/schema.py b/src/backend/db/schema.py index 051252f34..81a560864 100644 --- a/src/backend/db/schema.py +++ b/src/backend/db/schema.py @@ -248,6 +248,7 @@ stop_signal TEXT, delay_seconds INTEGER NOT NULL DEFAULT 0, timeout_per_run INTEGER, + max_duration_seconds INTEGER, model TEXT, allowed_tools TEXT, status TEXT NOT NULL, diff --git a/src/backend/db/tables.py b/src/backend/db/tables.py index 0d026de27..d87578181 100644 --- a/src/backend/db/tables.py +++ b/src/backend/db/tables.py @@ -233,6 +233,7 @@ def process_bind_param(self, value, dialect): Column("stop_signal", Text), Column("delay_seconds", Integer), Column("timeout_per_run", Integer), + Column("max_duration_seconds", Integer), # #1156 — wall-clock deadline Column("model", Text), Column("allowed_tools", Text), Column("status", Text), diff --git a/src/backend/routers/loops.py b/src/backend/routers/loops.py index 86f14cf42..cb03995bd 100644 --- a/src/backend/routers/loops.py +++ b/src/backend/routers/loops.py @@ -8,6 +8,7 @@ """ import logging +from datetime import datetime, timezone from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, Header @@ -36,6 +37,10 @@ MAX_DELAY_SECONDS = 3600 MAX_TIMEOUT_PER_RUN = 7200 MAX_STOP_SIGNAL_LEN = 200 +MAX_DURATION_SECONDS = 604_800 # 7 days — hard ceiling on the wall-clock deadline +# Fallback per-run timeout used for deadline validation when neither +# timeout_per_run nor an agent-specific timeout is available. +DEFAULT_PER_RUN_TIMEOUT = 3600 class StartLoopRequest(BaseModel): @@ -44,6 +49,10 @@ class StartLoopRequest(BaseModel): stop_signal: Optional[str] = Field(default=None, max_length=MAX_STOP_SIGNAL_LEN) delay_seconds: int = Field(default=0, ge=0, le=MAX_DELAY_SECONDS) timeout_per_run: Optional[int] = Field(default=None, ge=10, le=MAX_TIMEOUT_PER_RUN) + # #1156: optional loop-level wall-clock deadline. NULL = unbounded + # (max_runs is still the hard stop). Lower bound vs the per-run timeout + # is validated in the endpoint (needs the agent's configured timeout). + max_duration_seconds: Optional[int] = Field(default=None, ge=1, le=MAX_DURATION_SECONDS) model: Optional[str] = None allowed_tools: Optional[List[str]] = None @@ -88,6 +97,10 @@ class LoopStatusResponse(BaseModel): created_at: str started_at: Optional[str] = None completed_at: Optional[str] = None + # #1156: wall-clock deadline (NULL = unbounded) + elapsed since started_at + # (frozen at completed_at once terminal). Both NULL before the loop runs. + max_duration_seconds: Optional[int] = None + elapsed_seconds: Optional[int] = None class StopLoopResponse(BaseModel): @@ -102,6 +115,29 @@ class StopLoopResponse(BaseModel): RESPONSE_PREVIEW_CHARS = 500 +def _parse_iso(ts: Optional[str]) -> Optional[datetime]: + """Parse a utc_now_iso() timestamp (ISO-Z) to an aware UTC datetime.""" + if not ts: + return None + try: + return datetime.fromisoformat(ts.replace("Z", "+00:00")) + except ValueError: + return None + + +def _elapsed_seconds(loop: dict) -> Optional[int]: + """Whole seconds from started_at to completed_at (terminal) or now. + + None until the loop has started. Powers the GET deadline/elapsed view + (#1156) so operators can see how close a running loop is to its bound. + """ + started = _parse_iso(loop.get("started_at")) + if started is None: + return None + end = _parse_iso(loop.get("completed_at")) or datetime.now(timezone.utc) + return max(0, int((end - started).total_seconds())) + + def _build_status_response(loop: dict) -> LoopStatusResponse: runs_raw = db.list_loop_runs(loop["id"]) runs: List[LoopRunResponse] = [] @@ -133,6 +169,8 @@ def _build_status_response(loop: dict) -> LoopStatusResponse: created_at=loop["created_at"], started_at=loop["started_at"], completed_at=loop["completed_at"], + max_duration_seconds=loop.get("max_duration_seconds"), + elapsed_seconds=_elapsed_seconds(loop), ) @@ -162,6 +200,26 @@ async def start_loop( x_mcp_key_name: Optional[str] = Header(None), ): """Start a sequential agent loop; return loop_id immediately (202).""" + # #1156: a deadline shorter than a single run can never let even one + # iteration finish — reject it. Compare against the effective per-run + # timeout (explicit override, else the agent's configured timeout). + if payload.max_duration_seconds is not None: + effective_per_run = payload.timeout_per_run + if effective_per_run is None: + try: + effective_per_run = db.get_execution_timeout(name) + except Exception: + effective_per_run = DEFAULT_PER_RUN_TIMEOUT + if payload.max_duration_seconds < effective_per_run: + raise HTTPException( + status_code=400, + detail=( + f"max_duration_seconds ({payload.max_duration_seconds}s) must be " + f">= the per-run timeout ({effective_per_run}s); otherwise no " + f"iteration could complete before the deadline." + ), + ) + service = get_loop_service() loop_row = await service.start_loop( agent_name=name, @@ -170,6 +228,7 @@ async def start_loop( stop_signal=payload.stop_signal, delay_seconds=payload.delay_seconds, timeout_per_run=payload.timeout_per_run, + max_duration_seconds=payload.max_duration_seconds, model=payload.model, allowed_tools=payload.allowed_tools, started_by_user_id=current_user.id, diff --git a/src/backend/services/loop_service.py b/src/backend/services/loop_service.py index 871f6afb4..3ceb65f6b 100644 --- a/src/backend/services/loop_service.py +++ b/src/backend/services/loop_service.py @@ -28,7 +28,7 @@ import json import logging from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timedelta from typing import Optional from database import db @@ -97,6 +97,7 @@ async def start_loop( stop_signal: Optional[str] = None, delay_seconds: int = 0, timeout_per_run: Optional[int] = None, + max_duration_seconds: Optional[int] = None, model: Optional[str] = None, allowed_tools: Optional[list] = None, started_by_user_id: Optional[int] = None, @@ -116,6 +117,7 @@ async def start_loop( stop_signal=stop_signal, delay_seconds=delay_seconds, timeout_per_run=timeout_per_run, + max_duration_seconds=max_duration_seconds, model=model, allowed_tools=allowed_tools, started_by_user_id=started_by_user_id, @@ -186,6 +188,17 @@ async def _run(self, loop_id: str) -> None: stop_reason = "max_runs_reached" terminal_error: Optional[str] = None + # #1156: optional wall-clock deadline measured from loop start + # (≈ started_at, just stamped by mark_loop_running above). NULL/0 + # disables. Enforced only at iteration boundaries, so an in-flight + # run is never killed mid-turn — overshoot is bounded by one + # timeout_per_run. + max_duration = loop.get("max_duration_seconds") + deadline = ( + datetime.utcnow() + timedelta(seconds=max_duration) + if max_duration else None + ) + try: for run_number in range(1, loop["max_runs"] + 1): # Cooperative stop check BEFORE starting the next iteration. @@ -196,6 +209,12 @@ async def _run(self, loop_id: str) -> None: stop_reason = "user_stopped" break + # #1156: deadline check at the iteration boundary. + if deadline is not None and datetime.utcnow() >= deadline: + terminal_status = "stopped" + stop_reason = "deadline_exceeded" + break + rendered = _render_template( loop["message_template"], run_number, previous_response, ) @@ -300,8 +319,19 @@ async def _run(self, loop_id: str) -> None: # Inter-run delay — also a stop point. if loop["delay_seconds"] and run_number < loop["max_runs"]: + sleep_for = loop["delay_seconds"] + # #1156: never sleep past the deadline — cap the delay to + # the remaining budget; the next boundary check then stops + # the loop with deadline_exceeded. + if deadline is not None: + remaining = (deadline - datetime.utcnow()).total_seconds() + if remaining <= 0: + terminal_status = "stopped" + stop_reason = "deadline_exceeded" + break + sleep_for = min(sleep_for, remaining) try: - await asyncio.sleep(loop["delay_seconds"]) + await asyncio.sleep(sleep_for) except asyncio.CancelledError: terminal_status = "stopped" stop_reason = "user_stopped" diff --git a/src/frontend/src/components/LoopsPanel.vue b/src/frontend/src/components/LoopsPanel.vue index 29ff98013..d2101e6c9 100644 --- a/src/frontend/src/components/LoopsPanel.vue +++ b/src/frontend/src/components/LoopsPanel.vue @@ -89,6 +89,20 @@ class="w-full px-3 py-2 border border-gray-300 dark:border-gray-600 dark:bg-gray-700 dark:text-white rounded-md focus:outline-none focus:ring-2 focus:ring-action-primary-500" /> + + +
+ + +

Total wall-clock limit; checked between runs. Must be ≥ the per-run timeout.

+
@@ -224,6 +238,12 @@ {{ loop.error }} + +
+ Deadline: + {{ formatSeconds(loop.elapsed_seconds) }} / {{ formatSeconds(loop.max_duration_seconds) }} elapsed +
+

Runs

@@ -301,6 +321,7 @@ const defaultForm = () => ({ stop_signal: '', delay_seconds: 0, timeout_per_run: null, + max_duration_seconds: null, model: '', allowed_tools: null, }) @@ -354,6 +375,7 @@ async function submit() { if (form.stop_signal && form.stop_signal.trim()) payload.stop_signal = form.stop_signal.trim() if (form.delay_seconds) payload.delay_seconds = form.delay_seconds if (form.timeout_per_run) payload.timeout_per_run = form.timeout_per_run + if (form.max_duration_seconds) payload.max_duration_seconds = form.max_duration_seconds if (form.model) payload.model = form.model if (form.allowed_tools !== null) payload.allowed_tools = form.allowed_tools @@ -397,12 +419,22 @@ function formatStopReason(reason) { max_runs_reached: 'reached max runs', stop_signal_matched: 'stop signal matched', user_stopped: 'stopped by user', + deadline_exceeded: 'deadline exceeded', error: 'error', interrupted: 'interrupted', } return map[reason] || reason } +function formatSeconds(secs) { + if (secs === null || secs === undefined) return '—' + if (secs < 60) return `${secs}s` + if (secs < 3600) return `${Math.floor(secs / 60)}m ${secs % 60}s` + const h = Math.floor(secs / 3600) + const m = Math.floor((secs % 3600) / 60) + return `${h}h ${m}m` +} + function formatCost(cost) { if (cost === null || cost === undefined) return '—' return `$${cost.toFixed(4)}` diff --git a/src/mcp-server/src/client.ts b/src/mcp-server/src/client.ts index 9c8b7b228..958f7b749 100644 --- a/src/mcp-server/src/client.ts +++ b/src/mcp-server/src/client.ts @@ -1815,6 +1815,7 @@ export class TrinityClient { stop_signal?: string; delay_seconds?: number; timeout_per_run?: number; + max_duration_seconds?: number; model?: string; allowed_tools?: string[]; } diff --git a/src/mcp-server/src/tools/loops.ts b/src/mcp-server/src/tools/loops.ts index 8b5c8cae4..a331f9278 100644 --- a/src/mcp-server/src/tools/loops.ts +++ b/src/mcp-server/src/tools/loops.ts @@ -108,6 +108,19 @@ export function createLoopTools( .describe( "Per-iteration timeout in seconds (defaults to agent's configured execution_timeout_seconds)." ), + max_duration_seconds: z + .number() + .int() + .min(1) + .max(604_800) + .optional() + .describe( + "Optional loop-level wall-clock deadline in seconds (1–604800 = up to 7 days). " + + "Checked at each iteration boundary; an in-flight run is never killed mid-turn. " + + "When the deadline passes the loop stops with stop_reason='deadline_exceeded'. " + + "Must be >= timeout_per_run (or the agent's execution timeout when unset). " + + "Omit for no time bound (max_runs still applies)." + ), model: z .string() .optional() @@ -125,6 +138,7 @@ export function createLoopTools( stop_signal?: string; delay_seconds?: number; timeout_per_run?: number; + max_duration_seconds?: number; model?: string; allowed_tools?: string[]; }, @@ -146,6 +160,7 @@ export function createLoopTools( stop_signal: params.stop_signal, delay_seconds: params.delay_seconds, timeout_per_run: params.timeout_per_run, + max_duration_seconds: params.max_duration_seconds, model: params.model, allowed_tools: params.allowed_tools, }); diff --git a/tests/unit/test_loop_service.py b/tests/unit/test_loop_service.py index 29d2a0d59..0b9a09d75 100644 --- a/tests/unit/test_loop_service.py +++ b/tests/unit/test_loop_service.py @@ -16,6 +16,7 @@ import asyncio import sys from dataclasses import dataclass, field +from datetime import datetime, timedelta from pathlib import Path from typing import Any, Optional from unittest.mock import AsyncMock, MagicMock @@ -544,6 +545,160 @@ def test_orphan_sweep_idempotent(self, loop_module): assert db.mark_orphan_loops_interrupted() == 0 +# --------------------------------------------------------------------------- +# Runner — wall-clock deadline (#1156) +# --------------------------------------------------------------------------- + +class _FakeClock: + """Controllable stand-in for ``datetime`` inside loop_service. + + Only ``utcnow()`` is exercised by the runner; it returns the current + fake instant. Tests advance ``now`` (directly or via a task that bumps + it each run) to drive the deadline deterministically — no real sleeping. + """ + now = datetime(2026, 1, 1, 0, 0, 0) + + @classmethod + def utcnow(cls): + return cls.now + + +class TestDeadline: + def _install_clock(self, ls, monkeypatch, *, advance_per_run: float): + """Swap in the fake clock; each execute_task advances it.""" + _FakeClock.now = datetime(2026, 1, 1, 0, 0, 0) + monkeypatch.setattr(ls, "datetime", _FakeClock) + + async def _exec(**kwargs): + ts = self._ts + ts.calls.append(kwargs) + result = ts.results[ts._idx] if ts._idx < len(ts.results) else _Result() + ts._idx += 1 + _FakeClock.now = _FakeClock.now + timedelta(seconds=advance_per_run) + return result + + return _exec + + def test_deadline_stops_loop_at_boundary(self, loop_module, monkeypatch): + ls, db, ts = loop_module + self._ts = ts + ts.results = [_Result(response=f"r{i}") for i in range(1, 6)] + ts.execute_task = self._install_clock(ls, monkeypatch, advance_per_run=6) + + async def go(): + service = ls.LoopService() + row = await service.start_loop( + agent_name="a1", + message_template="m", + max_runs=5, + max_duration_seconds=10, # ~1.6 runs fit before the deadline + ) + handle = service._handles.get(row["id"]) + if handle is not None: + await handle.task + return row["id"] + + loop_id = _run(go()) + loop = db.get_loop(loop_id) + assert loop["status"] == "stopped" + assert loop["stop_reason"] == "deadline_exceeded" + # Run 1 (t0→6) and run 2 (t6→12) both started before the deadline; the + # boundary check before run 3 (t12 ≥ 10) trips. max_runs never reached. + assert loop["runs_completed"] == 2 + assert len(ts.calls) == 2 + + def test_in_flight_run_is_not_killed_mid_turn(self, loop_module, monkeypatch): + ls, db, ts = loop_module + self._ts = ts + ts.results = [_Result(response="done-run")] + # One run pushes the clock well past the deadline; that run must still + # finalize as completed (deadline is enforced only at the boundary). + ts.execute_task = self._install_clock(ls, monkeypatch, advance_per_run=999) + + async def go(): + service = ls.LoopService() + row = await service.start_loop( + agent_name="a1", + message_template="m", + max_runs=5, + max_duration_seconds=10, + ) + handle = service._handles.get(row["id"]) + if handle is not None: + await handle.task + return row["id"] + + loop_id = _run(go()) + loop = db.get_loop(loop_id) + assert loop["stop_reason"] == "deadline_exceeded" + assert loop["runs_completed"] == 1 # the in-flight run completed + runs = db.list_loop_runs(loop_id) + assert runs[0]["status"] == "completed" + assert runs[0]["response"] == "done-run" + + def test_delay_does_not_sleep_past_deadline(self, loop_module, monkeypatch): + ls, db, ts = loop_module + self._ts = ts + ts.results = [_Result(response="r1"), _Result(response="r2")] + ts.execute_task = self._install_clock(ls, monkeypatch, advance_per_run=3) + + # Capture sleep durations and advance the fake clock by them instead + # of really sleeping. + slept: list[float] = [] + + async def _fake_sleep(secs): + slept.append(secs) + _FakeClock.now = _FakeClock.now + timedelta(seconds=secs) + + monkeypatch.setattr(ls.asyncio, "sleep", _fake_sleep) + + async def go(): + service = ls.LoopService() + row = await service.start_loop( + agent_name="a1", + message_template="m", + max_runs=5, + delay_seconds=100, # would blow way past the deadline + max_duration_seconds=10, + ) + handle = service._handles.get(row["id"]) + if handle is not None: + await handle.task + return row["id"] + + loop_id = _run(go()) + loop = db.get_loop(loop_id) + assert loop["stop_reason"] == "deadline_exceeded" + # run1 t0→3, then delay capped to remaining (10−3=7), not the full 100. + assert slept == [7] + + def test_no_deadline_runs_all_when_unset(self, loop_module, monkeypatch): + ls, db, ts = loop_module + self._ts = ts + ts.results = [_Result(response=f"r{i}") for i in range(1, 4)] + # Clock jumps far each run; with no deadline it must be ignored. + ts.execute_task = self._install_clock(ls, monkeypatch, advance_per_run=10_000) + + async def go(): + service = ls.LoopService() + row = await service.start_loop( + agent_name="a1", + message_template="m", + max_runs=3, + max_duration_seconds=None, + ) + handle = service._handles.get(row["id"]) + if handle is not None: + await handle.task + return row["id"] + + loop_id = _run(go()) + loop = db.get_loop(loop_id) + assert loop["status"] == "completed" + assert loop["stop_reason"] == "max_runs_reached" + assert loop["runs_completed"] == 3 + + # --------------------------------------------------------------------------- # get_status # --------------------------------------------------------------------------- diff --git a/tests/unit/test_loops_router_validation.py b/tests/unit/test_loops_router_validation.py new file mode 100644 index 000000000..b148e81b1 --- /dev/null +++ b/tests/unit/test_loops_router_validation.py @@ -0,0 +1,98 @@ +"""Validation tests for the loops router — max_duration_seconds deadline (#1156). + +Calls the ``start_loop`` endpoint coroutine directly with mocked auth/db/service +so the cross-field validation (deadline must be >= the effective per-run +timeout) can be exercised without a live FastAPI app or database. +""" + +from __future__ import annotations + +import sys +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +_BACKEND = Path(__file__).resolve().parent.parent.parent / "src" / "backend" +_BACKEND_STR = str(_BACKEND) +while _BACKEND_STR in sys.path: + sys.path.remove(_BACKEND_STR) +sys.path.insert(0, _BACKEND_STR) + +pytestmark = pytest.mark.unit + + +def _load_router(monkeypatch): + from routers import loops as loops_router + + fake_db = MagicMock() + fake_db.get_execution_timeout.return_value = 900 # agent default for tests + monkeypatch.setattr(loops_router, "db", fake_db) + + fake_service = MagicMock() + fake_service.start_loop = AsyncMock( + return_value={"id": "loop_x", "status": "queued"} + ) + monkeypatch.setattr(loops_router, "get_loop_service", lambda: fake_service) + + return loops_router, fake_db, fake_service + + +def _user(): + u = MagicMock() + u.id = 1 + u.email = "u@example.com" + return u + + +async def _call(loops_router, payload): + return await loops_router.start_loop( + payload=payload, + name="a1", + current_user=_user(), + x_source_agent=None, + x_mcp_key_id=None, + x_mcp_key_name=None, + ) + + +class TestMaxDurationValidation: + def test_rejects_deadline_below_explicit_timeout_per_run(self, monkeypatch): + loops_router, _, service = _load_router(monkeypatch) + payload = loops_router.StartLoopRequest( + message="m", max_runs=5, timeout_per_run=600, max_duration_seconds=300, + ) + with pytest.raises(loops_router.HTTPException) as exc: + __import__("asyncio").run(_call(loops_router, payload)) + assert exc.value.status_code == 400 + assert "max_duration_seconds" in exc.value.detail + service.start_loop.assert_not_called() + + def test_rejects_deadline_below_agent_timeout_when_per_run_unset(self, monkeypatch): + loops_router, fake_db, service = _load_router(monkeypatch) + payload = loops_router.StartLoopRequest( + message="m", max_runs=5, max_duration_seconds=100, # < 900 agent default + ) + with pytest.raises(loops_router.HTTPException) as exc: + __import__("asyncio").run(_call(loops_router, payload)) + assert exc.value.status_code == 400 + fake_db.get_execution_timeout.assert_called_once_with("a1") + service.start_loop.assert_not_called() + + def test_accepts_deadline_at_or_above_timeout(self, monkeypatch): + loops_router, _, service = _load_router(monkeypatch) + payload = loops_router.StartLoopRequest( + message="m", max_runs=5, timeout_per_run=600, max_duration_seconds=600, + ) + resp = __import__("asyncio").run(_call(loops_router, payload)) + assert resp.loop_id == "loop_x" + # deadline threaded through to the service + assert service.start_loop.await_args.kwargs["max_duration_seconds"] == 600 + + def test_no_deadline_skips_validation(self, monkeypatch): + loops_router, fake_db, service = _load_router(monkeypatch) + payload = loops_router.StartLoopRequest(message="m", max_runs=5) + resp = __import__("asyncio").run(_call(loops_router, payload)) + assert resp.loop_id == "loop_x" + fake_db.get_execution_timeout.assert_not_called() + assert service.start_loop.await_args.kwargs["max_duration_seconds"] is None From 30bbaf9f259b8ecec0dc3dff11debfe3c97d51c6 Mon Sep 17 00:00:00 2001 From: Oleksii Dolhov Date: Tue, 23 Jun 2026 12:56:02 +0300 Subject: [PATCH 2/3] fix(db): add Alembic revision for agent_loops.max_duration_seconds (#1156) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SQLite path got the column via db/migrations.py + schema.py, but the PostgreSQL backend is Alembic-owned and the sqlite PRAGMA migrations are skipped — so existing PG deployments never received the column, and loops.py's insert(agent_loops).values(max_duration_seconds=...) would fail on Postgres. Adds 0004_agent_loops_max_duration (down_revision 0003) mirroring the 0002/0003 precedent: idempotent ADD COLUMN IF NOT EXISTS so it's a no-op on fresh PG builds (baseline already creates it from schema.py:TABLES) and adds it in place on existing PG deployments. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../versions/0005_agent_loops_max_duration.py | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 src/backend/migrations/versions/0005_agent_loops_max_duration.py diff --git a/src/backend/migrations/versions/0005_agent_loops_max_duration.py b/src/backend/migrations/versions/0005_agent_loops_max_duration.py new file mode 100644 index 000000000..5559a2cfa --- /dev/null +++ b/src/backend/migrations/versions/0005_agent_loops_max_duration.py @@ -0,0 +1,36 @@ +"""agent_loops.max_duration_seconds (#1156) + +Adds the loop-level wall-clock deadline column on the PostgreSQL backend. +Mirrors the SQLite ``agent_loops_max_duration`` migration in +``db/migrations.py`` and the DDL in ``db/schema.py``. + +Fresh PG builds already get this column because ``0001_baseline`` iterates +``db/schema.py:TABLES`` (whose ``agent_loops`` DDL now includes it). This +revision exists so an *existing* PG deployment — stamped at an earlier revision +and never re-running baseline — also picks the column up on +``alembic upgrade head``. ``ADD COLUMN IF NOT EXISTS`` keeps it a no-op when the +baseline already created the table with the column. + +Revision ID: 0005_agent_loops_max_duration +Revises: 0004_agent_ownership_voice_name +Create Date: 2026-06-23 +""" +from alembic import op + +# revision identifiers, used by Alembic. +revision = "0005_agent_loops_max_duration" +down_revision = "0004_agent_ownership_voice_name" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute( + "ALTER TABLE agent_loops ADD COLUMN IF NOT EXISTS max_duration_seconds INTEGER" + ) + + +def downgrade() -> None: + op.execute( + "ALTER TABLE agent_loops DROP COLUMN IF EXISTS max_duration_seconds" + ) From 53ed5aa837a0dcc06916fd0966820ab5e89a706a Mon Sep 17 00:00:00 2001 From: Oleksii Dolhov Date: Thu, 25 Jun 2026 13:30:40 +0300 Subject: [PATCH 3/3] docs(flows): document loop wall-clock deadline in run-agent-loop flow (#1156) The feature added `max_duration_seconds` but the run-agent-loop feature flow wasn't updated (flagged by /validate-pr). Weave the wall-clock deadline through the flow: tool/UI params, the create-time 400 validation + GET elapsed_seconds, the iteration-boundary deadline check and `deadline_exceeded` terminal reason, the dual-track migration (SQLite + Alembic 0005), error-handling rows, the 604800s limit, and the new tests. Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/memory/feature-flows/run-agent-loop.md | 24 ++++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/docs/memory/feature-flows/run-agent-loop.md b/docs/memory/feature-flows/run-agent-loop.md index fd1a84d45..0e75ac7b4 100644 --- a/docs/memory/feature-flows/run-agent-loop.md +++ b/docs/memory/feature-flows/run-agent-loop.md @@ -17,7 +17,7 @@ Phase 1 shipped headless (API/MCP only); iterations also appear in the standard ## Frontend Layer (Phase 2, #1106) - **Tab**: `src/frontend/src/views/AgentDetail.vue` adds `{ id: 'loops', label: 'Loops' }` and mounts ``. -- **Component**: `src/frontend/src/components/LoopsPanel.vue` — collapsible Run-loop form (message template w/ `{{run}}`/`{{previous_response}}` helper text, `max_runs`, `stop_signal`, `delay_seconds`, `timeout_per_run`, `model` via `ModelSelector`, `allowed_tools` picker), loop list with status badge + `runs_completed/max_runs` + `stop_reason`, expandable per-run table (#/status/cost/duration/response), last full response via `renderMarkdown`, and a Stop control reflecting `stopping`→`stopped`. The Run-loop button is gated on `agentStatus === 'running'`. +- **Component**: `src/frontend/src/components/LoopsPanel.vue` — collapsible Run-loop form (message template w/ `{{run}}`/`{{previous_response}}` helper text, `max_runs`, `stop_signal`, `delay_seconds`, `timeout_per_run`, `max_duration_seconds`, `model` via `ModelSelector`, `allowed_tools` picker), loop list with status badge + `runs_completed/max_runs` + `stop_reason`, expandable per-run table (#/status/cost/duration/response), last full response via `renderMarkdown`, and a Stop control reflecting `stopping`→`stopped`. The Run-loop button is gated on `agentStatus === 'running'`. - **Store**: `src/frontend/src/stores/loops.js` — agent-scoped Pinia store on the shared `api.js` client (Invariant #7). `setAgent(name)`/`clear()` bind the store to the mounted agent; `handleWebSocketEvent` filters fleet-wide `loop_run_completed`/`loop_completed` events by that agent and targeted-refreshes only the affected loop; a 12s backstop poll runs while any loop is `queued`/`running` to recover a missed terminal event. `expandedLoopId` lives in the store so it survives the `v-if` tab remount. - **WS wiring**: `src/frontend/src/utils/websocket.js` routes the `data.type`-keyed loop events to `loopsStore.handleWebSocketEvent` in the `default:` branch. - **e2e**: `src/frontend/e2e/loops-panel.spec.js` (@interactive — needs a live stack + running agent). @@ -30,7 +30,7 @@ Phase 1 shipped headless (API/MCP only); iterations also appear in the standard ### Tool definitions - `src/mcp-server/src/tools/loops.ts` - Permission model identical to `chat_with_agent`: owner/admin/shared on the agent, or explicit `agent_permissions` for agent-scoped MCP keys. Backend enforces — MCP tools surface a cleaner message for unscoped keys. -- `run_agent_loop` accepts `message`, `max_runs` (1–100, required), optional `stop_signal`, `delay_seconds` (0–3600), `timeout_per_run` (10–7200), `model`, `allowed_tools`. `agent_name` is required for user-scoped keys and defaults to the bound agent for agent-scoped keys. +- `run_agent_loop` accepts `message`, `max_runs` (1–100, required), optional `stop_signal`, `delay_seconds` (0–3600), `timeout_per_run` (10–7200), `max_duration_seconds` (1–604800 = 7d; optional wall-clock deadline, #1156), `model`, `allowed_tools`. `agent_name` is required for user-scoped keys and defaults to the bound agent for agent-scoped keys. ### Client methods - `src/mcp-server/src/client.ts` — `startAgentLoop`, `getLoopStatus`, `stopAgentLoop`. @@ -39,13 +39,16 @@ Phase 1 shipped headless (API/MCP only); iterations also appear in the standard ### Router - `src/backend/routers/loops.py` — two routers exported (agent-scoped + loop-scoped) and both mounted in `main.py`. -- Request validation via `StartLoopRequest` Pydantic model (`max_runs` 1–100, `message` 1–100_000 chars, `stop_signal` ≤200 chars and stripped — blank → `None` → fixed mode). +- Request validation via `StartLoopRequest` Pydantic model (`max_runs` 1–100, `message` 1–100_000 chars, `stop_signal` ≤200 chars and stripped — blank → `None` → fixed mode; `max_duration_seconds` 1–604800). - 202 Accepted on start; 404 on unknown loop; 403 if caller is not the initiator and lacks agent access. +- **400** when `max_duration_seconds` is smaller than the effective per-run timeout (`timeout_per_run`, else the agent's `execution_timeout_seconds`) — a deadline that can't fit one run is rejected rather than silently never firing (#1156). +- `GET /api/loops/{id}` returns `max_duration_seconds` plus a computed `elapsed_seconds` (from `started_at`). ### Service - `src/backend/services/loop_service.py` — `LoopService.start_loop()` creates the `agent_loops` row and spawns an `asyncio.Task` via `_run()`. One in-process handle per active loop (`_handles: dict[str, _LoopHandle]`) tracks the cooperative stop flag. +- **Wall-clock deadline (#1156):** when `max_duration_seconds` is set, the runner checks `now - started_at` only at iteration boundaries (before the next run, and before/after the inter-run delay — which is itself capped to the remaining budget). An in-flight run is never killed mid-turn, so total overshoot is bounded by one `timeout_per_run`; on expiry the loop exits `completed` / `stop_reason="deadline_exceeded"`. Complements the `max_runs` count cap with a time cap. - Iteration body: - 1. Cooperative stop check (`handle.should_stop`). + 1. Cooperative stop check (`handle.should_stop`); then the deadline check above. 2. Template substitution: `{{run}}` → 1-indexed; `{{previous_response}}` → trailing 2000 chars of last response (empty on iteration 1). 3. Insert `agent_loop_runs` row in `running` status. 4. `await task_execution_service.execute_task(triggered_by="loop", loop_id=...)`. @@ -56,6 +59,7 @@ Phase 1 shipped headless (API/MCP only); iterations also appear in the standard - Terminal states + reasons: - `completed` / `max_runs_reached` - `completed` / `stop_signal_matched` + - `completed` / `deadline_exceeded` (`max_duration_seconds` wall-clock budget reached, #1156) - `stopped` / `user_stopped` (via `stop_loop`) - `failed` / `error` (any iteration's `TaskExecutionResult.status != "success"` or an unhandled exception) - `interrupted` / `interrupted` (backend restart, swept by cleanup-service) @@ -68,7 +72,8 @@ Phase 1 shipped headless (API/MCP only); iterations also appear in the standard - Facade: `src/backend/database.py` exposes all of the above on `db`. ### Schema + migration -- `src/backend/db/schema.py` — `agent_loops`, `agent_loop_runs`, plus `loop_id TEXT` column on `schedule_executions` + index `idx_executions_loop`. +- `src/backend/db/schema.py` / `db/tables.py` — `agent_loops`, `agent_loop_runs`, plus `loop_id TEXT` column on `schedule_executions` + index `idx_executions_loop`. `agent_loops.max_duration_seconds INTEGER` (NULL = no deadline) added for #1156. +- **Dual-track migration (Invariant #9)** for `max_duration_seconds`: SQLite `_migrate_agent_loops_max_duration` in `db/migrations.py` (`_safe_add_column`) **and** Alembic revision `src/backend/migrations/versions/0005_agent_loops_max_duration.py` (`ADD COLUMN IF NOT EXISTS`, chained after `0004_agent_ownership_voice_name`) for the Postgres backend. - `src/backend/db/migrations.py` — `_migrate_agent_loops_tables` (idempotent `CREATE TABLE IF NOT EXISTS` + `_safe_add_column` for the existing executions table). ### Execution dispatch @@ -93,6 +98,8 @@ Phase 1 shipped headless (API/MCP only); iterations also appear in the standard | Iteration raises Python exception | `agent_loop_runs.status='failed'`, `agent_loops.status='failed'`, `stop_reason='error'`, loop terminates | | Iteration returns `TaskExecutionResult.status != "success"` | Same as above; `agent_loops.error` carries the iteration number + task error | | Stop requested while iteration in flight | Current iteration completes; loop exits with `stop_reason="user_stopped"` | +| Wall-clock deadline reached (`max_duration_seconds`) | Detected at an iteration boundary; loop exits `completed` / `stop_reason="deadline_exceeded"`. An in-flight run is never killed mid-turn (overshoot ≤ one `timeout_per_run`) | +| `max_duration_seconds` < effective per-run timeout | Rejected at create with **400** (can't fit one run) | | Backend restart mid-loop | On next boot, cleanup-service flips to `interrupted` | | Stop on already-terminal loop | Returns `already_done` (no-op) | | Stop on unknown loop | Returns `not_found` (router returns 404 separately) | @@ -101,7 +108,7 @@ Phase 1 shipped headless (API/MCP only); iterations also appear in the standard - Standard agent-access check on start (`get_authorized_agent`). - Loop-scoped endpoints (`/api/loops/{id}/...`) verify that the caller is the initiator OR has access to the underlying agent (owner/admin/shared via `db.can_user_access_agent`). - No sensitive data in WS events — `cost`, `duration_ms`, `run_number`, `execution_id` only. -- `max_runs` capped at 100; `delay_seconds` at 3600; `timeout_per_run` at 7200 to bound resource consumption. +- `max_runs` capped at 100; `delay_seconds` at 3600; `timeout_per_run` at 7200; `max_duration_seconds` at 604800 (7d) to bound resource consumption. ## Testing **Prerequisites**: backend running; an agent the caller can access. @@ -116,13 +123,14 @@ Phase 1 shipped headless (API/MCP only); iterations also appear in the standard **Edge Cases**: - `max_runs=0` → 422. - `max_runs=101` → 422. +- `max_duration_seconds` below the effective per-run timeout → 400; start a loop with a tight `max_duration_seconds` and verify it stops `completed` / `deadline_exceeded` before `max_runs`. - Loop on a non-accessible agent → 403. - Stop on already-completed loop → `{"status": "already_done"}`. - Backend restart mid-loop → next `GET /api/loops/{loop_id}` shows `status="interrupted"`. -**Unit tests**: `tests/unit/test_loop_service.py` covers fixed/until modes, template substitution, graceful stop, failure paths, restart recovery, and `get_status`. +**Unit tests**: `tests/unit/test_loop_service.py` covers fixed/until modes, template substitution, graceful stop, failure paths, restart recovery, `get_status`, and the #1156 wall-clock deadline (boundary check, delay capped to remaining budget, `deadline_exceeded`). `tests/unit/test_loops_router_validation.py` covers the create-time `max_duration_seconds` validation (400 below the effective per-run timeout). -**Status**: ✅ Phase 1 (backend/MCP) + Phase 2 (web UI, #1106) shipped. +**Status**: ✅ Phase 1 (backend/MCP) + Phase 2 (web UI, #1106) shipped; wall-clock deadline (`max_duration_seconds`, #1156) added. ## Related Flows - **Upstream**: `task-execution-service.md` — each iteration dispatches through `TaskExecutionService`.