diff --git a/README.md b/README.md index c0a1564..8fd2801 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,7 @@ cargo run -p decodex -- --help cargo run -p decodex -- probe stdio:// cargo run -p decodex -- project list cargo run -p decodex -- status +cargo run -p decodex -- diagnose --json cargo run -p decodex -- run --dry-run cargo run -p decodex -- serve --interval 60s --listen-address 127.0.0.1:8912 ``` @@ -110,6 +111,10 @@ Project contracts are managed outside checkouts under The redacted template for a project config lives at `decodex.example.toml`. +`decodex diagnose --json` writes the local agent evidence index under +`~/.codex/decodex/agent-evidence/<project-id>/` and prints the same handoff index for +repair agents. + ## Static Site The public site is an Astro static site under `site/`. It renders checked-in content and diff --git a/apps/decodex/src/agent.rs b/apps/decodex/src/agent.rs index e2f4873..d10a473 100644 --- a/apps/decodex/src/agent.rs +++ b/apps/decodex/src/agent.rs @@ -15,12 +15,11 @@ pub(crate) use self::{ decodex_tool_bridge::{DecodexRunContext, DecodexToolBridge}, json_rpc::{AppServerHomePreflightFailure, AppServerProcessEnv, AppServerTransportFailure}, tracker_tool_bridge::{ - ISSUE_COMMENT_TOOL_NAME, ISSUE_DELIVERY_CLOSEOUT_COMPLETE_TOOL_NAME, - ISSUE_LABEL_ADD_TOOL_NAME, ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, - ISSUE_REVIEW_CHECKPOINT_TOOL_NAME, ISSUE_REVIEW_HANDOFF_TOOL_NAME, - ISSUE_REVIEW_REPAIR_COMPLETE_TOOL_NAME, ISSUE_TERMINAL_FINALIZE_TOOL_NAME, - ISSUE_TRANSITION_TOOL_NAME, ReviewExecutionMode, ReviewHandoffContext, - ReviewHandoffWritebackFailed, ReviewPolicyStopReason, ReviewPolicyStopRequested, - RunCompletionDisposition, TrackerToolBridge, + ISSUE_DELIVERY_CLOSEOUT_COMPLETE_TOOL_NAME, ISSUE_LABEL_ADD_TOOL_NAME, + ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, ISSUE_REVIEW_CHECKPOINT_TOOL_NAME, + ISSUE_REVIEW_HANDOFF_TOOL_NAME, ISSUE_REVIEW_REPAIR_COMPLETE_TOOL_NAME, + ISSUE_TERMINAL_FINALIZE_TOOL_NAME, ISSUE_TRANSITION_TOOL_NAME, 
ReviewExecutionMode, + ReviewHandoffContext, ReviewHandoffWritebackFailed, ReviewPolicyStopReason, + ReviewPolicyStopRequested, RunCompletionDisposition, TrackerToolBridge, }, }; diff --git a/apps/decodex/src/agent/tracker_tool_bridge/tools.rs b/apps/decodex/src/agent/tracker_tool_bridge/tools.rs index 0c90d2c..9cc2bb6 100644 --- a/apps/decodex/src/agent/tracker_tool_bridge/tools.rs +++ b/apps/decodex/src/agent/tracker_tool_bridge/tools.rs @@ -95,10 +95,10 @@ impl<'a> TrackerToolBridge<'a> { pub(super) fn comment_tool_specs(&self) -> Vec { vec![DynamicToolSpec::new( ISSUE_COMMENT_TOOL_NAME, - "Add a comment to the currently leased issue.", + "Add an exceptional human-readable comment to the currently leased issue for manual-attention blockers or explicit collaboration notes. Use progress checkpoints for routine progress.", serde_json::json!({ - "type": "object", - "properties": { + "type": "object", + "properties": { "issue_id": { "type": "string" }, "issue_identifier": { "type": "string" }, "body": { "type": "string" } diff --git a/apps/decodex/src/cli.rs b/apps/decodex/src/cli.rs index c0d0b2a..4ad67b5 100644 --- a/apps/decodex/src/cli.rs +++ b/apps/decodex/src/cli.rs @@ -18,7 +18,7 @@ use crate::{ agent, archive_hygiene::{self, ArchiveHygieneRequest}, manual::{self, ManualCommitRequest, ManualLandRequest}, - orchestrator::{self, IssueDispatchMode, RunOnceRequest, ServeRequest}, + orchestrator::{self, DiagnoseRequest, IssueDispatchMode, RunOnceRequest, ServeRequest}, prelude::eyre, recovery::{self, ReviewHandoffDiagnoseRequest, ReviewHandoffRebindRequest}, runtime, @@ -58,6 +58,7 @@ impl Cli { Command::Serve(args) => args.run(config_path), Command::Project(args) => args.run(), Command::Status(args) => args.run(config_path), + Command::Diagnose(args) => args.run(config_path), Command::Recover(args) => args.run(config_path), Command::ArchiveLinear(args) => args.run(config_path), Command::Probe(args) => args.run(), @@ -128,6 +129,8 @@ enum Command { 
Project(ProjectCommand), /// Inspect the current local runtime state for one configured project. Status(StatusCommand), + /// Write and print the agent-readable local evidence index. + Diagnose(DiagnoseCommand), /// Diagnose or explicitly repair supported retained-lane recovery cases. Recover(RecoverCommand), /// Dry-run or archive old terminal Linear issues by repo label. @@ -354,6 +357,25 @@ impl StatusCommand { } } +#[derive(Debug, Args)] +struct DiagnoseCommand { + /// Emit the agent handoff index JSON instead of a one-line path summary. + #[arg(long)] + json: bool, + /// Maximum number of recent runs to include while generating evidence. + #[arg(long, value_name = "COUNT", default_value_t = orchestrator::DEFAULT_STATUS_RUN_LIMIT)] + limit: usize, +} +impl DiagnoseCommand { + fn run(&self, config_path: Option<&Path>) -> crate::prelude::Result<()> { + orchestrator::run_diagnose(DiagnoseRequest { + config_path, + json: self.json, + limit: self.limit, + }) + } +} + #[derive(Debug, Args)] struct RecoverCommand { #[command(subcommand)] @@ -571,10 +593,10 @@ mod tests { use clap::Parser; use crate::cli::{ - AttemptCommand, Cli, Command, CommitCommand, LandCommand, ProbeCommand, ProjectCommand, - ProjectSubcommand, RecoverCommand, RecoverSubcommand, ReviewHandoffDiagnoseCommand, - ReviewHandoffRebindCommand, ReviewHandoffRecoveryCommand, ReviewHandoffRecoverySubcommand, - RunCommand, ServeCommand, StatusCommand, + AttemptCommand, Cli, Command, CommitCommand, DiagnoseCommand, LandCommand, ProbeCommand, + ProjectCommand, ProjectSubcommand, RecoverCommand, RecoverSubcommand, + ReviewHandoffDiagnoseCommand, ReviewHandoffRebindCommand, ReviewHandoffRecoveryCommand, + ReviewHandoffRecoverySubcommand, RunCommand, ServeCommand, StatusCommand, }; #[test] @@ -751,6 +773,22 @@ mod tests { assert!(matches!(cli.command, Command::Status(StatusCommand { json: true, limit: 5 }))); } + #[test] + fn parses_diagnose_with_json_limit_and_global_config() { + let cli = Cli::parse_from([ + 
"decodex", + "--config", + "./project.toml", + "diagnose", + "--json", + "--limit", + "5", + ]); + + assert_eq!(cli.config, Some(PathBuf::from("./project.toml"))); + assert!(matches!(cli.command, Command::Diagnose(DiagnoseCommand { json: true, limit: 5 }))); + } + #[test] fn parses_review_handoff_diagnose_with_issue_and_json() { let cli = Cli::parse_from([ diff --git a/apps/decodex/src/orchestrator.rs b/apps/decodex/src/orchestrator.rs index aeb81f1..893ac5f 100644 --- a/apps/decodex/src/orchestrator.rs +++ b/apps/decodex/src/orchestrator.rs @@ -5,11 +5,11 @@ use std::{ env, error::Error, fmt::{self, Display, Formatter}, - fs::{self, File}, + fs::{self, File, OpenOptions}, io::{ErrorKind, Read, Write}, net::{SocketAddr, TcpListener, TcpStream}, path::{Path, PathBuf}, - process::{Child, Command, ExitStatus, Stdio}, + process::{self, Child, Command, ExitStatus, Stdio}, slice, sync::{ Arc, Mutex, @@ -27,7 +27,7 @@ use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use crate::{agent, default_branch_sync, git_credentials, state}; #[rustfmt::skip] -use crate::{agent::{ACTIVE_RUN_IDLE_TIMEOUT, AppServerCapabilityPreflightFailure, AppServerDynamicToolFailure, AppServerHomePreflightFailure, AppServerProcessEnv, AppServerRunRequest, AppServerRunResult, AppServerTransportFailure, AppServerTurnFailure, ISSUE_COMMENT_TOOL_NAME, ISSUE_DELIVERY_CLOSEOUT_COMPLETE_TOOL_NAME, ISSUE_LABEL_ADD_TOOL_NAME, ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, ISSUE_REVIEW_CHECKPOINT_TOOL_NAME, ISSUE_REVIEW_HANDOFF_TOOL_NAME, ISSUE_REVIEW_REPAIR_COMPLETE_TOOL_NAME, ISSUE_TERMINAL_FINALIZE_TOOL_NAME, ISSUE_TRANSITION_TOOL_NAME, DecodexRunContext, DecodexToolBridge, ReviewExecutionMode, ReviewHandoffContext, ReviewHandoffWritebackFailed, ReviewPolicyStopReason, ReviewPolicyStopRequested, RunCompletionDisposition, TrackerToolBridge, TurnContinuationGuard}, config::{InternalReviewMode, ServiceConfig}, git_credentials::GitCredentialSource, github, prelude::{Result, eyre}, 
state::{ChildAgentActivityBucket, ChildAgentActivitySummary, CodexAccountActivitySummary, ProjectRegistration, ProjectRunStatus, ProtocolActivitySummary, RUN_OPERATION_AGENT_RUN, RUN_OPERATION_APP_SERVER_PREFLIGHT, RUN_OPERATION_GIT_CREDENTIALS, RUN_OPERATION_IDLE, RUN_OPERATION_RECONCILIATION, RUN_OPERATION_REPO_GATE, RUN_OPERATION_REVIEW_WRITEBACK, RUN_OPERATION_WAITING_EXTERNAL, ReviewHandoffMarker, ReviewOrchestrationMarker, RunActivityMarker, RunAttempt, StateStore, WorktreeMapping}, tracker::{IssueTracker, TrackerComment, TrackerIssue, linear::LinearClient, records}, workflow::{WorkflowDocument, WorkflowExecution}, worktree::{WorktreeManager, WorktreeSpec}}; +use crate::{agent::{ACTIVE_RUN_IDLE_TIMEOUT, AppServerCapabilityPreflightFailure, AppServerDynamicToolFailure, AppServerHomePreflightFailure, AppServerProcessEnv, AppServerRunRequest, AppServerRunResult, AppServerTransportFailure, AppServerTurnFailure, ISSUE_DELIVERY_CLOSEOUT_COMPLETE_TOOL_NAME, ISSUE_LABEL_ADD_TOOL_NAME, ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, ISSUE_REVIEW_CHECKPOINT_TOOL_NAME, ISSUE_REVIEW_HANDOFF_TOOL_NAME, ISSUE_REVIEW_REPAIR_COMPLETE_TOOL_NAME, ISSUE_TERMINAL_FINALIZE_TOOL_NAME, ISSUE_TRANSITION_TOOL_NAME, DecodexRunContext, DecodexToolBridge, ReviewExecutionMode, ReviewHandoffContext, ReviewHandoffWritebackFailed, ReviewPolicyStopReason, ReviewPolicyStopRequested, RunCompletionDisposition, TrackerToolBridge, TurnContinuationGuard}, config::{InternalReviewMode, ServiceConfig}, git_credentials::GitCredentialSource, github, prelude::{Result, eyre}, state::{ChildAgentActivityBucket, ChildAgentActivitySummary, CodexAccountActivitySummary, ProjectRegistration, ProjectRunStatus, ProtocolActivitySummary, RUN_OPERATION_AGENT_RUN, RUN_OPERATION_APP_SERVER_PREFLIGHT, RUN_OPERATION_GIT_CREDENTIALS, RUN_OPERATION_IDLE, RUN_OPERATION_RECONCILIATION, RUN_OPERATION_REPO_GATE, RUN_OPERATION_REVIEW_WRITEBACK, RUN_OPERATION_WAITING_EXTERNAL, ReviewHandoffMarker, ReviewOrchestrationMarker, 
RunActivityMarker, RunAttempt, StateStore, WorktreeMapping}, tracker::{IssueTracker, TrackerComment, TrackerIssue, linear::LinearClient, records}, workflow::{WorkflowDocument, WorkflowExecution}, worktree::{WorktreeManager, WorktreeSpec}}; include!("orchestrator/types.rs"); @@ -57,6 +57,8 @@ include!("orchestrator/status.rs"); include!("orchestrator/selection.rs"); +include!("orchestrator/agent_evidence.rs"); + pub(crate) const DEFAULT_STATUS_RUN_LIMIT: usize = 10; pub(crate) const DEFAULT_OPERATOR_DASHBOARD_RUN_LIMIT: usize = 25; pub(crate) const EXTERNAL_REVIEW_ACTOR_LOGIN: &str = "codex"; diff --git a/apps/decodex/src/orchestrator/agent_evidence.rs b/apps/decodex/src/orchestrator/agent_evidence.rs new file mode 100644 index 0000000..423e0ef --- /dev/null +++ b/apps/decodex/src/orchestrator/agent_evidence.rs @@ -0,0 +1,1230 @@ +use std::collections::{self, BTreeMap}; + +const AGENT_HANDOFF_INDEX_SCHEMA: &str = "decodex.agent_handoff_index/1"; +const AGENT_BLOCKER_SNAPSHOT_SCHEMA: &str = "decodex.blocker_snapshot/1"; +const AGENT_RUN_CAPSULE_SCHEMA: &str = "decodex.run_capsule/1"; +const AGENT_EVIDENCE_EVENT_SCHEMA: &str = "decodex.agent_evidence_event/1"; +const HANDOFF_INDEX_FILE_NAME: &str = "handoff-index.json"; +const BLOCKERS_DIR_NAME: &str = "blockers"; +const RUNS_DIR_NAME: &str = "runs"; +const EVENTS_FILE_NAME: &str = "events.jsonl"; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum AgentEvidenceSource { + DiagnoseCommand, + ServeTick, +} +impl AgentEvidenceSource { + fn as_str(self) -> &'static str { + match self { + Self::DiagnoseCommand => "diagnose_command", + Self::ServeTick => "serve_tick", + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct AgentEvidenceWriteResult { + project_id: String, + handoff_index_path: String, + handoff_index: AgentHandoffIndex, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct AgentHandoffIndex { + schema: &'static str, + project_id: String, + generated_at: String, + source: String, + 
evidence_root: String, + handoff_index_path: String, + blockers_dir: String, + runs_dir: String, + events_path: String, + summary: AgentEvidenceSummary, + warnings: Vec, + connector_backoffs: Vec, + blockers: Vec, + run_capsules: Vec, + recovery_worktrees: Vec, + recovery_contracts: Vec, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct AgentEvidenceSummary { + project_count: usize, + active_run_count: usize, + recent_run_count: usize, + history_lane_count: usize, + queued_candidate_count: usize, + post_review_lane_count: usize, + recovery_worktree_count: usize, + blocker_count: usize, + run_capsule_count: usize, + connector_backoff_count: usize, + warning_count: usize, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct AgentConnectorBackoff { + evidence_ref: String, + connector: String, + sync_phase: String, + quota_class: String, + reset_at: String, + reset_unix_epoch: i64, + reset_source: String, + retry_after_seconds: i64, + warning: String, + next_action: String, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct AgentBlocker { + evidence_ref: String, + project_id: String, + surface: String, + issue_id: Option, + issue_identifier: Option, + run_id: Option, + attempt_number: Option, + classification: String, + reason_code: String, + reason: String, + next_action: String, + blocker_snapshot_path: String, + related_run_capsule_path: Option, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct AgentBlockerSnapshot { + schema: &'static str, + project_id: String, + generated_at: String, + issue_id: Option, + issue_identifier: Option, + blockers: Vec, + related_run_capsules: Vec, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct AgentRunCapsuleRef { + evidence_ref: String, + run_id: String, + issue_id: String, + issue_identifier: Option, + attempt_number: i64, + status: String, + phase: String, + current_operation: String, + path: String, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct 
AgentRunCapsule { + schema: &'static str, + evidence_ref: String, + project_id: String, + generated_at: String, + path: String, + run_id: String, + issue_id: String, + issue_identifier: Option, + title: Option, + attempt_number: i64, + status: String, + attempt_status: String, + phase: String, + wait_reason: Option, + current_operation: String, + queue_lease_state: String, + execution_liveness: String, + active_lease: bool, + continuation_pending: bool, + suspected_stall: bool, + thread_id: Option, + turn_id: Option, + thread_status: Option, + thread_active_flags: Vec, + interactive_requested: bool, + process_id: Option, + process_alive: Option, + event_count: i64, + last_event_type: Option, + last_event_at: Option, + last_run_activity_at: Option, + last_protocol_activity_at: Option, + last_progress_at: Option, + idle_for_seconds: Option, + protocol_idle_for_seconds: Option, + retry_kind: Option, + next_retry_at: Option, + effective_model: Option, + effective_model_provider: Option, + effective_cwd: Option, + effective_approval_policy: Option, + effective_approvals_reviewer: Option, + effective_sandbox_mode: Option, + branch_name: Option, + worktree_path: Option, + ledger_outcome: Option, + diagnosis: AgentRunDiagnosis, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct AgentRunLedgerOutcome { + ledger_status: String, + final_outcome: String, + final_event_type: Option, + final_event_at: Option, + summary: Option, + pr_url: Option, + commit_sha: Option, + closeout_status: Option, + needs_attention_reason: Option, + record_count: usize, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct AgentRunDiagnosis { + attention_required: bool, + reason_code: Option, + next_action: Option, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct AgentRecoveryWorktree { + issue_id: String, + issue_identifier: Option, + issue_state: Option, + branch_name: String, + worktree_path: String, + role: String, + ownership: String, + ownership_reason: 
String, + hygiene_classification: Option, + hygiene_reason: Option, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct AgentRecoveryContract { + evidence_ref: String, + kind: String, + issue_identifier: Option, + reason_code: String, + command: Option, + next_action: String, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct AgentEvidenceEvent { + schema: &'static str, + project_id: String, + generated_at: String, + source: String, + handoff_index_path: String, + blocker_count: usize, + run_capsule_count: usize, + warning_count: usize, + connector_backoff_count: usize, +} + +struct AgentEvidenceFileWriteContext<'a> { + project_id: &'a str, + generated_at: &'a str, + source: AgentEvidenceSource, + handoff_index_path: &'a Path, + blockers_dir: &'a Path, + events_path: &'a Path, +} + +struct AgentEvidenceProjectView<'a> { + project_id: &'a str, + warnings: Vec, + projects: Vec<&'a OperatorProjectStatus>, + connector_backoffs: Vec<&'a OperatorConnectorBackoffStatus>, + active_runs: Vec<&'a OperatorRunStatus>, + recent_runs: Vec<&'a OperatorRunStatus>, + history_lanes: Vec<&'a OperatorHistoryLaneStatus>, + queued_candidates: Vec<&'a OperatorQueuedIssueStatus>, + recovery_worktrees: Vec<(&'a str, &'a OperatorWorktreeStatus)>, + post_review_lanes: Vec<&'a OperatorPostReviewLaneStatus>, +} +impl<'a> AgentEvidenceProjectView<'a> { + fn from_snapshot(snapshot: &'a OperatorStatusSnapshot, project_id: &'a str) -> Self { + let single_project_snapshot = snapshot.project_id == project_id; + let projects = snapshot + .projects + .iter() + .filter(|project| project.project_id == project_id) + .collect::>(); + let connector_backoffs = snapshot + .connector_backoffs + .iter() + .filter(|backoff| backoff.project_id == project_id) + .collect::>(); + let active_runs = snapshot + .active_runs + .iter() + .filter(|run| run.project_id == project_id) + .collect::>(); + let recent_runs = snapshot + .recent_runs + .iter() + .filter(|run| run.project_id == project_id) 
+ .collect::>(); + let history_lanes = snapshot + .history_lanes + .iter() + .filter(|lane| lane.project_id == project_id) + .collect::>(); + let post_review_lanes = snapshot + .post_review_lanes + .iter() + .filter(|lane| lane_issue_belongs_to_project(lane.issue_id.as_str(), project_id, snapshot)) + .collect::>(); + let queued_candidates = snapshot + .queued_candidates + .iter() + .filter(|candidate| { + lane_issue_belongs_to_project(candidate.issue_id.as_str(), project_id, snapshot) + }) + .collect::>(); + let recovery_worktrees = if single_project_snapshot { + rendered_recovery_worktrees(snapshot) + } else { + rendered_recovery_worktrees(snapshot) + .into_iter() + .filter(|(_, worktree)| { + lane_issue_belongs_to_project(worktree.issue_id.as_str(), project_id, snapshot) + }) + .collect() + }; + + Self { + project_id, + warnings: snapshot.warnings.clone(), + projects, + connector_backoffs, + active_runs, + recent_runs, + history_lanes, + queued_candidates, + recovery_worktrees, + post_review_lanes, + } + } +} + +fn write_agent_evidence_snapshot( + snapshot: &OperatorStatusSnapshot, + source: AgentEvidenceSource, +) -> Result> { + let generated_at = current_timestamp(); + let month_bucket = current_month_bucket(); + let mut results = Vec::new(); + + for project_id in agent_evidence_project_ids(snapshot) { + let service_root = runtime::agent_evidence_dir()?.join(&project_id); + let handoff_index_path = service_root.join(HANDOFF_INDEX_FILE_NAME); + let blockers_dir = service_root.join(BLOCKERS_DIR_NAME); + let runs_dir = service_root.join(RUNS_DIR_NAME); + let events_path = service_root.join(EVENTS_FILE_NAME); + let project_view = AgentEvidenceProjectView::from_snapshot(snapshot, &project_id); + let mut run_capsules = build_run_capsules( + &project_view, + &generated_at, + &runs_dir, + &month_bucket, + ); + + run_capsules.sort_by(|left, right| { + left + .issue_identifier + .cmp(&right.issue_identifier) + .then_with(|| left.issue_id.cmp(&right.issue_id)) + 
.then_with(|| left.attempt_number.cmp(&right.attempt_number)) + .then_with(|| left.run_id.cmp(&right.run_id)) + }); + + let run_refs = run_capsules + .iter() + .map(run_capsule_ref) + .collect::>(); + let blockers = build_agent_blockers(&project_view, &blockers_dir, &run_refs); + let recovery_worktrees = project_view + .recovery_worktrees + .iter() + .map(|(role, worktree)| agent_recovery_worktree(role, worktree)) + .collect::>(); + let recovery_contracts = blockers.iter().filter_map(agent_recovery_contract).collect(); + let connector_backoffs = project_view + .connector_backoffs + .iter() + .copied() + .map(agent_connector_backoff) + .collect::>(); + let summary = AgentEvidenceSummary { + project_count: project_view.projects.len(), + active_run_count: project_view.active_runs.len(), + recent_run_count: project_view.recent_runs.len(), + history_lane_count: project_view.history_lanes.len(), + queued_candidate_count: project_view.queued_candidates.len(), + post_review_lane_count: project_view.post_review_lanes.len(), + recovery_worktree_count: recovery_worktrees.len(), + blocker_count: blockers.len(), + run_capsule_count: run_refs.len(), + connector_backoff_count: connector_backoffs.len(), + warning_count: project_view.warnings.len(), + }; + let index = AgentHandoffIndex { + schema: AGENT_HANDOFF_INDEX_SCHEMA, + project_id: project_id.clone(), + generated_at: generated_at.clone(), + source: source.as_str().to_owned(), + evidence_root: service_root.display().to_string(), + handoff_index_path: handoff_index_path.display().to_string(), + blockers_dir: blockers_dir.display().to_string(), + runs_dir: runs_dir.display().to_string(), + events_path: events_path.display().to_string(), + summary, + warnings: project_view.warnings.clone(), + connector_backoffs, + blockers, + run_capsules: run_refs, + recovery_worktrees, + recovery_contracts, + }; + let write_context = AgentEvidenceFileWriteContext { + project_id: &project_id, + generated_at: &generated_at, + source, + 
handoff_index_path: &handoff_index_path, + blockers_dir: &blockers_dir, + events_path: &events_path, + }; + + write_agent_evidence_files(&write_context, &index, &run_capsules)?; + + results.push(AgentEvidenceWriteResult { + project_id, + handoff_index_path: handoff_index_path.display().to_string(), + handoff_index: index, + }); + } + + Ok(results) +} + +fn write_agent_evidence_best_effort( + snapshot: &OperatorStatusSnapshot, + source: AgentEvidenceSource, +) { + if let Err(error) = write_agent_evidence_snapshot(snapshot, source) { + let _ = error; + + tracing::warn!( + "Agent evidence write failed; sensitive runtime details were withheld from logs." + ); + } +} + +fn render_agent_evidence_write_result(result: &AgentEvidenceWriteResult) -> String { + format!( + "agent evidence written: project={} blockers={} run_capsules={} warnings={} index={}\n", + result.project_id, + result.handoff_index.summary.blocker_count, + result.handoff_index.summary.run_capsule_count, + result.handoff_index.summary.warning_count, + result.handoff_index_path, + ) +} + +fn lane_issue_belongs_to_project( + issue_id: &str, + project_id: &str, + snapshot: &OperatorStatusSnapshot, +) -> bool { + snapshot + .active_runs + .iter() + .chain(snapshot.recent_runs.iter()) + .any(|run| run.project_id == project_id && run.issue_id == issue_id) + || snapshot + .history_lanes + .iter() + .any(|lane| lane.project_id == project_id && lane.issue_id == issue_id) + || snapshot.project_id == project_id +} + +fn agent_evidence_project_ids(snapshot: &OperatorStatusSnapshot) -> Vec { + let mut project_ids = collections::BTreeSet::new(); + + for project in &snapshot.projects { + project_ids.insert(project.project_id.clone()); + } + for run in snapshot.active_runs.iter().chain(snapshot.recent_runs.iter()) { + project_ids.insert(run.project_id.clone()); + } + for lane in &snapshot.history_lanes { + project_ids.insert(lane.project_id.clone()); + } + for backoff in &snapshot.connector_backoffs { + 
project_ids.insert(backoff.project_id.clone()); + } + + if project_ids.is_empty() && snapshot.project_id != "all" { + project_ids.insert(snapshot.project_id.clone()); + } + + project_ids.into_iter().collect() +} + +fn build_run_capsules( + project_view: &AgentEvidenceProjectView<'_>, + generated_at: &str, + runs_dir: &Path, + month_bucket: &str, +) -> Vec { + let mut run_ids = collections::BTreeSet::new(); + let mut capsules = Vec::new(); + + for run in project_view + .active_runs + .iter() + .chain(project_view.recent_runs.iter()) + .copied() + { + if run_ids.insert(run.run_id.clone()) { + capsules.push(agent_run_capsule( + project_view.project_id, + generated_at, + runs_dir, + month_bucket, + run, + ledger_outcome_for_run(run, project_view), + )); + } + } + for lane in &project_view.history_lanes { + for run in &lane.attempts { + if run_ids.insert(run.run_id.clone()) { + capsules.push(agent_run_capsule( + project_view.project_id, + generated_at, + runs_dir, + month_bucket, + run, + Some(agent_run_ledger_outcome(&lane.ledger_outcome)), + )); + } + } + } + + capsules +} + +fn ledger_outcome_for_run( + run: &OperatorRunStatus, + project_view: &AgentEvidenceProjectView<'_>, +) -> Option { + project_view + .history_lanes + .iter() + .find(|lane| lane.attempts.iter().any(|attempt| attempt.run_id == run.run_id)) + .map(|lane| agent_run_ledger_outcome(&lane.ledger_outcome)) +} + +fn agent_run_ledger_outcome( + outcome: &OperatorHistoryLedgerOutcome, +) -> AgentRunLedgerOutcome { + AgentRunLedgerOutcome { + ledger_status: outcome.ledger_status.clone(), + final_outcome: outcome.final_outcome.clone(), + final_event_type: outcome.final_event_type.clone(), + final_event_at: outcome.final_event_at.clone(), + summary: outcome.summary.clone(), + pr_url: outcome.pr_url.clone(), + commit_sha: outcome.commit_sha.clone(), + closeout_status: outcome.closeout_status.clone(), + needs_attention_reason: outcome.needs_attention_reason.clone(), + record_count: outcome.record_count, + } +} 
+ +fn agent_run_capsule( + project_id: &str, + generated_at: &str, + runs_dir: &Path, + month_bucket: &str, + run: &OperatorRunStatus, + ledger_outcome: Option, +) -> AgentRunCapsule { + let path = run_capsule_path(runs_dir, month_bucket, &run.run_id); + let diagnosis = agent_run_diagnosis(run); + + AgentRunCapsule { + schema: AGENT_RUN_CAPSULE_SCHEMA, + evidence_ref: run_evidence_ref(project_id, &run.run_id), + project_id: project_id.to_owned(), + generated_at: generated_at.to_owned(), + path: path.display().to_string(), + run_id: run.run_id.clone(), + issue_id: run.issue_id.clone(), + issue_identifier: run.issue_identifier.clone(), + title: run.title.clone(), + attempt_number: run.attempt_number, + status: run.status.clone(), + attempt_status: run.attempt_status.clone(), + phase: run.phase.clone(), + wait_reason: run.wait_reason.clone(), + current_operation: run.current_operation.clone(), + queue_lease_state: run.queue_lease_state.clone(), + execution_liveness: run.execution_liveness.clone(), + active_lease: run.active_lease, + continuation_pending: run.continuation_pending, + suspected_stall: run.suspected_stall, + thread_id: run.thread_id.clone(), + turn_id: run.turn_id.clone(), + thread_status: run.thread_status.clone(), + thread_active_flags: run.thread_active_flags.clone(), + interactive_requested: run.interactive_requested, + process_id: run.process_id, + process_alive: run.process_alive, + event_count: run.event_count, + last_event_type: run.last_event_type.clone(), + last_event_at: run.last_event_at.clone(), + last_run_activity_at: run.last_run_activity_at.clone(), + last_protocol_activity_at: run.last_protocol_activity_at.clone(), + last_progress_at: run.last_progress_at.clone(), + idle_for_seconds: run.idle_for_seconds, + protocol_idle_for_seconds: run.protocol_idle_for_seconds, + retry_kind: run.retry_kind.clone(), + next_retry_at: run.next_retry_at.clone(), + effective_model: run.effective_model.clone(), + effective_model_provider: 
run.effective_model_provider.clone(), + effective_cwd: run.effective_cwd.clone(), + effective_approval_policy: run.effective_approval_policy.clone(), + effective_approvals_reviewer: run.effective_approvals_reviewer.clone(), + effective_sandbox_mode: run.effective_sandbox_mode.clone(), + branch_name: run.branch_name.clone(), + worktree_path: run.worktree_path.clone(), + ledger_outcome, + diagnosis, + } +} + +fn run_capsule_ref(capsule: &AgentRunCapsule) -> AgentRunCapsuleRef { + AgentRunCapsuleRef { + evidence_ref: capsule.evidence_ref.clone(), + run_id: capsule.run_id.clone(), + issue_id: capsule.issue_id.clone(), + issue_identifier: capsule.issue_identifier.clone(), + attempt_number: capsule.attempt_number, + status: capsule.status.clone(), + phase: capsule.phase.clone(), + current_operation: capsule.current_operation.clone(), + path: capsule.path.clone(), + } +} + +fn agent_run_diagnosis(run: &OperatorRunStatus) -> AgentRunDiagnosis { + let reason = agent_run_blocker_reason(run); + + AgentRunDiagnosis { + attention_required: reason.is_some(), + reason_code: reason.map(str::to_owned), + next_action: agent_run_next_action(run).map(str::to_owned), + } +} + +fn agent_run_blocker_reason(run: &OperatorRunStatus) -> Option<&'static str> { + if run.suspected_stall { + return Some("suspected_stall"); + } + if run.phase == "stalled" { + return Some("run_stalled"); + } + if run.process_alive == Some(false) && matches!(run.status.as_str(), "starting" | "running") { + return Some("process_exited_without_terminal_status"); + } + if operator_run_has_stale_execution_without_known_process(run) { + return Some("stale_execution_without_known_process"); + } + if run.wait_reason.is_some() { + return Some("run_waiting"); + } + if run.next_retry_at.is_some() { + return Some("retry_backoff"); + } + + None +} + +fn agent_run_next_action(run: &OperatorRunStatus) -> Option<&'static str> { + match agent_run_blocker_reason(run) { + Some("suspected_stall" | "run_stalled" | 
"stale_execution_without_known_process") => + Some("Inspect the run capsule, retained worktree, protocol activity, and process state before retrying."), + Some("process_exited_without_terminal_status") => + Some("Inspect the retained worktree and runtime markers; reconcile or retry only after preserving useful local changes."), + Some("run_waiting") => + Some("Inspect wait_reason, thread status, and protocol activity before deciding whether the agent can continue."), + Some("retry_backoff") => Some("Wait until next_retry_at or run an explicit operator retry after reviewing the retained state."), + _ => None, + } +} + +fn build_agent_blockers( + project_view: &AgentEvidenceProjectView<'_>, + blockers_dir: &Path, + run_refs: &[AgentRunCapsuleRef], +) -> Vec { + let mut blockers = Vec::new(); + + push_run_blockers(&mut blockers, project_view, blockers_dir, run_refs); + push_queued_candidate_blockers(&mut blockers, project_view, blockers_dir, run_refs); + push_post_review_lane_blockers(&mut blockers, project_view, blockers_dir); + push_recovery_worktree_blockers(&mut blockers, project_view, blockers_dir); + push_warning_blockers(&mut blockers, project_view, blockers_dir); + push_connector_backoff_blockers(&mut blockers, project_view, blockers_dir); + sort_agent_blockers(&mut blockers); + + blockers +} + +fn push_run_blockers( + blockers: &mut Vec, + project_view: &AgentEvidenceProjectView<'_>, + blockers_dir: &Path, + run_refs: &[AgentRunCapsuleRef], +) { + for run in &project_view.active_runs { + if let Some(reason_code) = agent_run_blocker_reason(run) { + let issue_key = issue_key(run.issue_identifier.as_deref(), &run.issue_id); + + blockers.push(AgentBlocker { + evidence_ref: blocker_evidence_ref(project_view.project_id, &issue_key, reason_code), + project_id: project_view.project_id.to_owned(), + surface: String::from("running_lane"), + issue_id: Some(run.issue_id.clone()), + issue_identifier: run.issue_identifier.clone(), + run_id: Some(run.run_id.clone()), + 
attempt_number: Some(run.attempt_number), + classification: String::from("attention_required"), + reason_code: reason_code.to_owned(), + reason: run + .wait_reason + .clone() + .unwrap_or_else(|| reason_code.replace('_', " ")), + next_action: agent_run_next_action(run).unwrap_or("Inspect the run capsule.").to_owned(), + blocker_snapshot_path: blocker_snapshot_path(blockers_dir, &issue_key) + .display() + .to_string(), + related_run_capsule_path: run_refs + .iter() + .find(|run_ref| run_ref.run_id == run.run_id) + .map(|run_ref| run_ref.path.clone()), + }); + } + } +} + +fn push_queued_candidate_blockers( + blockers: &mut Vec, + project_view: &AgentEvidenceProjectView<'_>, + blockers_dir: &Path, + run_refs: &[AgentRunCapsuleRef], +) { + for candidate in &project_view.queued_candidates { + if candidate.classification != "blocked" && candidate.attention.is_none() { + continue; + } + + let issue_key = issue_key(Some(&candidate.issue_identifier), &candidate.issue_id); + let reason_code = candidate + .attention + .as_ref() + .and_then(|attention| attention.attention_error_class.as_deref()) + .unwrap_or(candidate.reason.as_str()); + + blockers.push(AgentBlocker { + evidence_ref: blocker_evidence_ref(project_view.project_id, &issue_key, reason_code), + project_id: project_view.project_id.to_owned(), + surface: String::from("intake_queue"), + issue_id: Some(candidate.issue_id.clone()), + issue_identifier: Some(candidate.issue_identifier.clone()), + run_id: candidate.attention.as_ref().and_then(|attention| attention.run_id.clone()), + attempt_number: candidate.attention.as_ref().and_then(|attention| attention.attempt_number), + classification: candidate.classification.clone(), + reason_code: reason_code.to_owned(), + reason: candidate + .attention + .as_ref() + .map(|attention| attention.summary.clone()) + .unwrap_or_else(|| candidate.reason.clone()), + next_action: candidate + .attention + .as_ref() + .and_then(|attention| attention.attention_next_action.clone()) + 
.unwrap_or_else(|| String::from("Inspect the queued candidate and retained worktree before retrying.")), + blocker_snapshot_path: blocker_snapshot_path(blockers_dir, &issue_key) + .display() + .to_string(), + related_run_capsule_path: candidate + .attention + .as_ref() + .and_then(|attention| attention.run_id.as_deref()) + .and_then(|run_id| run_refs.iter().find(|run_ref| run_ref.run_id == run_id)) + .map(|run_ref| run_ref.path.clone()), + }); + } +} + +fn push_post_review_lane_blockers( + blockers: &mut Vec, + project_view: &AgentEvidenceProjectView<'_>, + blockers_dir: &Path, +) { + for lane in &project_view.post_review_lanes { + if !post_review_lane_requires_attention(lane) { + continue; + } + + let issue_key = issue_key(Some(&lane.issue_identifier), &lane.issue_id); + + blockers.push(AgentBlocker { + evidence_ref: blocker_evidence_ref(project_view.project_id, &issue_key, &lane.reason), + project_id: project_view.project_id.to_owned(), + surface: String::from("review_landing"), + issue_id: Some(lane.issue_id.clone()), + issue_identifier: Some(lane.issue_identifier.clone()), + run_id: None, + attempt_number: None, + classification: lane.classification.clone(), + reason_code: lane.reason.clone(), + reason: lane.reason.clone(), + next_action: post_review_lane_next_action(lane, project_view.project_id), + blocker_snapshot_path: blocker_snapshot_path(blockers_dir, &issue_key) + .display() + .to_string(), + related_run_capsule_path: None, + }); + } +} + +fn push_recovery_worktree_blockers( + blockers: &mut Vec, + project_view: &AgentEvidenceProjectView<'_>, + blockers_dir: &Path, +) { + for (role, worktree) in &project_view.recovery_worktrees { + if worktree.hygiene.is_none() { + continue; + } + + let issue_key = issue_key(worktree.issue_identifier.as_deref(), &worktree.issue_id); + let reason_code = worktree + .hygiene + .as_ref() + .map(|hygiene| hygiene.classification.as_str()) + .unwrap_or(*role); + + blockers.push(AgentBlocker { + evidence_ref: 
blocker_evidence_ref(project_view.project_id, &issue_key, reason_code), + project_id: project_view.project_id.to_owned(), + surface: String::from("recovery_worktree"), + issue_id: Some(worktree.issue_id.clone()), + issue_identifier: worktree.issue_identifier.clone(), + run_id: None, + attempt_number: None, + classification: (*role).to_owned(), + reason_code: reason_code.to_owned(), + reason: worktree.ownership_reason.clone(), + next_action: String::from("Inspect the retained worktree before cleanup or recovery."), + blocker_snapshot_path: blocker_snapshot_path(blockers_dir, &issue_key) + .display() + .to_string(), + related_run_capsule_path: None, + }); + } +} + +fn push_warning_blockers( + blockers: &mut Vec, + project_view: &AgentEvidenceProjectView<'_>, + blockers_dir: &Path, +) { + for warning in &project_view.warnings { + if warning == "external_observer_status_skipped" { + continue; + } + + let issue_key = format!("project-{}", sanitize_evidence_path_component(warning)); + + blockers.push(AgentBlocker { + evidence_ref: blocker_evidence_ref(project_view.project_id, &issue_key, warning), + project_id: project_view.project_id.to_owned(), + surface: String::from("operator_snapshot"), + issue_id: None, + issue_identifier: None, + run_id: None, + attempt_number: None, + classification: String::from("snapshot_warning"), + reason_code: warning.clone(), + reason: warning.clone(), + next_action: String::from("Regenerate diagnose output after resolving the unavailable observer or runtime warning."), + blocker_snapshot_path: blocker_snapshot_path(blockers_dir, &issue_key) + .display() + .to_string(), + related_run_capsule_path: None, + }); + } +} + +fn push_connector_backoff_blockers( + blockers: &mut Vec, + project_view: &AgentEvidenceProjectView<'_>, + blockers_dir: &Path, +) { + for backoff in &project_view.connector_backoffs { + let issue_key = format!("connector-{}", sanitize_evidence_path_component(&backoff.connector)); + + blockers.push(AgentBlocker { + 
evidence_ref: blocker_evidence_ref( + project_view.project_id, + &issue_key, + &backoff.warning, + ), + project_id: project_view.project_id.to_owned(), + surface: String::from("connector_backoff"), + issue_id: None, + issue_identifier: None, + run_id: None, + attempt_number: None, + classification: String::from("backoff"), + reason_code: backoff.warning.clone(), + reason: format!("{} {}", backoff.connector, backoff.sync_phase), + next_action: backoff.next_action.clone(), + blocker_snapshot_path: blocker_snapshot_path(blockers_dir, &issue_key) + .display() + .to_string(), + related_run_capsule_path: None, + }); + } +} + +fn sort_agent_blockers(blockers: &mut [AgentBlocker]) { + blockers.sort_by(|left, right| { + left + .issue_identifier + .cmp(&right.issue_identifier) + .then_with(|| left.issue_id.cmp(&right.issue_id)) + .then_with(|| left.surface.cmp(&right.surface)) + .then_with(|| left.reason_code.cmp(&right.reason_code)) + }); +} + +fn post_review_lane_requires_attention(lane: &OperatorPostReviewLaneStatus) -> bool { + matches!( + lane.classification.as_str(), + "blocked" | "needs_review_repair" | "closeout_blocked" | "cleanup_blocked" + ) || lane.reason == "missing_review_handoff_record" +} + +fn post_review_lane_next_action( + lane: &OperatorPostReviewLaneStatus, + project_id: &str, +) -> String { + if lane.reason == "missing_review_handoff_record" { + return format!( + "Run `decodex recover review-handoff diagnose {} --json`; rebind only after PR lineage and retained worktree HEAD match.", + lane.issue_identifier + ); + } + if lane.classification == "needs_review_repair" { + return String::from("Run or inspect the retained review-repair lane before attempting land."); + } + + format!( + "Inspect the `{}` retained post-review lane for service `{project_id}` before retrying.", + lane.classification + ) +} + +fn agent_connector_backoff(backoff: &OperatorConnectorBackoffStatus) -> AgentConnectorBackoff { + AgentConnectorBackoff { + evidence_ref: format!( + 
"connector:{}/{}:{}", + backoff.project_id, backoff.connector, backoff.sync_phase + ), + connector: backoff.connector.clone(), + sync_phase: backoff.sync_phase.clone(), + quota_class: backoff.quota_class.clone(), + reset_at: backoff.reset_at.clone(), + reset_unix_epoch: backoff.reset_unix_epoch, + reset_source: backoff.reset_source.clone(), + retry_after_seconds: backoff.retry_after_seconds, + warning: backoff.warning.clone(), + next_action: backoff.next_action.clone(), + } +} + +fn agent_recovery_worktree( + role: &str, + worktree: &OperatorWorktreeStatus, +) -> AgentRecoveryWorktree { + AgentRecoveryWorktree { + issue_id: worktree.issue_id.clone(), + issue_identifier: worktree.issue_identifier.clone(), + issue_state: worktree.issue_state.clone(), + branch_name: worktree.branch_name.clone(), + worktree_path: worktree.worktree_path.clone(), + role: role.to_owned(), + ownership: worktree.ownership.clone(), + ownership_reason: worktree.ownership_reason.clone(), + hygiene_classification: worktree + .hygiene + .as_ref() + .map(|hygiene| hygiene.classification.clone()), + hygiene_reason: worktree.hygiene.as_ref().map(|hygiene| hygiene.reason.clone()), + } +} + +fn agent_recovery_contract(blocker: &AgentBlocker) -> Option { + let command = if blocker.reason_code == "missing_review_handoff_record" { + blocker + .issue_identifier + .as_ref() + .map(|issue| format!("decodex recover review-handoff diagnose {issue} --json")) + } else { + None + }; + + if command.is_none() && blocker.surface != "running_lane" && blocker.surface != "intake_queue" { + return None; + } + + Some(AgentRecoveryContract { + evidence_ref: blocker.evidence_ref.clone(), + kind: blocker.surface.clone(), + issue_identifier: blocker.issue_identifier.clone(), + reason_code: blocker.reason_code.clone(), + command, + next_action: blocker.next_action.clone(), + }) +} + +fn write_agent_evidence_files( + context: &AgentEvidenceFileWriteContext<'_>, + index: &AgentHandoffIndex, + run_capsules: &[AgentRunCapsule], 
+) -> Result<()> { + for capsule in run_capsules { + let path = PathBuf::from(&capsule.path); + + write_json_atomically(&path, capsule)?; + } + + write_blocker_snapshots( + context.project_id, + context.generated_at, + context.blockers_dir, + &index.blockers, + &index.run_capsules, + )?; + write_json_atomically(context.handoff_index_path, index)?; + append_agent_evidence_event( + context.project_id, + context.generated_at, + context.source, + context.events_path, + index, + )?; + + Ok(()) +} + +fn write_blocker_snapshots( + project_id: &str, + generated_at: &str, + blockers_dir: &Path, + blockers: &[AgentBlocker], + run_refs: &[AgentRunCapsuleRef], +) -> Result<()> { + fs::create_dir_all(blockers_dir)?; + + let mut blockers_by_path: BTreeMap> = BTreeMap::new(); + + for blocker in blockers { + blockers_by_path + .entry(blocker.blocker_snapshot_path.clone()) + .or_default() + .push(blocker.clone()); + } + + let mut kept_paths = collections::BTreeSet::new(); + + for (path, blockers) in blockers_by_path { + let path = PathBuf::from(path); + let related_run_capsules = blockers + .iter() + .filter_map(|blocker| blocker.run_id.as_deref()) + .filter_map(|run_id| run_refs.iter().find(|run_ref| run_ref.run_id == run_id)) + .cloned() + .collect::>(); + let snapshot = AgentBlockerSnapshot { + schema: AGENT_BLOCKER_SNAPSHOT_SCHEMA, + project_id: project_id.to_owned(), + generated_at: generated_at.to_owned(), + issue_id: blockers.iter().find_map(|blocker| blocker.issue_id.clone()), + issue_identifier: blockers + .iter() + .find_map(|blocker| blocker.issue_identifier.clone()), + blockers, + related_run_capsules, + }; + + write_json_atomically(&path, &snapshot)?; + + kept_paths.insert(path); + } + + prune_stale_json_files(blockers_dir, &kept_paths) +} + +fn append_agent_evidence_event( + project_id: &str, + generated_at: &str, + source: AgentEvidenceSource, + events_path: &Path, + index: &AgentHandoffIndex, +) -> Result<()> { + if let Some(parent) = events_path.parent() { + 
fs::create_dir_all(parent)?; + } + + let event = AgentEvidenceEvent { + schema: AGENT_EVIDENCE_EVENT_SCHEMA, + project_id: project_id.to_owned(), + generated_at: generated_at.to_owned(), + source: source.as_str().to_owned(), + handoff_index_path: index.handoff_index_path.clone(), + blocker_count: index.summary.blocker_count, + run_capsule_count: index.summary.run_capsule_count, + warning_count: index.summary.warning_count, + connector_backoff_count: index.summary.connector_backoff_count, + }; + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(events_path)?; + + writeln!(file, "{}", serde_json::to_string(&event)?)?; + + Ok(()) +} + +fn write_json_atomically(path: &Path, value: &T) -> Result<()> +where + T: Serialize, +{ + let Some(parent) = path.parent() else { + eyre::bail!("Agent evidence path `{}` has no parent directory.", path.display()); + }; + + fs::create_dir_all(parent)?; + + let file_name = path + .file_name() + .and_then(|name| name.to_str()) + .ok_or_else(|| eyre::eyre!("Agent evidence path `{}` has no UTF-8 file name.", path.display()))?; + let temp_path = parent.join(format!( + ".{file_name}.tmp-{}-{}", + process::id(), + OffsetDateTime::now_utc().unix_timestamp_nanos() + )); + let body = serde_json::to_vec_pretty(value)?; + + fs::write(&temp_path, body)?; + fs::rename(&temp_path, path)?; + + Ok(()) +} + +fn prune_stale_json_files( + dir: &Path, + keep_paths: &collections::BTreeSet, +) -> Result<()> { + for entry in fs::read_dir(dir)? 
{ + let entry = entry?; + let path = entry.path(); + + if path.extension().and_then(|extension| extension.to_str()) != Some("json") { + continue; + } + if !keep_paths.contains(&path) { + fs::remove_file(path)?; + } + } + + Ok(()) +} + +fn issue_key(issue_identifier: Option<&str>, issue_id: &str) -> String { + issue_identifier.map_or_else( + || sanitize_evidence_path_component(issue_id), + sanitize_evidence_path_component, + ) +} + +fn blocker_snapshot_path(blockers_dir: &Path, issue_key: &str) -> PathBuf { + blockers_dir.join(format!("{issue_key}.json")) +} + +fn run_capsule_path(runs_dir: &Path, month_bucket: &str, run_id: &str) -> PathBuf { + runs_dir + .join(month_bucket) + .join(sanitize_evidence_path_component(run_id)) + .join("capsule.json") +} + +fn run_evidence_ref(project_id: &str, run_id: &str) -> String { + format!("run:{project_id}/{run_id}") +} + +fn blocker_evidence_ref(project_id: &str, issue_key: &str, reason_code: &str) -> String { + format!("blocker:{project_id}/{issue_key}/{reason_code}") +} + +fn sanitize_evidence_path_component(raw: &str) -> String { + let mut out = String::new(); + let mut previous_dash = false; + + for byte in raw.bytes() { + let character = byte as char; + + if character.is_ascii_alphanumeric() { + out.push(character.to_ascii_lowercase()); + + previous_dash = false; + } else if !previous_dash { + out.push('-'); + + previous_dash = true; + } + } + + let out = out.trim_matches('-').to_owned(); + + if out.is_empty() { + String::from("unknown") + } else { + out + } +} + +fn current_month_bucket() -> String { + let now = OffsetDateTime::now_utc(); + + format!("{:04}-{:02}", now.year(), u8::from(now.month())) +} diff --git a/apps/decodex/src/orchestrator/entrypoints.rs b/apps/decodex/src/orchestrator/entrypoints.rs index 2984b69..b1dc785 100644 --- a/apps/decodex/src/orchestrator/entrypoints.rs +++ b/apps/decodex/src/orchestrator/entrypoints.rs @@ -198,6 +198,131 @@ pub(crate) fn print_status( Ok(()) } +pub(crate) fn 
run_diagnose(request: DiagnoseRequest<'_>) -> Result<()> { + if request.limit == 0 { + eyre::bail!("`diagnose --limit` must be greater than zero."); + } + + let state_store = runtime::open_runtime_store()?; + let Some(config_path) = resolve_config_path(request.config_path, &state_store)? else { + eyre::bail!( + "No Decodex project config found. Pass --config or register one with `decodex project add `." + ); + }; + let config = ServiceConfig::from_path(&config_path)?; + let workflow = WorkflowDocument::from_path(config.workflow_path())?; + + runtime::register_project_config(&state_store, &config_path, true)?; + + let mut snapshot = match config.tracker().resolve_api_key().and_then(LinearClient::new) { + Ok(tracker) => build_diagnose_live_snapshot( + &tracker, + &config, + &workflow, + &state_store, + request.limit, + ), + Err(error) => { + let _ = error; + + tracing::warn!( + project_id = config.service_id(), + "Skipped live diagnose observer because tracker credentials were unavailable." 
+ ); + + let mut snapshot = + build_operator_status_snapshot(&config, &state_store, request.limit)?; + + add_operator_snapshot_warning( + &mut snapshot, + "diagnose_tracker_observer_unavailable", + ); + + Ok(snapshot) + }, + }?; + + refresh_operator_project_summary(&mut snapshot); + + let results = write_agent_evidence_snapshot( + &snapshot, + AgentEvidenceSource::DiagnoseCommand, + )?; + let result = results + .into_iter() + .find(|result| result.project_id == config.service_id()) + .ok_or_else(|| { + eyre::eyre!( + "Agent evidence writer did not produce an index for project `{}`.", + config.service_id() + ) + })?; + + if request.json { + println!("{}", serde_json::to_string_pretty(&result.handoff_index)?); + } else { + println!("{}", render_agent_evidence_write_result(&result)); + } + + Ok(()) +} + +fn build_diagnose_live_snapshot( + tracker: &T, + config: &ServiceConfig, + workflow: &WorkflowDocument, + state_store: &StateStore, + limit: usize, +) -> Result +where + T: IssueTracker, +{ + let mut snapshot_warnings = Vec::new(); + + match recover_runtime_state_from_tracker_and_worktrees(tracker, config, workflow, state_store) { + Ok(recovered_state) => + hydrate_status_snapshot_state(config, state_store, recovered_state)?, + Err(error) => { + let _ = error; + + tracing::warn!( + project_id = config.service_id(), + "Skipped runtime recovery for diagnose; sensitive runtime details were withheld." + ); + + snapshot_warnings.push("diagnose_runtime_recovery_unavailable"); + }, + } + + let mut snapshot = match build_live_operator_status_snapshot( + tracker, + config, + workflow, + state_store, + limit, + ) { + Ok(snapshot) => snapshot, + Err(error) => { + let _ = error; + + tracing::warn!( + project_id = config.service_id(), + "Fell back to local diagnose snapshot; sensitive runtime details were withheld." + ); + + snapshot_warnings.push("diagnose_live_observer_unavailable"); + + build_operator_status_snapshot(config, state_store, limit)? 
+ }, + }; + + for warning in snapshot_warnings { + add_operator_snapshot_warning(&mut snapshot, warning); + } + + Ok(snapshot) +} + fn run_control_plane_tick( state_store: &StateStore, project_runtimes: &mut HashMap, @@ -417,13 +542,17 @@ fn control_plane_project_local_snapshot( snapshot_warnings, &active_connector_backoff_statuses(project.service_id(), runtime), ) { - Ok(snapshot) => ControlPlaneProjectTick { - project_status: snapshot - .projects - .first() - .cloned() - .map(|status| complete_project_status(project, status)), - snapshot: Some(snapshot), + Ok(snapshot) => { + write_agent_evidence_best_effort(&snapshot, AgentEvidenceSource::ServeTick); + + ControlPlaneProjectTick { + project_status: snapshot + .projects + .first() + .cloned() + .map(|status| complete_project_status(project, status)), + snapshot: Some(snapshot), + } }, Err(error) => { let _ = error; @@ -510,6 +639,8 @@ fn control_plane_project_snapshot( Ok(snapshot) => { runtime.tracker_backoff = None; + write_agent_evidence_best_effort(&snapshot, AgentEvidenceSource::ServeTick); + ControlPlaneProjectTick { project_status: snapshot .projects diff --git a/apps/decodex/src/orchestrator/execution.rs b/apps/decodex/src/orchestrator/execution.rs index bed24c5..8c66062 100644 --- a/apps/decodex/src/orchestrator/execution.rs +++ b/apps/decodex/src/orchestrator/execution.rs @@ -214,6 +214,7 @@ where fn write_prepare_lifecycle_events( tracker: &T, project: &ServiceConfig, + workflow: &WorkflowDocument, state_store: &StateStore, issue_run: &IssueRunPlan, ) -> Result<()> @@ -226,15 +227,13 @@ where "Prepared worktree `{}` for issue `{}` did not expose a HEAD commit.", issue_run.worktree.path.display(), issue_run.issue.identifier - ) - })?; - - write_intake_lifecycle_event(tracker, project, state_store, issue_run, &worktree_path)?; - write_lease_lifecycle_event(tracker, project, state_store, issue_run, &worktree_path)?; + ) + })?; - write_worktree_prepared_lifecycle_event( + write_run_started_lifecycle_event( 
tracker, project, + workflow, state_store, issue_run, &worktree_path, @@ -242,62 +241,10 @@ where ) } -fn write_intake_lifecycle_event( - tracker: &T, - project: &ServiceConfig, - state_store: &StateStore, - issue_run: &IssueRunPlan, - worktree_path: &str, -) -> Result<()> -where - T: IssueTracker + ?Sized, -{ - let anchor = records::stable_event_anchor(&[issue_run.dispatch_mode.as_str(), "intake"]); - let mut record = records::LinearExecutionEventRecord::new( - lifecycle_event_identity(project, issue_run), - "intake", - current_timestamp(), - &anchor, - ); - - record.branch = Some(issue_run.worktree.branch_name.clone()); - record.worktree_path = Some(worktree_path.to_owned()); - record.summary = Some(format!( - "Decodex selected the issue for {} dispatch.", - issue_run.dispatch_mode.as_str() - )); - - write_lifecycle_event(tracker, state_store, &issue_run.issue.id, &record) -} - -fn write_lease_lifecycle_event( - tracker: &T, - project: &ServiceConfig, - state_store: &StateStore, - issue_run: &IssueRunPlan, - worktree_path: &str, -) -> Result<()> -where - T: IssueTracker + ?Sized, -{ - let anchor = records::stable_event_anchor(&[&issue_run.worktree.branch_name]); - let mut record = records::LinearExecutionEventRecord::new( - lifecycle_event_identity(project, issue_run), - "lease_acquired", - current_timestamp(), - &anchor, - ); - - record.branch = Some(issue_run.worktree.branch_name.clone()); - record.worktree_path = Some(worktree_path.to_owned()); - record.summary = Some(String::from("Decodex acquired the local lane lease.")); - - write_lifecycle_event(tracker, state_store, &issue_run.issue.id, &record) -} - -fn write_worktree_prepared_lifecycle_event( +fn write_run_started_lifecycle_event( tracker: &T, project: &ServiceConfig, + workflow: &WorkflowDocument, state_store: &StateStore, issue_run: &IssueRunPlan, worktree_path: &str, @@ -306,10 +253,16 @@ fn write_worktree_prepared_lifecycle_event( where T: IssueTracker + ?Sized, { - let anchor = 
records::stable_event_anchor(&[&issue_run.worktree.branch_name, commit_sha]); + let transport = workflow.frontmatter().agent().transport(); + let anchor = records::stable_event_anchor(&[ + issue_run.dispatch_mode.as_str(), + &issue_run.worktree.branch_name, + commit_sha, + transport, + ]); let mut record = records::LinearExecutionEventRecord::new( lifecycle_event_identity(project, issue_run), - "worktree_prepared", + "run_started", current_timestamp(), &anchor, ); @@ -317,34 +270,11 @@ where record.branch = Some(issue_run.worktree.branch_name.clone()); record.worktree_path = Some(worktree_path.to_owned()); record.commit_sha = Some(commit_sha.to_owned()); - record.summary = Some(String::from("Decodex prepared the lane worktree.")); - - write_lifecycle_event(tracker, state_store, &issue_run.issue.id, &record) -} - -fn write_agent_started_lifecycle_event( - tracker: &T, - project: &ServiceConfig, - state_store: &StateStore, - issue_run: &IssueRunPlan, - transport: &str, -) -> Result<()> -where - T: IssueTracker + ?Sized, -{ - let worktree_path = relative_worktree_path(project, &issue_run.worktree); - let anchor = records::stable_event_anchor(&[&issue_run.worktree.branch_name, transport]); - let mut record = records::LinearExecutionEventRecord::new( - lifecycle_event_identity(project, issue_run), - "agent_started", - current_timestamp(), - &anchor, - ); - - record.branch = Some(issue_run.worktree.branch_name.clone()); - record.worktree_path = Some(worktree_path); record.transport = Some(transport.to_owned()); - record.summary = Some(String::from("Decodex started the lane agent.")); + record.summary = Some(format!( + "Decodex started a {} run for this issue.", + issue_run.dispatch_mode.as_str() + )); write_lifecycle_event(tracker, state_store, &issue_run.issue.id, &record) } @@ -486,9 +416,6 @@ where }; let decodex_tool_bridge = DecodexToolBridge::new(&tracker_tool_bridge, build_decodex_run_context(workflow, issue_run)); - - write_agent_started_lifecycle_event(tracker, 
project, state_store, issue_run, &transport)?; - let run_result = agent::execute_app_server_run( &AppServerRunRequest { run_id: issue_run.run_id.clone(), diff --git a/apps/decodex/src/orchestrator/prompting.rs b/apps/decodex/src/orchestrator/prompting.rs index 1233d8b..8412448 100644 --- a/apps/decodex/src/orchestrator/prompting.rs +++ b/apps/decodex/src/orchestrator/prompting.rs @@ -99,21 +99,18 @@ where label_tool = ISSUE_LABEL_ADD_TOOL_NAME, continuation_guidance = continuation_guidance, ), - _ => format!( - "Tracker tool contract\n- You own issue-scoped tracker writes for `{issue}`.\n- At the start of execution, call `{transition_tool}` to move the issue to `{in_progress}` and add a brief `{comment_tool}` comment that you started work on run `{run_id}` attempt `{attempt}`.\n- Update `{progress_checkpoint_tool}` whenever the execution phase, focus, next action, blockers, evidence, or verification state changes materially.\n{internal_review_guidance}- Treat failures from repo-native `canonicalize_commands`, `verify_commands`, or tracked rewrites left by that repo gate as continued repair by default: keep fixing the lane and rerun the gate instead of taking `manual_attention` unless the blocker is clearly toolchain, environment, or operator-owned.\n- When the implementation is ready, commit the lane, push branch `{branch}`, and create or update a non-draft PR titled `{pr_title}` for that branch.\n{completion_guidance}- If you determine the issue needs human attention, add label `{needs_attention}` with `{label_tool}`, explain the exact observed blocker in a comment, including the failed command and raw error when available, and then call `{terminal_finalize_tool}` with path `manual_attention`. Do not speculate about capabilities you did not directly verify. Do not call `{review_handoff_tool}` in that case; `decodex` will stop the lane as a human-required failure without automatic retry.\n- Do not move the issue directly to `{success}` with `{transition_tool}`. 
`decodex` will complete the success writeback only after its own validation passes.\n- Do not report the run as complete or treat `{progress_checkpoint_tool}` as terminal completion until `{terminal_finalize_tool}` succeeds.{continuation_guidance}\n- Never write to any other issue.", - issue = issue_run.issue.identifier, - transition_tool = ISSUE_TRANSITION_TOOL_NAME, - comment_tool = ISSUE_COMMENT_TOOL_NAME, - label_tool = ISSUE_LABEL_ADD_TOOL_NAME, - progress_checkpoint_tool = ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, - review_handoff_tool = ISSUE_REVIEW_HANDOFF_TOOL_NAME, - terminal_finalize_tool = ISSUE_TERMINAL_FINALIZE_TOOL_NAME, - in_progress = workflow.frontmatter().tracker().in_progress_state(), - run_id = issue_run.run_id, - attempt = issue_run.attempt_number, - branch = issue_run.worktree.branch_name, - success = workflow.frontmatter().tracker().success_state(), - needs_attention = workflow.frontmatter().tracker().needs_attention_label(), + _ => format!( + "Tracker tool contract\n- You own issue-scoped tracker writes for `{issue}`.\n- At the start of execution, call `{transition_tool}` to move the issue to `{in_progress}`. 
Decodex already records the run-start Linear ledger, so do not add a separate start comment.\n- Update `{progress_checkpoint_tool}` whenever the execution phase, focus, next action, blockers, evidence, or verification state changes materially.\n{internal_review_guidance}- Treat failures from repo-native `canonicalize_commands`, `verify_commands`, or tracked rewrites left by that repo gate as continued repair by default: keep fixing the lane and rerun the gate instead of taking `manual_attention` unless the blocker is clearly toolchain, environment, or operator-owned.\n- When the implementation is ready, commit the lane, push branch `{branch}`, and create or update a non-draft PR titled `{pr_title}` for that branch.\n{completion_guidance}- If you determine the issue needs human attention, add label `{needs_attention}` with `{label_tool}`, explain the exact observed blocker in a comment, including the failed command and raw error when available, and then call `{terminal_finalize_tool}` with path `manual_attention`. Do not speculate about capabilities you did not directly verify. Do not call `{review_handoff_tool}` in that case; `decodex` will stop the lane as a human-required failure without automatic retry.\n- Do not move the issue directly to `{success}` with `{transition_tool}`. 
`decodex` will complete the success writeback only after its own validation passes.\n- Do not report the run as complete or treat `{progress_checkpoint_tool}` as terminal completion until `{terminal_finalize_tool}` succeeds.{continuation_guidance}\n- Never write to any other issue.", + issue = issue_run.issue.identifier, + transition_tool = ISSUE_TRANSITION_TOOL_NAME, + label_tool = ISSUE_LABEL_ADD_TOOL_NAME, + progress_checkpoint_tool = ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, + review_handoff_tool = ISSUE_REVIEW_HANDOFF_TOOL_NAME, + terminal_finalize_tool = ISSUE_TERMINAL_FINALIZE_TOOL_NAME, + in_progress = workflow.frontmatter().tracker().in_progress_state(), + branch = issue_run.worktree.branch_name, + success = workflow.frontmatter().tracker().success_state(), + needs_attention = workflow.frontmatter().tracker().needs_attention_label(), continuation_guidance = continuation_guidance, pr_title = review_pull_request_title(&issue_run.issue), internal_review_guidance = build_handoff_internal_review_guidance( @@ -191,23 +188,20 @@ where label_tool = ISSUE_LABEL_ADD_TOOL_NAME, continuation_guidance = continuation_guidance, ), - _ => format!( - "Resolve Linear issue {identifier}: {title}\n\nDescription:\n{description}\n\nExecution checklist:\n- Move the issue to `{in_progress}` with `{transition_tool}` and leave a short `{comment_tool}` comment that includes run `{run_id}` attempt `{attempt}`.\n- Update `{progress_checkpoint_tool}` whenever the execution phase, focus, next action, blockers, evidence, or verification state changes materially.\n- Keep discovery bounded to the minimal implementation files needed for this issue; defer broader docs or upstream reading unless a concrete ambiguity blocks the change.\n- Implement the fix in the current worktree.\n{internal_review_guidance}- Treat failures from repo-native `canonicalize_commands`, `verify_commands`, or tracked rewrites left by that repo gate as continued repair by default: keep fixing the lane and rerun the gate 
instead of taking `manual_attention` unless the blocker is clearly toolchain, environment, or operator-owned.\n- Run the repository validation needed to justify a reviewable PR.\n- Commit the lane, push branch `{branch}`, and create or update a non-draft PR titled `{pr_title}` for that branch.\n{completion_guidance}- If the issue needs manual attention, add label `{needs_attention}` with `{label_tool}`, explain why in a comment, and then call `{terminal_finalize_tool}` with path `manual_attention`. Do not call `{review_handoff_tool}` in that case; `decodex` will stop the lane as a human-required failure without automatic retry.\n- Do not move the issue directly to `{success}` with `{transition_tool}`; `decodex` will finish that writeback after its own validation passes.\n- Do not report the run as complete or treat `{progress_checkpoint_tool}` as terminal completion until `{terminal_finalize_tool}` succeeds.{continuation_guidance}", - identifier = issue.identifier, - title = issue.title, - description = description, - transition_tool = ISSUE_TRANSITION_TOOL_NAME, - comment_tool = ISSUE_COMMENT_TOOL_NAME, - label_tool = ISSUE_LABEL_ADD_TOOL_NAME, - progress_checkpoint_tool = ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, - review_handoff_tool = ISSUE_REVIEW_HANDOFF_TOOL_NAME, - terminal_finalize_tool = ISSUE_TERMINAL_FINALIZE_TOOL_NAME, - in_progress = workflow.frontmatter().tracker().in_progress_state(), - run_id = issue_run.run_id, - attempt = issue_run.attempt_number, - branch = issue_run.worktree.branch_name, - success = workflow.frontmatter().tracker().success_state(), - needs_attention = workflow.frontmatter().tracker().needs_attention_label(), + _ => format!( + "Resolve Linear issue {identifier}: {title}\n\nDescription:\n{description}\n\nExecution checklist:\n- Move the issue to `{in_progress}` with `{transition_tool}`. 
Decodex already records the run-start Linear ledger, so do not leave a separate start comment.\n- Update `{progress_checkpoint_tool}` whenever the execution phase, focus, next action, blockers, evidence, or verification state changes materially.\n- Keep discovery bounded to the minimal implementation files needed for this issue; defer broader docs or upstream reading unless a concrete ambiguity blocks the change.\n- Implement the fix in the current worktree.\n{internal_review_guidance}- Treat failures from repo-native `canonicalize_commands`, `verify_commands`, or tracked rewrites left by that repo gate as continued repair by default: keep fixing the lane and rerun the gate instead of taking `manual_attention` unless the blocker is clearly toolchain, environment, or operator-owned.\n- Run the repository validation needed to justify a reviewable PR.\n- Commit the lane, push branch `{branch}`, and create or update a non-draft PR titled `{pr_title}` for that branch.\n{completion_guidance}- If the issue needs manual attention, add label `{needs_attention}` with `{label_tool}`, explain why in a comment, and then call `{terminal_finalize_tool}` with path `manual_attention`. 
Do not call `{review_handoff_tool}` in that case; `decodex` will stop the lane as a human-required failure without automatic retry.\n- Do not move the issue directly to `{success}` with `{transition_tool}`; `decodex` will finish that writeback after its own validation passes.\n- Do not report the run as complete or treat `{progress_checkpoint_tool}` as terminal completion until `{terminal_finalize_tool}` succeeds.{continuation_guidance}", + identifier = issue.identifier, + title = issue.title, + description = description, + transition_tool = ISSUE_TRANSITION_TOOL_NAME, + label_tool = ISSUE_LABEL_ADD_TOOL_NAME, + progress_checkpoint_tool = ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, + review_handoff_tool = ISSUE_REVIEW_HANDOFF_TOOL_NAME, + terminal_finalize_tool = ISSUE_TERMINAL_FINALIZE_TOOL_NAME, + in_progress = workflow.frontmatter().tracker().in_progress_state(), + branch = issue_run.worktree.branch_name, + success = workflow.frontmatter().tracker().success_state(), + needs_attention = workflow.frontmatter().tracker().needs_attention_label(), continuation_guidance = continuation_guidance, pr_title = review_pull_request_title(issue), internal_review_guidance = build_handoff_internal_review_guidance( diff --git a/apps/decodex/src/orchestrator/run_cycle.rs b/apps/decodex/src/orchestrator/run_cycle.rs index 1d37dd0..4cf184c 100644 --- a/apps/decodex/src/orchestrator/run_cycle.rs +++ b/apps/decodex/src/orchestrator/run_cycle.rs @@ -2139,12 +2139,13 @@ where }; if !context.dry_run { - write_prepare_lifecycle_events( - context.tracker, - context.project, - context.state_store, - &issue_run, - )?; + write_prepare_lifecycle_events( + context.tracker, + context.project, + context.workflow, + context.state_store, + &issue_run, + )?; } Ok(Some(issue_run)) diff --git a/apps/decodex/src/orchestrator/tests.rs b/apps/decodex/src/orchestrator/tests.rs index 68c55fe..95315ac 100644 --- a/apps/decodex/src/orchestrator/tests.rs +++ b/apps/decodex/src/orchestrator/tests.rs @@ -21,18 +21,18 
@@ use time::OffsetDateTime; use crate::tracker::records; #[rustfmt::skip] -use crate::agent::{ - ACTIVE_RUN_IDLE_TIMEOUT, AppServerCapabilityPreflightFailure, - AppServerHomePreflightFailure, AppServerTransportFailure, AppServerTurnFailure, - DynamicToolHandler, ReviewPolicyStopReason, ReviewPolicyStopRequested, TrackerToolBridge, - TurnContinuationGuard, -}; + use crate::agent::{ + ACTIVE_RUN_IDLE_TIMEOUT, AppServerCapabilityPreflightFailure, + AppServerHomePreflightFailure, AppServerTransportFailure, AppServerTurnFailure, + DynamicToolHandler, ReviewPolicyStopReason, ReviewPolicyStopRequested, TrackerToolBridge, + TurnContinuationGuard, + }; #[rustfmt::skip] use crate::config::{InternalReviewMode, ServiceConfig}; #[rustfmt::skip] use crate::github; #[rustfmt::skip] -use crate::orchestrator::{self, ActiveChildRunContext, ActiveRunDisposition, ActiveRunReconciliation, ActiveWorkflowOverride, ChildExitRetryContext, ChildRunRef, ControlPlaneProjectTick, CONTINUATION_PENDING_RUN_STATUS, DaemonRunChild, DaemonTickRuntimeContext, DashboardEventHub, GhPullRequestReviewStateInspector, ISSUE_COMMENT_TOOL_NAME, ISSUE_DELIVERY_CLOSEOUT_COMPLETE_TOOL_NAME, ISSUE_LABEL_ADD_TOOL_NAME, ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, ISSUE_REVIEW_CHECKPOINT_TOOL_NAME, ISSUE_REVIEW_HANDOFF_TOOL_NAME, ISSUE_REVIEW_REPAIR_COMPLETE_TOOL_NAME, ISSUE_TERMINAL_FINALIZE_TOOL_NAME, ISSUE_TRANSITION_TOOL_NAME, IssueDispatchMode, IssueRunPlan, IssueTurnContinuationGuard, ManualAttentionRequested, OPERATOR_DASHBOARD_ALIAS_ENDPOINT_PATH, OPERATOR_DASHBOARD_ENDPOINT_PATH, OperatorStatusSnapshot, PostReviewLaneClassification, PostReviewLaneDecision, PostReviewLaneSnapshot, PreferredRunIdentity, PrepareIssueRunContext, PublishedOperatorSnapshot, PullRequestCommitConnection, PullRequestCommitNode, PullRequestCommitPayload, PullRequestIssueCommentConnection, PullRequestIssueCommentState, PullRequestIssueCommentsNode, PullRequestPageInfo, PullRequestReactionGroup, PullRequestReactionUsersConnection, 
PullRequestActor, PullRequestRepository, PullRequestRepositoryOwner, PullRequestReviewConnection, PullRequestIssueCommentNode, PullRequestReviewNode, PullRequestReviewRequestConnection, PullRequestReviewState, PullRequestReviewStateInspector, PullRequestReviewStateNode, PullRequestReviewStateRepository, PullRequestReviewSummaryState, PullRequestReviewThreadConnection, PullRequestReviewThreadNode, PullRequestStatusCheckRollup, RecoveredRuntimeState, RetainedPartialProgress, RetainedReviewRunIdentity, RetryComment, RetryDispatchDecision, RetryEntry, RetryKind, RetryQueue, RunCompletionDisposition, RunSummary, RepoGateFailure, TERMINAL_GUARD_MARKER_FILE, TERMINAL_GUARDED_RUN_STATUS, TRACKER_RATE_LIMIT_WARNING, TargetIssueRunContext, EXTERNAL_REVIEW_ACTOR_LOGIN, EXTERNAL_REVIEW_PASS_PHRASE, EXTERNAL_REVIEW_REQUEST_BODY}; +use crate::orchestrator::{self, ActiveChildRunContext, ActiveRunDisposition, ActiveRunReconciliation, ActiveWorkflowOverride, AgentEvidenceSource, ChildExitRetryContext, ChildRunRef, ControlPlaneProjectTick, CONTINUATION_PENDING_RUN_STATUS, DaemonRunChild, DaemonTickRuntimeContext, DashboardEventHub, GhPullRequestReviewStateInspector, ISSUE_DELIVERY_CLOSEOUT_COMPLETE_TOOL_NAME, ISSUE_LABEL_ADD_TOOL_NAME, ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, ISSUE_REVIEW_CHECKPOINT_TOOL_NAME, ISSUE_REVIEW_HANDOFF_TOOL_NAME, ISSUE_REVIEW_REPAIR_COMPLETE_TOOL_NAME, ISSUE_TERMINAL_FINALIZE_TOOL_NAME, ISSUE_TRANSITION_TOOL_NAME, IssueDispatchMode, IssueRunPlan, IssueTurnContinuationGuard, ManualAttentionRequested, OPERATOR_DASHBOARD_ALIAS_ENDPOINT_PATH, OPERATOR_DASHBOARD_ENDPOINT_PATH, OperatorStatusSnapshot, PostReviewLaneClassification, PostReviewLaneDecision, PostReviewLaneSnapshot, PreferredRunIdentity, PrepareIssueRunContext, PublishedOperatorSnapshot, PullRequestCommitConnection, PullRequestCommitNode, PullRequestCommitPayload, PullRequestIssueCommentConnection, PullRequestIssueCommentState, PullRequestIssueCommentsNode, PullRequestPageInfo, 
PullRequestReactionGroup, PullRequestReactionUsersConnection, PullRequestActor, PullRequestRepository, PullRequestRepositoryOwner, PullRequestReviewConnection, PullRequestIssueCommentNode, PullRequestReviewNode, PullRequestReviewRequestConnection, PullRequestReviewState, PullRequestReviewStateInspector, PullRequestReviewStateNode, PullRequestReviewStateRepository, PullRequestReviewSummaryState, PullRequestReviewThreadConnection, PullRequestReviewThreadNode, PullRequestStatusCheckRollup, RecoveredRuntimeState, RetainedPartialProgress, RetainedReviewRunIdentity, RetryComment, RetryDispatchDecision, RetryEntry, RetryKind, RetryQueue, RunCompletionDisposition, RunSummary, RepoGateFailure, TERMINAL_GUARD_MARKER_FILE, TERMINAL_GUARDED_RUN_STATUS, TRACKER_RATE_LIMIT_WARNING, TargetIssueRunContext, EXTERNAL_REVIEW_ACTOR_LOGIN, EXTERNAL_REVIEW_PASS_PHRASE, EXTERNAL_REVIEW_REQUEST_BODY}; #[rustfmt::skip] use crate::prelude::Result; #[rustfmt::skip] @@ -79,6 +79,7 @@ include!("tests/operator/status/history.rs"); include!("tests/operator/status/text.rs"); include!("tests/operator/status/publishing.rs"); include!("tests/operator/status/queue.rs"); +include!("tests/operator/status/agent_evidence.rs"); include!("tests/operator/status/http.rs"); include!("tests/operator/status/dashboard.rs"); include!("tests/review_landing/status_support.rs"); diff --git a/apps/decodex/src/orchestrator/tests/intake/prepare_issue_run.rs b/apps/decodex/src/orchestrator/tests/intake/prepare_issue_run.rs index 1ba563b..4165602 100644 --- a/apps/decodex/src/orchestrator/tests/intake/prepare_issue_run.rs +++ b/apps/decodex/src/orchestrator/tests/intake/prepare_issue_run.rs @@ -58,11 +58,7 @@ fn prepare_issue_run_records_starting_attempt_before_execute() { assert_eq!( event_types, - vec![ - String::from("intake"), - String::from("lease_acquired"), - String::from("worktree_prepared"), - ] + vec![String::from("run_started")] ); } diff --git a/apps/decodex/src/orchestrator/tests/intake/run_and_prompting.rs 
b/apps/decodex/src/orchestrator/tests/intake/run_and_prompting.rs index e337358..74e5946 100644 --- a/apps/decodex/src/orchestrator/tests/intake/run_and_prompting.rs +++ b/apps/decodex/src/orchestrator/tests/intake/run_and_prompting.rs @@ -526,6 +526,8 @@ fn developer_instructions_trim_workflow_body_and_preserve_required_guidance() { assert!(instructions.contains("Do not browse upstream references")); assert!(instructions.contains("Tracker tool contract")); assert!(instructions.contains("You own issue-scoped tracker writes for `PUB-101`.")); + assert!(instructions.contains("Decodex already records the run-start Linear ledger")); + assert!(!instructions.contains("started work on run")); assert!( instructions.contains("Do not speculate about capabilities you did not directly verify.") ); diff --git a/apps/decodex/src/orchestrator/tests/intake/workflow_reload.rs b/apps/decodex/src/orchestrator/tests/intake/workflow_reload.rs index a6ee868..12d76d9 100644 --- a/apps/decodex/src/orchestrator/tests/intake/workflow_reload.rs +++ b/apps/decodex/src/orchestrator/tests/intake/workflow_reload.rs @@ -172,21 +172,18 @@ fn expected_developer_instructions( "Commit contract\n- When you create a local commit for this lane, use a single-line `decodex/commit/1` JSON commit message.\n- Required fields: `schema`, `summary`, and `authority`.\n- `authority` must be the authoritative Linear issue identifier for this lane.\n- Optional fields: `related` and `breaking`.\n- Do not encode landing mode, CI status, closeout state, or other process-state fields in the commit message.", )); - sections.push(format!( - "Tracker tool contract\n- You own issue-scoped tracker writes for `{issue}`.\n- At the start of execution, call `{transition_tool}` to move the issue to `{in_progress}` and add a brief `{comment_tool}` comment that you started work on run `{run_id}` attempt `{attempt}`.\n- Update `{progress_checkpoint_tool}` whenever the execution phase, focus, next action, blockers, evidence, or 
verification state changes materially.\n- Follow the repo-native bounded review method from `WORKFLOW.md`: review the actual current diff and branch state, run both the requirements pass and the adversarial reviewer pass, fix only the smallest coherent owned batch, rerun verification, and re-read `HEAD` before deciding the next normalized review status.\n- Every time the repo-native bounded review method produces a result for the current head, call `{review_checkpoint_tool}` with that normalized status, the exact current `HEAD` SHA, and any concise evidence items.\n- Treat failures from repo-native `canonicalize_commands`, `verify_commands`, or tracked rewrites left by that repo gate as continued repair by default: keep fixing the lane and rerun the gate instead of taking `manual_attention` unless the blocker is clearly toolchain, environment, or operator-owned.\n- When the implementation is ready, commit the lane, push branch `{branch}`, and create or update a non-draft PR titled `{pr_title}` for that branch.\n- Call `{review_handoff_tool}` only after the latest `{review_checkpoint_tool}` for this handoff phase and current `HEAD` is `clean`. Then call `{terminal_finalize_tool}` with path `review_handoff`.\n- If you determine the issue needs human attention, add label `{needs_attention}` with `{label_tool}`, explain the exact observed blocker in a comment, including the failed command and raw error when available, and then call `{terminal_finalize_tool}` with path `manual_attention`. Do not speculate about capabilities you did not directly verify. Do not call `{review_handoff_tool}` in that case; `decodex` will stop the lane as a human-required failure without automatic retry.\n- Do not move the issue directly to `{success}` with `{transition_tool}`. 
`decodex` will complete the success writeback only after its own validation passes.\n- Do not report the run as complete or treat `{progress_checkpoint_tool}` as terminal completion until `{terminal_finalize_tool}` succeeds.{continuation_guidance}\n- Never write to any other issue.", - issue = issue_run.issue.identifier, - transition_tool = ISSUE_TRANSITION_TOOL_NAME, - comment_tool = ISSUE_COMMENT_TOOL_NAME, - label_tool = ISSUE_LABEL_ADD_TOOL_NAME, - progress_checkpoint_tool = ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, - review_checkpoint_tool = ISSUE_REVIEW_CHECKPOINT_TOOL_NAME, - review_handoff_tool = ISSUE_REVIEW_HANDOFF_TOOL_NAME, - terminal_finalize_tool = ISSUE_TERMINAL_FINALIZE_TOOL_NAME, - in_progress = workflow.frontmatter().tracker().in_progress_state(), - run_id = issue_run.run_id, - attempt = issue_run.attempt_number, - branch = issue_run.worktree.branch_name, - pr_title = orchestrator::review_pull_request_title(&issue_run.issue), + sections.push(format!( + "Tracker tool contract\n- You own issue-scoped tracker writes for `{issue}`.\n- At the start of execution, call `{transition_tool}` to move the issue to `{in_progress}`. 
Decodex already records the run-start Linear ledger, so do not add a separate start comment.\n- Update `{progress_checkpoint_tool}` whenever the execution phase, focus, next action, blockers, evidence, or verification state changes materially.\n- Follow the repo-native bounded review method from `WORKFLOW.md`: review the actual current diff and branch state, run both the requirements pass and the adversarial reviewer pass, fix only the smallest coherent owned batch, rerun verification, and re-read `HEAD` before deciding the next normalized review status.\n- Every time the repo-native bounded review method produces a result for the current head, call `{review_checkpoint_tool}` with that normalized status, the exact current `HEAD` SHA, and any concise evidence items.\n- Treat failures from repo-native `canonicalize_commands`, `verify_commands`, or tracked rewrites left by that repo gate as continued repair by default: keep fixing the lane and rerun the gate instead of taking `manual_attention` unless the blocker is clearly toolchain, environment, or operator-owned.\n- When the implementation is ready, commit the lane, push branch `{branch}`, and create or update a non-draft PR titled `{pr_title}` for that branch.\n- Call `{review_handoff_tool}` only after the latest `{review_checkpoint_tool}` for this handoff phase and current `HEAD` is `clean`. Then call `{terminal_finalize_tool}` with path `review_handoff`.\n- If you determine the issue needs human attention, add label `{needs_attention}` with `{label_tool}`, explain the exact observed blocker in a comment, including the failed command and raw error when available, and then call `{terminal_finalize_tool}` with path `manual_attention`. Do not speculate about capabilities you did not directly verify. Do not call `{review_handoff_tool}` in that case; `decodex` will stop the lane as a human-required failure without automatic retry.\n- Do not move the issue directly to `{success}` with `{transition_tool}`. 
`decodex` will complete the success writeback only after its own validation passes.\n- Do not report the run as complete or treat `{progress_checkpoint_tool}` as terminal completion until `{terminal_finalize_tool}` succeeds.{continuation_guidance}\n- Never write to any other issue.", + issue = issue_run.issue.identifier, + transition_tool = ISSUE_TRANSITION_TOOL_NAME, + label_tool = ISSUE_LABEL_ADD_TOOL_NAME, + progress_checkpoint_tool = ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, + review_checkpoint_tool = ISSUE_REVIEW_CHECKPOINT_TOOL_NAME, + review_handoff_tool = ISSUE_REVIEW_HANDOFF_TOOL_NAME, + terminal_finalize_tool = ISSUE_TERMINAL_FINALIZE_TOOL_NAME, + in_progress = workflow.frontmatter().tracker().in_progress_state(), + branch = issue_run.worktree.branch_name, + pr_title = orchestrator::review_pull_request_title(&issue_run.issue), success = workflow.frontmatter().tracker().success_state(), needs_attention = workflow.frontmatter().tracker().needs_attention_label(), continuation_guidance = continuation_guidance, diff --git a/apps/decodex/src/orchestrator/tests/operator/status/agent_evidence.rs b/apps/decodex/src/orchestrator/tests/operator/status/agent_evidence.rs new file mode 100644 index 0000000..b349ac7 --- /dev/null +++ b/apps/decodex/src/orchestrator/tests/operator/status/agent_evidence.rs @@ -0,0 +1,109 @@ +#[test] +fn agent_evidence_snapshot_writes_index_blockers_capsules_and_event_stream() { + let temp_dir = TempDir::new().expect("temp dir should create"); + let _home_guard = TestEnvVarGuard::set("HOME", temp_dir.path().to_str().expect("temp path should be utf-8")); + let mut active_run = operator_status_text_active_run(); + + active_run.suspected_stall = true; + active_run.phase = String::from("stalled"); + + let mut blocked_candidate = operator_status_text_queued_candidates() + .into_iter() + .find(|candidate| candidate.issue_identifier == "PUB-102") + .expect("fixture should include queued issue"); + + blocked_candidate.classification = 
String::from("blocked"); + blocked_candidate.reason = String::from("missing_dispatch_briefing"); + + let mut missing_handoff_lane = operator_status_text_post_review_lanes() + .into_iter() + .next() + .expect("fixture should include retained review lane"); + + missing_handoff_lane.classification = String::from("blocked"); + missing_handoff_lane.reason = String::from("missing_review_handoff_record"); + + let snapshot = OperatorStatusSnapshot { + project_id: String::from(TEST_SERVICE_ID), + run_limit: 10, + warnings: Vec::new(), + connector_backoffs: Vec::new(), + projects: Vec::new(), + accounts: Vec::new(), + active_runs: vec![active_run.clone()], + recent_runs: vec![active_run], + history_lanes: Vec::new(), + queued_candidates: vec![blocked_candidate], + worktrees: operator_status_text_worktrees(), + post_review_lanes: vec![missing_handoff_lane], + }; + let results = orchestrator::write_agent_evidence_snapshot( + &snapshot, + AgentEvidenceSource::DiagnoseCommand, + ) + .expect("agent evidence should write"); + let result = results.first().expect("project evidence should exist"); + let index_path = temp_dir + .path() + .join(".codex/decodex/agent-evidence/pubfi/handoff-index.json"); + let index_json = read_json_file(&index_path); + + assert_eq!(result.project_id, TEST_SERVICE_ID); + assert_eq!(result.handoff_index_path, index_path.display().to_string()); + assert_eq!(index_json["schema"], "decodex.agent_handoff_index/1"); + assert_eq!(index_json["project_id"], TEST_SERVICE_ID); + assert_eq!(index_json["source"], "diagnose_command"); + assert_eq!(index_json["summary"]["blocker_count"], 3); + assert_eq!(index_json["summary"]["run_capsule_count"], 1); + assert_eq!( + index_json["blockers"][0]["blocker_snapshot_path"], + temp_dir + .path() + .join(".codex/decodex/agent-evidence/pubfi/blockers/pub-101.json") + .display() + .to_string() + ); + assert!( + index_json["recovery_contracts"] + .as_array() + .expect("recovery contracts should be array") + .iter() + 
.any(|contract| contract["reason_code"] == "missing_review_handoff_record") + ); + + let capsule_path = index_json["run_capsules"][0]["path"] + .as_str() + .expect("run capsule path should be a string"); + let capsule_json = read_json_file(Path::new(capsule_path)); + + assert_eq!(capsule_json["schema"], "decodex.run_capsule/1"); + assert_eq!(capsule_json["run_id"], "run-1"); + assert_eq!(capsule_json["diagnosis"]["reason_code"], "suspected_stall"); + + let blocker_json = read_json_file( + &temp_dir + .path() + .join(".codex/decodex/agent-evidence/pubfi/blockers/pub-101.json"), + ); + + assert_eq!(blocker_json["schema"], "decodex.blocker_snapshot/1"); + assert_eq!(blocker_json["issue_identifier"], "PUB-101"); + assert_eq!(blocker_json["related_run_capsules"][0]["run_id"], "run-1"); + + let events_path = temp_dir + .path() + .join(".codex/decodex/agent-evidence/pubfi/events.jsonl"); + let events_body = fs::read_to_string(events_path).expect("events stream should exist"); + let event_json: Value = + serde_json::from_str(events_body.lines().next().expect("event line should exist")) + .expect("event should be JSON"); + + assert_eq!(event_json["schema"], "decodex.agent_evidence_event/1"); + assert_eq!(event_json["blocker_count"], 3); +} + +fn read_json_file(path: &Path) -> Value { + let body = fs::read_to_string(path).expect("JSON file should exist"); + + serde_json::from_str(&body).expect("JSON file should parse") +} diff --git a/apps/decodex/src/orchestrator/tests/operator/status_support.rs b/apps/decodex/src/orchestrator/tests/operator/status_support.rs index 02fe8f6..7277105 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status_support.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status_support.rs @@ -8,15 +8,19 @@ use serde_json::Value; fn successful_linear_execution_history_comments(issue: &TrackerIssue) -> Vec { vec![ - linear_execution_history_comment( - issue, - "intake", - "2026-04-29T10:00:00Z", - "intake", - |record| { - record.summary = 
Some(String::from("Queued issue for Decodex execution.")); - }, - ), + linear_execution_history_comment( + issue, + "run_started", + "2026-04-29T10:00:00Z", + "run-start", + |record| { + record.branch = Some(String::from("y/decodex-xy-355")); + record.worktree_path = Some(String::from(".worktrees/XY-355")); + record.commit_sha = Some(String::from("0000000000000000000000000000000000000000")); + record.transport = Some(String::from("stdio://")); + record.summary = Some(String::from("Started the Decodex lane.")); + }, + ), linear_execution_history_comment( issue, "review_handoff", diff --git a/apps/decodex/src/orchestrator/types.rs b/apps/decodex/src/orchestrator/types.rs index 0d0b5a9..eb9c302 100644 --- a/apps/decodex/src/orchestrator/types.rs +++ b/apps/decodex/src/orchestrator/types.rs @@ -33,6 +33,13 @@ pub(crate) struct ServeRequest<'a> { pub(crate) listen_address: &'a str, } +/// Agent-readable runtime diagnosis request. +pub(crate) struct DiagnoseRequest<'a> { + pub(crate) config_path: Option<&'a Path>, + pub(crate) json: bool, + pub(crate) limit: usize, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct RunSummary { project_id: String, diff --git a/apps/decodex/src/runtime.rs b/apps/decodex/src/runtime.rs index c796ae1..d2c3ea8 100644 --- a/apps/decodex/src/runtime.rs +++ b/apps/decodex/src/runtime.rs @@ -36,6 +36,11 @@ pub(crate) fn log_dir() -> Result { Ok(decodex_home_dir()?.join("logs")) } +/// Resolve the local agent-readable evidence directory. +pub(crate) fn agent_evidence_dir() -> Result { + Ok(decodex_home_dir()?.join("agent-evidence")) +} + /// Resolve the global single-machine runtime database path. 
pub(crate) fn runtime_db_path() -> Result { Ok(decodex_home_dir()?.join("runtime.sqlite3")) @@ -139,41 +144,13 @@ fn config_fingerprint(config_path: &Path, workflow_path: &Path) -> Result, - } - - impl EnvVarGuard { - fn set(key: &'static str, value: &Path) -> Self { - let previous = env::var_os(key); - - unsafe { - env::set_var(key, value); - } - - Self { key, previous } - } - } - - impl Drop for EnvVarGuard { - fn drop(&mut self) { - match self.previous.take() { - Some(previous) => unsafe { env::set_var(self.key, previous) }, - None => unsafe { env::remove_var(self.key) }, - } - } - } + use crate::{runtime, state::StateStore, test_support::TestEnvVarGuard}; #[test] fn runtime_paths_live_under_codex_decodex_home() { @@ -185,10 +162,21 @@ mod tests { ); } + #[test] + fn agent_evidence_path_lives_under_decodex_home() { + let temp_dir = TempDir::new().expect("temp dir should create"); + let _home_guard = set_test_home(temp_dir.path()); + + assert_eq!( + runtime::agent_evidence_dir().expect("agent evidence path should resolve"), + temp_dir.path().join(".codex/decodex/agent-evidence") + ); + } + #[test] fn project_config_registration_requires_explicit_repo_root() { let temp_dir = TempDir::new().expect("temp dir should create"); - let _home_guard = EnvVarGuard::set("HOME", temp_dir.path()); + let _home_guard = set_test_home(temp_dir.path()); let state_store = StateStore::open(temp_dir.path().join("runtime.sqlite3")) .expect("state store should open"); let config_dir = @@ -209,6 +197,10 @@ mod tests { ); } + fn set_test_home(path: &Path) -> TestEnvVarGuard { + TestEnvVarGuard::set("HOME", path.to_str().expect("test home should be UTF-8")) + } + #[test] fn registered_config_path_for_cwd_matches_repo_and_worktree_roots() { let temp_dir = TempDir::new().expect("temp dir should create"); diff --git a/apps/decodex/src/tracker/records.rs b/apps/decodex/src/tracker/records.rs index 22770ec..df2919d 100644 --- a/apps/decodex/src/tracker/records.rs +++ 
b/apps/decodex/src/tracker/records.rs @@ -302,6 +302,14 @@ fn validate_linear_execution_event_fields( record: &LinearExecutionEventRecord, ) -> Result<(), String> { match record.event_type.as_str() { + "run_started" => { + require_string(record.branch.as_deref(), "branch")?; + require_string(record.worktree_path.as_deref(), "worktree_path")?; + require_string(record.commit_sha.as_deref(), "commit_sha")?; + require_string(record.transport.as_deref(), "transport")?; + + require_string(record.summary.as_deref(), "summary") + }, "intake" => require_string(record.summary.as_deref(), "summary"), "lease_acquired" => { require_string(record.branch.as_deref(), "branch")?; diff --git a/docs/reference/operator-control-plane.md b/docs/reference/operator-control-plane.md index 0f9be1a..a97683b 100644 --- a/docs/reference/operator-control-plane.md +++ b/docs/reference/operator-control-plane.md @@ -45,6 +45,13 @@ non-terminal state, no open dependency blockers, and available local capacity. The runtime database is the local source of truth for active execution. Linear and GitHub remain external collaboration mirrors and validation surfaces. +Decodex also writes local agent-readable evidence under +`~/.codex/decodex/agent-evidence/<project_id>/`. This evidence is derived from the +operator snapshot and exists so a repair agent can quickly open one handoff index, +related blocker snapshots, and run capsules. It is not scheduling authority, not a +replacement for the runtime database, and not a Linear or GitHub collaboration record. +Use `decodex diagnose --json` when an agent needs the current handoff index directly. + ## State Ownership | Surface | Owns | Does Not Own | @@ -216,8 +223,8 @@ rate-limited, or unavailable. `reset_at`, `reset_unix_epoch`, `retry_after_seconds`, and operator `next_action`. Running lanes should still render from local runtime DB state while external sync is paused.
-- Linear writes should stay coarse: start, progress checkpoint, PR-ready/handoff, - blocked/failed, landed, done, and cleanup summaries. +- Linear writes should stay coarse: one run-start ledger, material progress + checkpoints, PR-ready/handoff, blocked/failed, landed, done, and cleanup summaries. - Fine-grained retry budgets, raw attempts, heartbeat, child buckets, token pressure, and recovery details stay local. - Completed lanes without Decodex Linear execution ledger records are reported as diff --git a/docs/reference/workspace-layout.md b/docs/reference/workspace-layout.md index 02424b4..174e2b9 100644 --- a/docs/reference/workspace-layout.md +++ b/docs/reference/workspace-layout.md @@ -99,6 +99,9 @@ Runtime state that belongs to the local operator, not to this repository, lives - `runtime.sqlite3` is the single-machine control-plane database for all registered projects. - `logs/` stores Decodex process logs. +- `agent-evidence/<project_id>/` stores local agent-readable diagnosis artifacts, + including `handoff-index.json`, `events.jsonl`, `blockers/*.json`, and + `runs///capsule.json`. - `projects/<project_id>/project.toml` stores the central service config for one registered project. - `projects/<project_id>/WORKFLOW.md` stores that project's execution policy. diff --git a/docs/spec/agent-evidence.md b/docs/spec/agent-evidence.md new file mode 100644 index 0000000..bd06b40 --- /dev/null +++ b/docs/spec/agent-evidence.md @@ -0,0 +1,140 @@ +# Agent Evidence + +Purpose: Define the local, agent-readable evidence files Decodex writes for fast +debugging and recovery handoff. + +Status: normative + +Read this when: You need to know where an agent should start when diagnosing a +local Decodex automation run, blocker, retained lane, or connector outage. + +Not this document: The runtime state machine, human runbook steps, Linear execution +ledger schema, or dashboard layout. + +Defines: Local evidence paths, schemas, authority boundaries, and write triggers.
+ +## Authority + +Agent evidence is a derived diagnostic surface. It must not become the runtime source +of truth for leases, retries, retained PR state, queue selection, project +registration, or landing authority. + +The runtime SQLite database remains authoritative for Decodex-owned local state. The +operator snapshot remains the shared local projection for status, dashboard, and +diagnosis. Agent evidence is a stable file projection of that snapshot so a repair +agent can start from one compact index instead of reconstructing state from logs, +Markdown notes, worktree names, and ad hoc SQL. + +## Path Layout + +Agent evidence lives under the local Decodex home: + +```text +~/.codex/decodex/agent-evidence/<project_id>/ + handoff-index.json + events.jsonl + blockers/.json + runs///capsule.json +``` + +These files are local-only operator state. They are not committed to target +repositories, mirrored to Linear, or used as external collaboration records. + +## Write Triggers + +Decodex writes evidence through two entrypoints: + +- `decodex diagnose` generates evidence for one resolved project config and prints a + one-line summary by default. +- `decodex diagnose --json` generates the same files and prints the + `decodex.agent_handoff_index/1` JSON body. +- `decodex serve` refreshes evidence after each successfully built per-project + operator snapshot. If evidence writing fails during `serve`, Decodex logs a warning + and keeps the control plane running; evidence files must not block scheduling. + +The `diagnose` command may fall back to local runtime state when tracker credentials +or live observer refresh is unavailable. In that case, the handoff index includes a +typed warning such as `diagnose_tracker_observer_unavailable` or +`diagnose_live_observer_unavailable`. + +## Handoff Index Schema + +`handoff-index.json` uses schema `decodex.agent_handoff_index/1`.
+ +Required fields: + +- `schema`: exactly `decodex.agent_handoff_index/1` +- `project_id`: service id for the evidence directory +- `generated_at`: UTC RFC 3339 timestamp +- `source`: `diagnose_command` or `serve_tick` +- `evidence_root`, `handoff_index_path`, `blockers_dir`, `runs_dir`, `events_path`: + absolute local paths +- `summary`: counts for projects, active runs, recent runs, history lanes, queued + candidates, post-review lanes, recovery worktrees, blockers, run capsules, + connector backoffs, and warnings +- `warnings`: typed operator snapshot or diagnose warning strings +- `connector_backoffs`: typed connector wait records from the operator snapshot +- `blockers`: compact blocker refs with reason codes, next action, and snapshot path +- `run_capsules`: compact run refs with capsule paths +- `recovery_worktrees`: retained local worktrees that need cleanup or recovery context +- `recovery_contracts`: commands or next actions an agent can use for supported + recovery classes + +Consumers must treat unknown additive fields as non-breaking. + +## Blocker Snapshot Schema + +`blockers/.json` uses schema `decodex.blocker_snapshot/1`. + +Each blocker carries: + +- `surface`: `running_lane`, `intake_queue`, `review_landing`, + `recovery_worktree`, `operator_snapshot`, or `connector_backoff` +- `classification`: the operator snapshot classification +- `reason_code`: a stable machine reason such as `suspected_stall`, + `missing_dispatch_briefing`, `missing_review_handoff_record`, or + `tracker_rate_limited` +- `next_action`: a short agent-facing recovery hint +- `related_run_capsule_path`: the run capsule path when the blocker belongs to a + known run + +For `missing_review_handoff_record`, the recovery contract must point agents to +`decodex recover review-handoff diagnose --json`. Rebind remains an explicit +validated recovery action and must not be inferred from branch names, current HEAD, +Linear comments, or the evidence file alone. 
+ +## Run Capsule Schema + +`runs///capsule.json` uses schema `decodex.run_capsule/1`. + +The capsule captures the compact runtime state an agent needs before opening a +worktree: + +- issue id, issue identifier, title, run id, attempt number +- status, raw attempt status, phase, wait reason, current operation +- queue lease state and execution liveness +- thread, turn, process, protocol event, idle, and progress fields +- effective model/provider/cwd/approval/sandbox fields when known +- branch and worktree path +- optional Run Ledger outcome +- `diagnosis.attention_required`, `diagnosis.reason_code`, and + `diagnosis.next_action` + +Capsules are rewritten snapshots, not append-only event logs. The append-only stream +is `events.jsonl`. + +## Event Stream + +`events.jsonl` uses schema `decodex.agent_evidence_event/1`. + +Each line records one evidence write with source, project id, handoff index path, +blocker count, run capsule count, warning count, and connector backoff count. This +stream exists so a future agent can identify when evidence changed without diffing +all JSON files. + +## Privacy Boundary + +Agent evidence may include local filesystem paths, issue identifiers, PR URLs, +branch names, run ids, thread ids, model names, status classifications, and compact +next actions. It must not intentionally include raw model transcripts, full command +output, secret values, API tokens, or unredacted connector error bodies. diff --git a/docs/spec/index.md b/docs/spec/index.md index c62dd6d..01789bf 100644 --- a/docs/spec/index.md +++ b/docs/spec/index.md @@ -70,6 +70,8 @@ Then keep the body explicit: the static site. - [`linear-execution-ledger.md`](./linear-execution-ledger.md) defines the versioned Linear comment event-ledger schema for low-frequency Decodex lane transitions. +- [`agent-evidence.md`](./agent-evidence.md) defines the local agent-readable evidence + files written under `~/.codex/decodex/agent-evidence/<project_id>/`.
- [`commit-messages.md`](./commit-messages.md) defines the machine-readable commit-message contract for local history. - [`installable-agent-policy.md`](./installable-agent-policy.md) defines the boundary diff --git a/docs/spec/linear-execution-ledger.md b/docs/spec/linear-execution-ledger.md index f22bfdc..c36135e 100644 --- a/docs/spec/linear-execution-ledger.md +++ b/docs/spec/linear-execution-ledger.md @@ -125,10 +125,7 @@ defined in this document are invalid for `record_version = 1`. The event type set is intentionally small and low-frequency: -- `intake` -- `lease_acquired` -- `worktree_prepared` -- `agent_started` +- `run_started` - `progress_checkpoint` - `pr_opened` - `pr_updated` @@ -141,7 +138,11 @@ The event type set is intentionally small and low-frequency: - `terminal_failure` - `cleanup_complete` -No other `event_type` value is valid for `record_version = 1`. +No other `event_type` value is valid for new `record_version = 1` writes. +Historical startup records with `event_type` values `intake`, `lease_acquired`, +`worktree_prepared`, and `agent_started` remain valid for old comments, but current +Decodex writers must emit one `run_started` record instead of those separate startup +records. ## Event-specific fields @@ -149,10 +150,11 @@ Every event requires the record envelope. 
Additional required fields are listed | Event type | Additional required fields | Common optional fields | | --- | --- | --- | -| `intake` | `summary` | `branch`, `worktree_path` | -| `lease_acquired` | `branch` | `worktree_path`, `summary` | -| `worktree_prepared` | `branch`, `worktree_path`, `commit_sha` | `summary` | -| `agent_started` | `branch`, `worktree_path` | `transport`, `summary` | +| `run_started` | `branch`, `worktree_path`, `commit_sha`, `transport`, `summary` | | +| `intake` | `summary` | `branch`, `worktree_path`; legacy read-only startup record | +| `lease_acquired` | `branch` | `worktree_path`, `summary`; legacy read-only startup record | +| `worktree_prepared` | `branch`, `worktree_path`, `commit_sha` | `summary`; legacy read-only startup record | +| `agent_started` | `branch`, `worktree_path` | `transport`, `summary`; legacy read-only startup record | | `progress_checkpoint` | `phase`, `focus`, `next_action`, `blockers`, `evidence` | `branch`, `worktree_path`, `commit_sha`, `pr_url`, `verification`, `summary` | | `pr_opened` | `branch`, `pr_url`, `pr_head_sha`, `pr_base_ref`, `commit_sha` | `worktree_path`, `summary` | | `pr_updated` | `branch`, `pr_url`, `pr_head_sha`, `pr_base_ref`, `commit_sha` | `worktree_path`, `summary` | @@ -233,7 +235,7 @@ commit SHA, PR head SHA, terminal path, or checkpoint sequence key. Use Linear comments for team-visible, low-frequency records: -- lane intake, lease acquisition, worktree preparation, and agent start +- lane run start, including branch, worktree, current commit, and transport - durable progress checkpoints - PR opened or updated events - review handoff and retained repair handoff diff --git a/docs/spec/runtime.md b/docs/spec/runtime.md index 7154e45..aadf07e 100644 --- a/docs/spec/runtime.md +++ b/docs/spec/runtime.md @@ -180,11 +180,13 @@ At the start of a normal run, the coding agent should: 1. Acquire the local lease. 2. Transition the issue to `In Progress`. -3. 
Post the applicable structured run-start comment. +3. Post the applicable structured `run_started` comment. -Run-start, lease, and worktree-preparation comments are Linear execution ledger records. -Their record envelope, event type, required fields, idempotency key, and -repository-relative `worktree_path` rules are defined by +The run-start comment is one Linear execution ledger record for new runs. It carries +the branch, repository-relative worktree path, current commit, transport, run id, and +attempt number instead of emitting separate intake, lease, worktree-preparation, and +agent-start comments. Its record envelope, event type, required fields, idempotency +key, and repository-relative `worktree_path` rules are defined by [`linear-execution-ledger.md`](./linear-execution-ledger.md). ### Completion disposition @@ -294,7 +296,7 @@ and idempotency fields are defined by ## Local operational state -The local runtime store is the global Decodex SQLite database for one local installation. It lives at `~/.codex/decodex/runtime.sqlite3`, not inside any registered project checkout or worktree. Every row that belongs to a repo is scoped by `project_id`. Decodex logs live beside that database under `~/.codex/decodex/logs/`; vendor-qualified app-data directories and per-project runtime databases are not part of the runtime contract. +The local runtime store is the global Decodex SQLite database for one local installation. It lives at `~/.codex/decodex/runtime.sqlite3`, not inside any registered project checkout or worktree. Every row that belongs to a repo is scoped by `project_id`. Decodex logs live beside that database under `~/.codex/decodex/logs/`, and agent-readable derived evidence lives under `~/.codex/decodex/agent-evidence/<project_id>/`; vendor-qualified app-data directories and per-project runtime databases are not part of the runtime contract. Project contracts live outside registered repositories under `~/.codex/decodex/projects/<project_id>/`. 
Each project directory must contain `project.toml` and `WORKFLOW.md`; arbitrary project file names such as `<project_id>.toml` are not part of the contract. `project.toml` must set `[paths].repo_root` so the project contract is explicit. Project registration stores the centralized `config_path`, target `repo_root`, `worktree_root`, and workflow path in the global runtime database. Commands that start inside a registered checkout or lane worktree resolve the project through that registry; they do not discover or trust worktree-local config files. `decodex serve` loads enabled registered projects from the global runtime database. It must not scan `.codex` history, repo-local config files, or currently open worktrees to infer additional projects. @@ -335,6 +337,7 @@ The minimum supported surface is: - structured runtime logs with stable identifiers such as `project_id`, `issue_id`, `issue`, `run_id`, `attempt`, `branch`, and repository-relative `worktree_path` - a local status command that renders the current service snapshot in both human-readable and JSON forms +- an agent evidence command, `decodex diagnose`, that writes a compact derived handoff index, blocker snapshots, run capsules, and an append-only evidence event stream under `~/.codex/decodex/agent-evidence/<project_id>/` The status surface should describe runtime DB-backed execution state, plus low-frequency connector refreshes and retained `.worktrees` lanes, for example: diff --git a/docs/spec/tracker-tools.md b/docs/spec/tracker-tools.md index 2e52756..6d67d5f 100644 --- a/docs/spec/tracker-tools.md +++ b/docs/spec/tracker-tools.md @@ -46,7 +46,8 @@ The follow-up MVP should support these issue-scoped operations: - `issue_transition` - move the current issue to an allowed target state - `issue_comment` - - add a comment to the current issue + - add an exceptional human-readable comment to the current issue for + manual-attention blockers or explicit collaboration notes - `issue_progress_checkpoint` - record the current durable 
execution-state snapshot for the current issue without changing lifecycle authority - `issue_review_checkpoint` @@ -114,6 +115,11 @@ In either invalid case, `decodex` must fail the attempt rather than infer which - when a normal lane is about to validate and write back `issue_review_handoff` - when a retained post-review lane is about to re-enter `review_repair` - Comment bodies should remain repository-controlled or agent-authored, but all tool calls must be journaled by `decodex` for recovery and audit. +- Routine start and progress visibility should use Linear execution ledger records + instead of ad hoc `issue_comment` text. A normal run start is represented by one + `run_started` ledger record, and ordinary progress uses `issue_progress_checkpoint` + only when execution phase, focus, next action, blockers, evidence, or verification + changes materially. - Structured Linear execution event comments must conform to [`linear-execution-ledger.md`](./linear-execution-ledger.md). - Structured comment fields such as `worktree_path` must use repository-relative paths;