diff --git a/apps/decodex/src/orchestrator/agent_evidence.rs b/apps/decodex/src/orchestrator/agent_evidence.rs index 423e0efa..1e825f46 100644 --- a/apps/decodex/src/orchestrator/agent_evidence.rs +++ b/apps/decodex/src/orchestrator/agent_evidence.rs @@ -149,6 +149,7 @@ struct AgentRunCapsule { interactive_requested: bool, process_id: Option, process_alive: Option, + process_liveness_reason: Option, event_count: i64, last_event_type: Option, last_event_at: Option, @@ -588,10 +589,11 @@ fn agent_run_capsule( turn_id: run.turn_id.clone(), thread_status: run.thread_status.clone(), thread_active_flags: run.thread_active_flags.clone(), - interactive_requested: run.interactive_requested, - process_id: run.process_id, - process_alive: run.process_alive, - event_count: run.event_count, + interactive_requested: run.interactive_requested, + process_id: run.process_id, + process_alive: run.process_alive, + process_liveness_reason: run.process_liveness_reason.clone(), + event_count: run.event_count, last_event_type: run.last_event_type.clone(), last_event_at: run.last_event_at.clone(), last_run_activity_at: run.last_run_activity_at.clone(), diff --git a/apps/decodex/src/orchestrator/operator_dashboard.html b/apps/decodex/src/orchestrator/operator_dashboard.html index 1c9a9067..88537416 100644 --- a/apps/decodex/src/orchestrator/operator_dashboard.html +++ b/apps/decodex/src/orchestrator/operator_dashboard.html @@ -4787,7 +4787,12 @@

Run History

facts.push(["Patch", "retained"]); } if (attention.process_alive != null) { - facts.push(["Process", attention.process_alive ? "alive" : "stopped"]); + facts.push([ + "Process", + attention.process_alive + ? "alive" + : processLivenessReasonLabel(attention.process_liveness_reason || "process_stopped"), + ]); } if (attention.worktree_path) { facts.push(["Worktree", attention.worktree_path]); @@ -5355,7 +5360,33 @@

Run History

if (run.process_alive == null) { return `${run.process_id} (unknown)`; } - return `${run.process_id} (${run.process_alive ? "alive" : "stopped"})`; + const reason = run.process_alive + ? "process_alive" + : run.process_liveness_reason || "process_stopped"; + return `${run.process_id} (${processLivenessReasonLabel(reason)})`; + } + + function processLivenessReasonLabel(reason) { + switch (reason) { + case "process_alive": + return "alive"; + case "process_stopped": + return "stopped"; + case "host_boot_id_missing": + return "boot identity missing"; + case "host_boot_id_unavailable": + return "boot identity unavailable"; + case "host_boot_id_mismatch": + return "previous boot"; + case "process_start_identity_missing": + return "start identity missing"; + case "process_start_identity_unavailable": + return "start identity unavailable"; + case "process_start_identity_mismatch": + return "process identity changed"; + default: + return reason ? reason.replaceAll("_", " ") : "unknown"; + } } function runExecutionLivenessSummary(run) { diff --git a/apps/decodex/src/orchestrator/prompting.rs b/apps/decodex/src/orchestrator/prompting.rs index 84124480..3dc2dcfd 100644 --- a/apps/decodex/src/orchestrator/prompting.rs +++ b/apps/decodex/src/orchestrator/prompting.rs @@ -1,6 +1,14 @@ const PROMPT_ONLY_INTERNAL_REVIEW_INSTRUCTION: &str = "Review your work repeatedly and fix any logic bugs until no new issues are found."; +fn build_retry_recovery_context(dispatch_mode: IssueDispatchMode) -> Option { + (dispatch_mode == IssueDispatchMode::Retry).then(|| { + String::from( + "Recovery context\n- This is retry-style re-entry after a prior attempt stopped or could not prove live execution.\n- Treat the current worktree, tracker state, protocol events, and marker files as the durable source of truth. Do not assume in-memory model output or tool results survived.\n- Before editing, inspect the current branch, diff, and recent validation evidence, reconcile partial work already present, and continue from that state instead of restarting from scratch.", + ) + }) +} + fn review_pull_request_title(issue: &TrackerIssue) -> String { let title = issue.title.trim(); let prefix = format!("{}:", issue.identifier); @@ -56,6 +64,10 @@ where "Commit contract\n- When you create a local commit for this lane, use a single-line `decodex/commit/1` JSON commit message.\n- Required fields: `schema`, `summary`, and `authority`.\n- `authority` must be the authoritative Linear issue identifier for this lane.\n- Optional fields: `related` and `breaking`.\n- Do not encode landing mode, CI status, closeout state, or other process-state fields in the commit message.", )); + if let Some(recovery_context) = build_retry_recovery_context(issue_run.dispatch_mode) { + sections.push(recovery_context); + } + let repair_architecture_guidance = build_external_repair_architecture_guidance(project, state_store, issue_run); let completed_state = workflow @@ -150,6 +162,9 @@ where .tracker() .resolved_completed_state(); let internal_review_mode = project.codex().internal_review_mode(); + let recovery_context = build_retry_recovery_context(issue_run.dispatch_mode) + .map(|section| format!("{section}\n\n")) + .unwrap_or_default(); match issue_run.dispatch_mode { IssueDispatchMode::ReviewRepair => format!( @@ -188,12 +203,13 @@ where label_tool = ISSUE_LABEL_ADD_TOOL_NAME, continuation_guidance = continuation_guidance, ), - _ => format!( - "Resolve Linear issue {identifier}: {title}\n\nDescription:\n{description}\n\nExecution checklist:\n- Move the issue to `{in_progress}` with `{transition_tool}`. Decodex already records the run-start Linear ledger, so do not leave a separate start comment.\n- Update `{progress_checkpoint_tool}` whenever the execution phase, focus, next action, blockers, evidence, or verification state changes materially.\n- Keep discovery bounded to the minimal implementation files needed for this issue; defer broader docs or upstream reading unless a concrete ambiguity blocks the change.\n- Implement the fix in the current worktree.\n{internal_review_guidance}- Treat failures from repo-native `canonicalize_commands`, `verify_commands`, or tracked rewrites left by that repo gate as continued repair by default: keep fixing the lane and rerun the gate instead of taking `manual_attention` unless the blocker is clearly toolchain, environment, or operator-owned.\n- Run the repository validation needed to justify a reviewable PR.\n- Commit the lane, push branch `{branch}`, and create or update a non-draft PR titled `{pr_title}` for that branch.\n{completion_guidance}- If the issue needs manual attention, add label `{needs_attention}` with `{label_tool}`, explain why in a comment, and then call `{terminal_finalize_tool}` with path `manual_attention`. Do not call `{review_handoff_tool}` in that case; `decodex` will stop the lane as a human-required failure without automatic retry.\n- Do not move the issue directly to `{success}` with `{transition_tool}`; `decodex` will finish that writeback after its own validation passes.\n- Do not report the run as complete or treat `{progress_checkpoint_tool}` as terminal completion until `{terminal_finalize_tool}` succeeds.{continuation_guidance}", - identifier = issue.identifier, - title = issue.title, - description = description, - transition_tool = ISSUE_TRANSITION_TOOL_NAME, + _ => format!( + "Resolve Linear issue {identifier}: {title}\n\nDescription:\n{description}\n\n{recovery_context}Execution checklist:\n- Move the issue to `{in_progress}` with `{transition_tool}`. Decodex already records the run-start Linear ledger, so do not leave a separate start comment.\n- Update `{progress_checkpoint_tool}` whenever the execution phase, focus, next action, blockers, evidence, or verification state changes materially.\n- Keep discovery bounded to the minimal implementation files needed for this issue; defer broader docs or upstream reading unless a concrete ambiguity blocks the change.\n- Implement the fix in the current worktree.\n{internal_review_guidance}- Treat failures from repo-native `canonicalize_commands`, `verify_commands`, or tracked rewrites left by that repo gate as continued repair by default: keep fixing the lane and rerun the gate instead of taking `manual_attention` unless the blocker is clearly toolchain, environment, or operator-owned.\n- Run the repository validation needed to justify a reviewable PR.\n- Commit the lane, push branch `{branch}`, and create or update a non-draft PR titled `{pr_title}` for that branch.\n{completion_guidance}- If the issue needs manual attention, add label `{needs_attention}` with `{label_tool}`, explain why in a comment, and then call `{terminal_finalize_tool}` with path `manual_attention`. Do not call `{review_handoff_tool}` in that case; `decodex` will stop the lane as a human-required failure without automatic retry.\n- Do not move the issue directly to `{success}` with `{transition_tool}`; `decodex` will finish that writeback after its own validation passes.\n- Do not report the run as complete or treat `{progress_checkpoint_tool}` as terminal completion until `{terminal_finalize_tool}` succeeds.{continuation_guidance}", + identifier = issue.identifier, + title = issue.title, + description = description, + recovery_context = recovery_context, + transition_tool = ISSUE_TRANSITION_TOOL_NAME, label_tool = ISSUE_LABEL_ADD_TOOL_NAME, progress_checkpoint_tool = ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, review_handoff_tool = ISSUE_REVIEW_HANDOFF_TOOL_NAME, diff --git a/apps/decodex/src/orchestrator/status.rs b/apps/decodex/src/orchestrator/status.rs index 7d0c94d6..a9db0614 100644 --- a/apps/decodex/src/orchestrator/status.rs +++ b/apps/decodex/src/orchestrator/status.rs @@ -61,6 +61,7 @@ impl PostReviewOrchestrationStatus { struct OperatorRunTiming { process_id: Option, process_alive: Option, + process_liveness_reason: Option, last_run_activity_unix_epoch: Option, last_protocol_activity_unix_epoch: Option, last_progress_unix_epoch: Option, @@ -68,6 +69,12 @@ struct OperatorRunTiming { protocol_idle_for_seconds: Option, } +#[derive(Clone, Copy)] +struct MarkerProcessLiveness { + alive: bool, + reason: &'static str, +} + struct OperatorRunAppServerState { thread_id: Option, turn_id: Option, @@ -1536,6 +1543,7 @@ where retry_budget_attempts, worktree_has_tracked_changes, ); + let process_liveness = marker.as_ref().and_then(marker_process_liveness_for_marker); Ok(Some(OperatorQueuedIssueAttentionStatus { summary, @@ -1572,7 +1580,8 @@ where .and_then(RunActivityMarker::last_event_type) .map(str::to_owned), event_count: marker.as_ref().map_or(0, RunActivityMarker::event_count), - process_alive: marker.as_ref().and_then(|marker| marker.process_id().map(process_is_alive)), + process_alive: process_liveness.map(|liveness| liveness.alive), + process_liveness_reason: process_liveness.map(|liveness| liveness.reason.to_owned()), worktree_path: worktree_path .exists() .then(|| relative_worktree_path_for_path(project, &worktree_path)), @@ -3274,13 +3283,63 @@ fn recoverable_worktree_identifiers(worktree_root: &Path) -> crate::prelude::Res } fn worktree_activity_marker_is_fresh(marker: &RunActivityMarker, now_unix_epoch: i64) -> bool { - marker.process_id().is_some_and(process_is_alive) + marker_process_is_alive(marker) && marker .last_activity_unix_epoch() .and_then(|last_activity| observed_idle_duration(last_activity, now_unix_epoch)) .is_some_and(|idle_for| idle_for < ACTIVE_RUN_IDLE_TIMEOUT) } +fn marker_process_is_alive(marker: &RunActivityMarker) -> bool { + marker_process_liveness(marker).alive +} + +fn marker_process_liveness_for_marker( + marker: &RunActivityMarker, +) -> Option { + marker.process_id().map(|_| marker_process_liveness(marker)) +} + +fn marker_process_liveness(marker: &RunActivityMarker) -> MarkerProcessLiveness { + let Some(process_id) = marker.process_id() else { + return MarkerProcessLiveness { alive: false, reason: "process_id_missing" }; + }; + + if !process_is_alive(process_id) { + return MarkerProcessLiveness { alive: false, reason: "process_stopped" }; + } + + let Some(marker_host_boot_id) = marker.host_boot_id() else { + return MarkerProcessLiveness { alive: false, reason: "host_boot_id_missing" }; + }; + let Some(current_host_boot_id) = state::current_host_boot_id() else { + return MarkerProcessLiveness { alive: false, reason: "host_boot_id_unavailable" }; + }; + + if marker_host_boot_id != current_host_boot_id.as_str() { + return MarkerProcessLiveness { alive: false, reason: "host_boot_id_mismatch" }; + } + + let Some(marker_process_start_identity) = marker.process_start_identity() else { + return MarkerProcessLiveness { alive: false, reason: "process_start_identity_missing" }; + }; + let Some(current_process_start_identity) = state::process_start_identity(process_id) else { + return MarkerProcessLiveness { + alive: false, + reason: "process_start_identity_unavailable", + }; + }; + + if marker_process_start_identity != current_process_start_identity.as_str() { + return MarkerProcessLiveness { + alive: false, + reason: "process_start_identity_mismatch", + }; + } + + MarkerProcessLiveness { alive: true, reason: "process_alive" } +} + fn process_is_alive(process_id: u32) -> bool { let Ok(process_id) = pid_t::try_from(process_id) else { return false; @@ -3409,11 +3468,12 @@ fn operator_run_status( suspected_stall, last_event_type: protocol_summary.last_event_type, last_event_at: protocol_summary.last_event_at, - event_count: protocol_summary.event_count, - process_id: timing.process_id, - process_alive: timing.process_alive, - retry_kind, - next_retry_at: format_optional_unix_timestamp(retry_ready_at_unix_epoch), + event_count: protocol_summary.event_count, + process_id: timing.process_id, + process_alive: timing.process_alive, + process_liveness_reason: timing.process_liveness_reason, + retry_kind, + next_retry_at: format_optional_unix_timestamp(retry_ready_at_unix_epoch), effective_model: app_server_state.effective_model, effective_model_provider: app_server_state.effective_model_provider, effective_cwd: app_server_state.effective_cwd, @@ -3460,9 +3520,11 @@ fn operator_run_timing( marker.and_then(RunActivityMarker::last_progress_unix_epoch), last_protocol_activity_unix_epoch, ); + let process_liveness = marker.and_then(marker_process_liveness_for_marker); OperatorRunTiming { - process_alive: process_id.map(process_is_alive), + process_alive: process_liveness.map(|liveness| liveness.alive), + process_liveness_reason: process_liveness.map(|liveness| liveness.reason.to_owned()), process_id, last_run_activity_unix_epoch, last_protocol_activity_unix_epoch, @@ -4696,7 +4758,7 @@ fn append_rendered_run(output: &mut String, run: &OperatorRunStatus) { let accounts = render_accounts_summary(&run.accounts); output.push_str(&format!( - "- run_id: {}\n project_id: {}\n issue_id: {}\n issue_identifier: {}\n title: {}\n attempt: {}\n status: {}\n attempt_status: {}\n phase: {}\n wait_reason: {}\n current_operation: {}\n active_lease: {}\n queue_lease_state: {}\n queue_lease: {}\n execution_liveness: {}\n freshness_at: {}\n freshness_source: {}\n timing: run_idle={} protocol_idle={} last_progress={} protocol_event={} events={}\n account: {}\n accounts: {}\n child_agent_activity: {}\n protocol_activity: {}\n context_pressure: {}\n thread_id: {}\n turn_id: {}\n thread_status: {}\n thread_active_flags: {}\n interactive_requested: {}\n continuation_pending: {}\n branch: {}\n worktree_path: {}\n updated_at: {}\n last_run_activity_at: {}\n last_protocol_activity_at: {}\n last_progress_at: {}\n idle_for_seconds: {}\n protocol_idle_for_seconds: {}\n suspected_stall: {}\n process_id: {}\n process_alive: {}\n retry_kind: {}\n next_retry_at: {}\n effective_model: {}\n effective_model_provider: {}\n effective_cwd: {}\n effective_approval_policy: {}\n effective_approvals_reviewer: {}\n effective_sandbox_mode: {}\n protocol_event: {}\n event_count: {}\n", + "- run_id: {}\n project_id: {}\n issue_id: {}\n issue_identifier: {}\n title: {}\n attempt: {}\n status: {}\n attempt_status: {}\n phase: {}\n wait_reason: {}\n current_operation: {}\n active_lease: {}\n queue_lease_state: {}\n queue_lease: {}\n execution_liveness: {}\n freshness_at: {}\n freshness_source: {}\n timing: run_idle={} protocol_idle={} last_progress={} protocol_event={} events={}\n account: {}\n accounts: {}\n child_agent_activity: {}\n protocol_activity: {}\n context_pressure: {}\n thread_id: {}\n turn_id: {}\n thread_status: {}\n thread_active_flags: {}\n interactive_requested: {}\n continuation_pending: {}\n branch: {}\n worktree_path: {}\n updated_at: {}\n last_run_activity_at: {}\n last_protocol_activity_at: {}\n last_progress_at: {}\n idle_for_seconds: {}\n protocol_idle_for_seconds: {}\n suspected_stall: {}\n process_id: {}\n process_alive: {}\n process_liveness_reason: {}\n retry_kind: {}\n next_retry_at: {}\n effective_model: {}\n effective_model_provider: {}\n effective_cwd: {}\n effective_approval_policy: {}\n effective_approvals_reviewer: {}\n effective_sandbox_mode: {}\n protocol_event: {}\n event_count: {}\n", run.run_id, run.project_id, run.issue_id, @@ -4744,6 +4806,7 @@ fn append_rendered_run(output: &mut String, run: &OperatorRunStatus) { || String::from("none"), |value| if value { String::from("yes") } else { String::from("no") }, ), + run.process_liveness_reason.as_deref().unwrap_or("none"), run.retry_kind.as_deref().unwrap_or("none"), run.next_retry_at.as_deref().unwrap_or("none"), run.effective_model.as_deref().unwrap_or("none"), diff --git a/apps/decodex/src/orchestrator/tests.rs b/apps/decodex/src/orchestrator/tests.rs index 61c7d551..794ee589 100644 --- a/apps/decodex/src/orchestrator/tests.rs +++ b/apps/decodex/src/orchestrator/tests.rs @@ -53,41 +53,71 @@ use crate::orchestrator::{ReviewHandoffMarker, ReviewOrchestrationMarker}; // Workflow reload, intake eligibility, prompting, and candidate selection. include!("tests/intake/workflow_reload.rs"); + include!("tests/intake/eligibility.rs"); + include!("tests/intake/run_and_prompting.rs"); + include!("tests/intake/prepare_issue_run.rs"); + include!("tests/intake/candidate_selection.rs"); // Retry scheduling, runtime failure classes, and recovery cleanup. include!("tests/retry/scheduling.rs"); + include!("tests/retry/selection.rs"); + include!("tests/runtime/repo_gate.rs"); + include!("tests/runtime/failure.rs"); + include!("tests/recovery/reconciliation.rs"); + include!("tests/recovery/terminal_support.rs"); + include!("tests/recovery/closeout/dispatch.rs"); + include!("tests/recovery/closeout/identity.rs"); + include!("tests/recovery/closeout/cleanup.rs"); + include!("tests/recovery/terminal_failures.rs"); + include!("tests/recovery/runtime_reentry.rs"); // Operator status plus retained post-review review/landing behavior. include!("tests/operator/status_support.rs"); + include!("tests/operator/status/control_plane.rs"); + include!("tests/operator/status/running_lanes.rs"); + include!("tests/operator/status/history.rs"); + include!("tests/operator/status/text.rs"); + include!("tests/operator/status/publishing.rs"); + include!("tests/operator/status/queue.rs"); + include!("tests/operator/status/agent_evidence.rs"); + include!("tests/operator/status/http.rs"); + include!("tests/operator/status/dashboard.rs"); + include!("tests/review_landing/status_support.rs"); + include!("tests/review_landing/status_rows.rs"); + include!("tests/review_landing/orchestration.rs"); + include!("tests/review_landing/status_markers.rs"); + include!("tests/review_landing/classification_review.rs"); + include!("tests/review_landing/classification_checks.rs"); + include!("tests/review_landing/review_state.rs"); const TEST_EXTERNAL_REVIEW_REQUEST_COMMENT_ID: i64 = 991; @@ -307,6 +337,57 @@ impl PullRequestReviewStateInspector for FakePullRequestReviewStateInspector { } } +fn rewrite_run_activity_marker_host_boot_id(worktree_path: &Path, host_boot_id: &str) { + let marker_path = worktree_path.join(RUN_ACTIVITY_MARKER_FILE); + let marker_body = fs::read_to_string(&marker_path).expect("marker body should load"); + let mut host_boot_id_written = false; + let mut rewritten = marker_body + .lines() + .map(|line| { + if line.starts_with("host_boot_id=") { + host_boot_id_written = true; + + format!("host_boot_id={host_boot_id}") + } else { + line.to_owned() + } + }) + .collect::>(); + + if !host_boot_id_written { + rewritten.push(format!("host_boot_id={host_boot_id}")); + } + + fs::write(&marker_path, rewritten.join("\n") + "\n").expect("marker body should rewrite"); +} + +fn rewrite_run_activity_marker_process_start_identity( + worktree_path: &Path, + process_start_identity: &str, +) { + let marker_path = worktree_path.join(RUN_ACTIVITY_MARKER_FILE); + let marker_body = fs::read_to_string(&marker_path).expect("marker body should load"); + let mut process_start_identity_written = false; + let mut rewritten = marker_body + .lines() + .map(|line| { + if line.starts_with("process_start_identity=") { + process_start_identity_written = true; + + format!("process_start_identity={process_start_identity}") + } else { + line.to_owned() + } + }) + .collect::>(); + + if !process_start_identity_written { + rewritten.push(format!("process_start_identity={process_start_identity}")); + } + + fs::write(&marker_path, rewritten.join("\n") + "\n").expect("marker body should rewrite"); +} + fn install_fake_post_issue_comment_gh_response( temp_dir: &TempDir, comment_id: i64, diff --git a/apps/decodex/src/orchestrator/tests/intake/run_and_prompting.rs b/apps/decodex/src/orchestrator/tests/intake/run_and_prompting.rs index 74e59462..f5701897 100644 --- a/apps/decodex/src/orchestrator/tests/intake/run_and_prompting.rs +++ b/apps/decodex/src/orchestrator/tests/intake/run_and_prompting.rs @@ -623,6 +623,54 @@ fn normal_prompts_require_issue_prefixed_pull_request_title() { assert!(developer_instructions.contains("single-line `decodex/commit/1` JSON commit message")); } +#[test] +fn retry_prompts_include_recovery_context() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let issue = sample_issue("In Progress", &[]); + let tracker = FakeTracker::new(vec![issue.clone()]); + let issue_run = orchestrator::IssueRunPlan { + issue: issue.clone(), + issue_state: String::from("In Progress"), + initial_issue_state: String::from("In Progress"), + worktree: WorktreeSpec { + branch_name: String::from("x/pubfi-pub-101"), + issue_identifier: String::from("PUB-101"), + path: config.worktree_root().join("PUB-101"), + reused_existing: true, + }, + retry_project_slug: String::from("pubfi"), + dispatch_mode: IssueDispatchMode::Retry, + attempt_number: 2, + run_id: String::from("pub-101-attempt-2-123"), + retry_budget_base: 1, + }; + let state_store = StateStore::open_in_memory().expect("state store should open"); + let developer_instructions = orchestrator::build_developer_instructions( + &tracker, + &config, + &workflow, + &issue_run, + &state_store, + None, + ) + .expect("developer instructions should build"); + let user_input = orchestrator::build_user_input( + &tracker, + &config, + &issue, + &workflow, + &issue_run, + &state_store, + None, + ); + + for prompt in [&developer_instructions, &user_input] { + assert!(prompt.contains("Recovery context")); + assert!(prompt.contains("Treat the current worktree")); + assert!(prompt.contains("Do not assume in-memory model output or tool results survived")); + } +} + #[test] fn normal_prompts_respect_non_loop_internal_review_modes() { for (mode, expected, forbidden_checkpoint) in [ diff --git a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs index 3576a8cc..bdd3ec6f 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs @@ -771,6 +771,94 @@ fn operator_status_snapshot_counts_stopped_active_process_as_attention_not_runni assert_eq!(run.phase, "executing"); assert_eq!(run.process_alive, Some(false)); + assert_eq!(run.process_liveness_reason.as_deref(), Some("process_stopped")); + assert_eq!(project.active_run_count, 0); + assert_eq!(project.attention_count, 1); +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test] +fn operator_status_snapshot_counts_previous_boot_process_as_attention_not_running() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue("Todo", &[]); + let worktree_path = config.worktree_root().join("PUB-101"); + + state_store + .record_run_attempt("run-1", &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .upsert_lease("pubfi", &issue.id, "run-1", "In Progress") + .expect("lease should record"); + state_store + .upsert_worktree( + "pubfi", + &issue.id, + "x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + + fs::create_dir_all(&worktree_path).expect("worktree path should exist"); + state::write_run_activity_marker_for_process(&worktree_path, "run-1", 1, process::id()) + .expect("live process marker should write"); + + rewrite_run_activity_marker_host_boot_id(&worktree_path, "previous-boot"); + + let snapshot = orchestrator::build_operator_status_snapshot(&config, &state_store, 10) + .expect("snapshot should build"); + let run = snapshot.active_runs.first().expect("active run should remain visible"); + let project = snapshot.projects.first().expect("project summary should exist"); + + assert_eq!(run.phase, "executing"); + assert_eq!(run.process_id, Some(process::id())); + assert_eq!(run.process_alive, Some(false)); + assert_eq!(run.process_liveness_reason.as_deref(), Some("host_boot_id_mismatch")); + assert_eq!(project.active_run_count, 0); + assert_eq!(project.attention_count, 1); +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test] +fn operator_status_snapshot_counts_reused_pid_as_attention_not_running() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue("Todo", &[]); + let worktree_path = config.worktree_root().join("PUB-101"); + + state_store + .record_run_attempt("run-1", &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .upsert_lease("pubfi", &issue.id, "run-1", "In Progress") + .expect("lease should record"); + state_store + .upsert_worktree( + "pubfi", + &issue.id, + "x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + + fs::create_dir_all(&worktree_path).expect("worktree path should exist"); + state::write_run_activity_marker_for_process(&worktree_path, "run-1", 1, process::id()) + .expect("live process marker should write"); + + rewrite_run_activity_marker_process_start_identity(&worktree_path, "previous-process-start"); + + let snapshot = orchestrator::build_operator_status_snapshot(&config, &state_store, 10) + .expect("snapshot should build"); + let run = snapshot.active_runs.first().expect("active run should remain visible"); + let project = snapshot.projects.first().expect("project summary should exist"); + + assert_eq!(run.phase, "executing"); + assert_eq!(run.process_id, Some(process::id())); + assert_eq!(run.process_alive, Some(false)); + assert_eq!( + run.process_liveness_reason.as_deref(), + Some("process_start_identity_mismatch") + ); assert_eq!(project.active_run_count, 0); assert_eq!(project.attention_count, 1); } @@ -811,6 +899,7 @@ fn operator_status_snapshot_keeps_unleased_live_process_in_running_lanes() { assert_eq!(run.queue_lease_state, "not_held"); assert_eq!(run.execution_liveness, "process_alive"); assert_eq!(run.process_alive, Some(true)); + assert_eq!(run.process_liveness_reason.as_deref(), Some("process_alive")); assert_eq!(project.active_run_count, 1); assert_eq!(project.retained_worktree_count, 0); } diff --git a/apps/decodex/src/orchestrator/tests/operator/status_support.rs b/apps/decodex/src/orchestrator/tests/operator/status_support.rs index 377c7ddc..1ac216f2 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status_support.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status_support.rs @@ -234,10 +234,11 @@ fn operator_status_text_active_run() -> orchestrator::OperatorRunStatus { suspected_stall: false, last_event_type: Some(String::from("turn/completed")), last_event_at: Some(String::from("2026-03-14 10:00:01")), - event_count: 4, - process_id: Some(1_234), - process_alive: Some(true), - retry_kind: None, + event_count: 4, + process_id: Some(1_234), + process_alive: Some(true), + process_liveness_reason: Some(String::from("process_alive")), + retry_kind: None, next_retry_at: None, effective_model: Some(String::from("gpt-5.4")), effective_model_provider: Some(String::from("openai")), diff --git a/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs b/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs index 4fe2a3fa..e8b721e1 100644 --- a/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs +++ b/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs @@ -423,6 +423,121 @@ fn run_project_once_skips_recovered_worktree_with_fresh_activity_marker() { ); } +#[cfg(unix)] +#[test] +fn run_project_once_retries_recovered_worktree_after_marker_process_is_killed() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let issue = sample_active_issue("In Progress"); + let tracker = FakeTracker::with_refresh_snapshots( + vec![issue.clone()], + vec![vec![issue.clone()], vec![issue.clone()]], + ); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let worktree_manager = + WorktreeManager::new(config.service_id(), config.repo_root(), config.worktree_root()); + let worktree = worktree_manager + .ensure_worktree(&issue.identifier, false) + .expect("recovered worktree should exist"); + let mut child = Command::new("/bin/sh") + .args(["-c", "exec sleep 60"]) + .spawn() + .expect("kill-smoke child process should start"); + let child_process_id = child.id(); + + assert!( + orchestrator::process_is_alive(child_process_id), + "kill-smoke child process should be live before marker write" + ); + + state::write_run_activity_marker_for_process(&worktree.path, "run-1", 1, child_process_id) + .expect("activity marker should write"); + + child.kill().expect("kill-smoke child process should be killed"); + child.wait().expect("kill-smoke child process should be reaped"); + + assert!( + !orchestrator::process_is_alive(child_process_id), + "kill-smoke child process should no longer be live after kill" + ); + + let summary = orchestrator::run_project_once(&tracker, &config, &workflow, &state_store, true) + .expect("kill-smoke recovery should succeed") + .expect("killed-process recovered lane should be selected for retry"); + + assert_eq!(summary.issue_id, issue.id); + assert_eq!(summary.dispatch_mode, orchestrator::IssueDispatchMode::Retry); + assert!( + state_store.lease_for_issue(&issue.id).expect("lease lookup should succeed").is_none(), + "killed marker processes must not reconstruct live leases" + ); +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test] +fn run_project_once_retries_recovered_worktree_from_previous_boot() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let issue = sample_active_issue("In Progress"); + let tracker = FakeTracker::with_refresh_snapshots( + vec![issue.clone()], + vec![vec![issue.clone()], vec![issue.clone()]], + ); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let worktree_manager = + WorktreeManager::new(config.service_id(), config.repo_root(), config.worktree_root()); + let worktree = worktree_manager + .ensure_worktree(&issue.identifier, false) + .expect("recovered worktree should exist"); + + state::write_run_activity_marker_for_process(&worktree.path, "run-1", 1, process::id()) + .expect("activity marker should write"); + + rewrite_run_activity_marker_host_boot_id(&worktree.path, "previous-boot"); + + let summary = orchestrator::run_project_once(&tracker, &config, &workflow, &state_store, true) + .expect("previous-boot recovery should succeed") + .expect("previous-boot recovered lane should be selected for retry"); + + assert_eq!(summary.issue_id, issue.id); + assert_eq!(summary.dispatch_mode, orchestrator::IssueDispatchMode::Retry); + assert!( + state_store.lease_for_issue(&issue.id).expect("lease lookup should succeed").is_none(), + "previous-boot markers must not reconstruct live leases even when the PID exists" + ); +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test] +fn run_project_once_retries_recovered_worktree_from_reused_pid() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let issue = sample_active_issue("In Progress"); + let tracker = FakeTracker::with_refresh_snapshots( + vec![issue.clone()], + vec![vec![issue.clone()], vec![issue.clone()]], + ); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let worktree_manager = + WorktreeManager::new(config.service_id(), config.repo_root(), config.worktree_root()); + let worktree = worktree_manager + .ensure_worktree(&issue.identifier, false) + .expect("recovered worktree should exist"); + + state::write_run_activity_marker_for_process(&worktree.path, "run-1", 1, process::id()) + .expect("activity marker should write"); + + rewrite_run_activity_marker_process_start_identity(&worktree.path, "previous-process-start"); + + let summary = orchestrator::run_project_once(&tracker, &config, &workflow, &state_store, true) + .expect("same-boot PID-reuse recovery should succeed") + .expect("same-boot PID-reuse recovered lane should be selected for retry"); + + assert_eq!(summary.issue_id, issue.id); + assert_eq!(summary.dispatch_mode, orchestrator::IssueDispatchMode::Retry); + assert!( + state_store.lease_for_issue(&issue.id).expect("lease lookup should succeed").is_none(), + "PID reuse must not reconstruct live leases when the process start identity changed" + ); +} + #[cfg(unix)] #[test] fn process_is_alive_handles_current_process_and_invalid_sentinel() { diff --git a/apps/decodex/src/orchestrator/types.rs b/apps/decodex/src/orchestrator/types.rs index e6d906bc..7f0f5a75 100644 --- a/apps/decodex/src/orchestrator/types.rs +++ b/apps/decodex/src/orchestrator/types.rs @@ -783,6 +783,7 @@ struct OperatorRunStatus { event_count: i64, process_id: Option, process_alive: Option, + process_liveness_reason: Option, retry_kind: Option, next_retry_at: Option, effective_model: Option, @@ -832,6 +833,7 @@ struct OperatorQueuedIssueAttentionStatus { last_event_type: Option, event_count: i64, process_alive: Option, + process_liveness_reason: Option, worktree_path: Option, worktree_has_tracked_changes: bool, } diff --git a/apps/decodex/src/state.rs b/apps/decodex/src/state.rs index c034359e..6e2c5f47 100644 --- a/apps/decodex/src/state.rs +++ b/apps/decodex/src/state.rs @@ -8,7 +8,7 @@ use std::{ io::{Error, ErrorKind, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, process, - sync::{Mutex, MutexGuard}, + sync::{Mutex, MutexGuard, OnceLock}, time::Duration, }; diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index 79e63f16..b60da18f 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -1,6 +1,17 @@ +#[cfg(target_os = "macos")] +use std::mem::MaybeUninit; + use libc::FD_CLOEXEC; use libc::F_GETFD; use libc::F_SETFD; +#[cfg(target_os = "macos")] +use libc::{ + c_void, + proc_bsdinfo, + PROC_PIDTBSDINFO, +}; +#[cfg(target_os = "macos")] +use process::Command; pub(crate) struct EffectiveRuntimeMarker<'a> { pub(crate) thread_id: Option<&'a str>, @@ -936,6 +947,8 @@ struct RunActivityMarkerRecord { run_id: Option, attempt_number: Option, process_id: Option, + host_boot_id: Option, + process_start_identity: Option, last_activity_unix_epoch: Option, last_protocol_activity_unix_epoch: Option, last_progress_unix_epoch: Option, @@ -1026,7 +1039,8 @@ pub(crate) fn write_run_operation_marker_for_process( let existing_marker = read_run_activity_marker_record(worktree_path)?; let mut marker = run_activity_marker_record_for_attempt(existing_marker.as_ref(), run_id, attempt_number); - marker.process_id = Some(process_id); + set_run_activity_marker_process_identity(&mut marker, process_id); + marker.last_activity_unix_epoch = Some(now); marker.last_progress_unix_epoch = Some(now); marker.current_operation = Some(current_operation.to_owned()); @@ -1066,7 +1080,7 @@ pub(crate) fn write_run_protocol_activity_marker( marker.run_id = Some(activity.run_id.to_owned()); marker.attempt_number = Some(activity.attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.last_activity_unix_epoch = Some(now); marker.last_protocol_activity_unix_epoch = Some(now); @@ -1095,7 +1109,7 @@ pub(crate) fn write_run_account_marker( marker.run_id = Some(account.run_id.to_owned()); marker.attempt_number = Some(account.attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.current_operation = Some(RUN_OPERATION_AGENT_RUN.to_owned()); marker.account = Some(account.account.clone()); @@ -1119,7 +1133,7 @@ pub(crate) fn write_run_thread_marker( marker.run_id = Some(run_id.to_owned()); marker.attempt_number = Some(attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.current_operation = Some(RUN_OPERATION_AGENT_RUN.to_owned()); marker.thread_id = Some(thread_id.to_owned()); @@ -1142,7 +1156,7 @@ pub(crate) fn write_run_turn_marker( marker.run_id = Some(run_id.to_owned()); marker.attempt_number = Some(attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.current_operation = Some(RUN_OPERATION_AGENT_RUN.to_owned()); marker.turn_id = Some(turn_id.to_owned()); @@ -1168,7 +1182,7 @@ pub(crate) fn write_run_thread_status_marker( marker.run_id = Some(run_id.to_owned()); marker.attempt_number = Some(attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.current_operation = Some(RUN_OPERATION_AGENT_RUN.to_owned()); marker.thread_id = thread_id.map(str::to_owned).or(marker.thread_id); @@ -1194,7 +1208,7 @@ pub(crate) fn write_run_effective_runtime_marker( marker.run_id = Some(run_id.to_owned()); marker.attempt_number = Some(attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.current_operation = Some(RUN_OPERATION_AGENT_RUN.to_owned()); marker.thread_id = runtime.thread_id.map(str::to_owned).or(marker.thread_id); @@ -1248,7 +1262,7 @@ pub(crate) fn write_run_retry_budget_attempt_count( marker.run_id = Some(run_id.to_owned()); marker.attempt_number = Some(attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.retry_budget_attempt_count = Some(retry_budget_attempt_count); @@ -1307,7 +1321,7 @@ pub(crate) fn write_run_review_policy_state( marker.run_id = Some(run_id.to_owned()); marker.attempt_number = Some(attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.review_policy_phase = Some(review_policy_phase.to_owned()); marker.review_policy_status = Some(review_policy_status.to_owned()); @@ -1345,13 +1359,15 @@ pub(crate) fn read_run_activity_marker_snapshot( Ok(read_run_activity_marker_record(worktree_path)?.and_then(|marker| { let accounts = accounts_from_marker_record(&marker); - Some(RunActivityMarker { - run_id: marker.run_id?, - attempt_number: marker.attempt_number?, - process_id: marker.process_id, - last_activity_unix_epoch: marker.last_activity_unix_epoch, - last_protocol_activity_unix_epoch: marker.last_protocol_activity_unix_epoch, - last_progress_unix_epoch: marker.last_progress_unix_epoch, + Some(RunActivityMarker { + run_id: marker.run_id?, + attempt_number: marker.attempt_number?, + process_id: marker.process_id, + host_boot_id: marker.host_boot_id, + process_start_identity: marker.process_start_identity, + last_activity_unix_epoch: marker.last_activity_unix_epoch, + last_protocol_activity_unix_epoch: marker.last_protocol_activity_unix_epoch, + last_progress_unix_epoch: marker.last_progress_unix_epoch, current_operation: marker.current_operation, thread_id: marker.thread_id, turn_id: marker.turn_id, @@ -1380,6 +1396,25 @@ pub(crate) fn read_run_activity_marker_snapshot( })) } +pub(crate) fn current_host_boot_id() -> Option { + static CURRENT_HOST_BOOT_ID: OnceLock> = OnceLock::new(); + + CURRENT_HOST_BOOT_ID.get_or_init(read_current_host_boot_id).clone() +} + +pub(crate) fn current_process_start_identity() -> Option { + static CURRENT_PROCESS_START_IDENTITY: OnceLock> = OnceLock::new(); + + CURRENT_PROCESS_START_IDENTITY + .get_or_init(|| process_start_identity(process::id())) + .clone() +} + +pub(crate) fn process_start_identity(process_id: u32) -> Option { + read_platform_process_start_identity(process_id) + .and_then(|identity| normalized_process_start_identity(&identity)) +} + fn normalize_accounts( selected: &CodexAccountActivitySummary, accounts: &[CodexAccountActivitySummary], @@ -1409,6 +1444,131 @@ fn accounts_from_marker_record( } } +fn set_run_activity_marker_process_identity( + marker: &mut RunActivityMarkerRecord, + process_id: u32, +) { + marker.process_id = Some(process_id); + marker.host_boot_id = current_host_boot_id(); + marker.process_start_identity = if process_id == process::id() { + current_process_start_identity() + } else { + process_start_identity(process_id) + }; +} + +fn ensure_run_activity_marker_current_process_identity(marker: &mut RunActivityMarkerRecord) { + let current_process_id = process::id(); + + match marker.process_id { + None => set_run_activity_marker_process_identity(marker, current_process_id), + Some(process_id) + if process_id == current_process_id + && (marker.host_boot_id.is_none() || marker.process_start_identity.is_none()) => + { + if marker.host_boot_id.is_none() { + marker.host_boot_id = current_host_boot_id(); + } + if marker.process_start_identity.is_none() { + marker.process_start_identity = current_process_start_identity(); + } + }, + Some(_) => {}, + } +} + +fn read_current_host_boot_id() -> Option { + read_platform_host_boot_id().and_then(|boot_id| normalized_host_boot_id(&boot_id)) +} + +#[cfg(target_os = "linux")] +fn read_platform_host_boot_id() -> Option { + fs::read_to_string("/proc/sys/kernel/random/boot_id") + .ok() + .map(|boot_id| format!("linux:{boot_id}")) +} + +#[cfg(target_os = "macos")] +fn read_platform_host_boot_id() -> Option { + let output = Command::new("/usr/sbin/sysctl") + .args(["-n", "kern.boottime"]) + .output() + .ok()?; + + if !output.status.success() { + return None; + } + + String::from_utf8(output.stdout) + .ok() + .map(|boot_id| format!("macos:{boot_id}")) +} + +#[cfg(not(any(target_os = "linux", target_os = "macos")))] +fn read_platform_host_boot_id() -> Option { + None +} + +fn normalized_host_boot_id(boot_id: &str) -> Option { + let normalized = boot_id.split_whitespace().collect::>().join(" "); + + (!normalized.is_empty()).then_some(normalized) +} + +#[cfg(target_os = "linux")] +fn read_platform_process_start_identity(process_id: u32) -> Option { + let stat = fs::read_to_string(format!("/proc/{process_id}/stat")).ok()?; + let comm_end = stat.rfind(')')?; + let after_comm = stat.get(comm_end + 2..)?; + let start_time = after_comm.split_whitespace().nth(19)?; + + Some(format!("linux_starttime:{start_time}")) +} + +#[cfg(target_os = "macos")] +fn read_platform_process_start_identity(process_id: u32) -> Option { + let Ok(pid) = i32::try_from(process_id) else { + return None; + }; + + if pid <= 0 { + return None; + } + + let mut info = MaybeUninit::::zeroed(); + let Ok(info_size) = i32::try_from(mem::size_of::()) else { + return None; + }; + let read_size = unsafe { + libc::proc_pidinfo( + pid, + PROC_PIDTBSDINFO, + 0, + info.as_mut_ptr().cast::(), + info_size, + ) + }; + + if read_size != info_size { + return None; + } + + let info = unsafe { info.assume_init() }; + + Some(format!("macos_starttime:{}:{}", info.pbi_start_tvsec, info.pbi_start_tvusec)) +} + +#[cfg(not(any(target_os = "linux", target_os = "macos")))] +fn read_platform_process_start_identity(_process_id: u32) -> Option { + None +} + +fn normalized_process_start_identity(identity: &str) -> Option { + let normalized = identity.split_whitespace().collect::>().join(" "); + + (!normalized.is_empty()).then_some(normalized) +} + fn persist_projects(transaction: &Transaction<'_>, state: &StateData) -> Result<()> { for project in state.projects.values() { transaction.execute( @@ -1702,7 +1862,8 @@ fn write_run_activity_marker_at( .filter(|marker| marker.run_id.as_deref() == Some(run_id) && marker.attempt_number == Some(attempt_number)); let mut marker = run_activity_marker_record_for_attempt(existing_marker.as_ref(), run_id, attempt_number); - marker.process_id = Some(process_id); + set_run_activity_marker_process_identity(&mut marker, process_id); + marker.last_activity_unix_epoch = Some(last_activity_unix_epoch); marker.last_protocol_activity_unix_epoch = last_protocol_activity_unix_epoch .or_else(|| same_run_marker.and_then(|marker| marker.last_protocol_activity_unix_epoch)); @@ -1726,12 +1887,15 @@ fn run_activity_marker_record_for_attempt( .filter(|marker| marker.run_id.as_deref() == Some(run_id) && marker.attempt_number == Some(attempt_number)); RunActivityMarkerRecord { - run_id: Some(run_id.to_owned()), - attempt_number: Some(attempt_number), - process_id: same_run_marker.and_then(|marker| marker.process_id), - last_activity_unix_epoch: same_run_marker.and_then(|marker| marker.last_activity_unix_epoch), - last_protocol_activity_unix_epoch: same_run_marker - .and_then(|marker| marker.last_protocol_activity_unix_epoch), + run_id: Some(run_id.to_owned()), + attempt_number: Some(attempt_number), + process_id: same_run_marker.and_then(|marker| marker.process_id), + host_boot_id: same_run_marker.and_then(|marker| marker.host_boot_id.clone()), + process_start_identity: same_run_marker + .and_then(|marker| marker.process_start_identity.clone()), + last_activity_unix_epoch: same_run_marker.and_then(|marker| marker.last_activity_unix_epoch), + last_protocol_activity_unix_epoch: same_run_marker + .and_then(|marker| marker.last_protocol_activity_unix_epoch), last_progress_unix_epoch: same_run_marker.and_then(|marker| marker.last_progress_unix_epoch), current_operation: same_run_marker.and_then(|marker| marker.current_operation.clone()), thread_id: same_run_marker.and_then(|marker| marker.thread_id.clone()), @@ -1786,10 +1950,12 @@ fn read_run_activity_marker_record( match key { "run_id" => marker.run_id = Some(value.to_owned()), - "attempt_number" => marker.attempt_number = value.parse::().ok(), - "process_id" => marker.process_id = value.parse::().ok(), - "last_activity_unix_epoch" => - marker.last_activity_unix_epoch = value.parse::().ok(), + "attempt_number" => marker.attempt_number = value.parse::().ok(), + "process_id" => marker.process_id = value.parse::().ok(), + "host_boot_id" => marker.host_boot_id = Some(value.to_owned()), + "process_start_identity" => marker.process_start_identity = Some(value.to_owned()), + "last_activity_unix_epoch" => + marker.last_activity_unix_epoch = value.parse::().ok(), "last_protocol_activity_unix_epoch" => marker.last_protocol_activity_unix_epoch = value.parse::().ok(), "last_progress_unix_epoch" => @@ -1860,6 +2026,12 @@ fn serialize_run_activity_marker_record(marker: &RunActivityMarkerRecord) -> Str if let Some(process_id) = marker.process_id { body.push_str(&format!("process_id={process_id}\n")); } + if let Some(host_boot_id) = &marker.host_boot_id { + body.push_str(&format!("host_boot_id={host_boot_id}\n")); + } + if let Some(process_start_identity) = &marker.process_start_identity { + body.push_str(&format!("process_start_identity={process_start_identity}\n")); + } if let Some(last_activity_unix_epoch) = marker.last_activity_unix_epoch { body.push_str(&format!("last_activity_unix_epoch={last_activity_unix_epoch}\n")); } @@ -1929,27 +2101,47 @@ fn serialize_run_activity_marker_record(marker: &RunActivityMarkerRecord) -> Str { body.push_str(&format!("protocol_activity={summary_json}\n")); } + + append_run_activity_marker_account_fields(&mut body, marker); + append_run_activity_marker_retry_fields(&mut body, marker); + append_run_activity_marker_review_policy_fields(&mut body, marker); + + body +} + +fn append_run_activity_marker_account_fields( + body: &mut String, + marker: &RunActivityMarkerRecord, +) { if let Some(account) = &marker.account && let Ok(summary_json) = serde_json::to_string(account) - { - body.push_str(&format!("account={summary_json}\n")); - } + { + body.push_str(&format!("account={summary_json}\n")); + } - if !marker.accounts.is_empty() - && let Ok(accounts_json) = serde_json::to_string(&marker.accounts) - { - body.push_str(&format!("accounts={accounts_json}\n")); - } + if !marker.accounts.is_empty() + && let Ok(accounts_json) = serde_json::to_string(&marker.accounts) + { + body.push_str(&format!("accounts={accounts_json}\n")); + } +} - if let Some(retry_budget_attempt_count) = marker.retry_budget_attempt_count { - body.push_str(&format!("retry_budget_attempt_count={retry_budget_attempt_count}\n")); - } +fn append_run_activity_marker_retry_fields(body: &mut String, marker: &RunActivityMarkerRecord) { + if let Some(retry_budget_attempt_count) = marker.retry_budget_attempt_count { + body.push_str(&format!("retry_budget_attempt_count={retry_budget_attempt_count}\n")); + } if let Some(retry_kind) = &marker.retry_kind { body.push_str(&format!("retry_kind={retry_kind}\n")); } if let Some(retry_ready_at_unix_epoch) = marker.retry_ready_at_unix_epoch { body.push_str(&format!("retry_ready_at_unix_epoch={retry_ready_at_unix_epoch}\n")); } +} + +fn append_run_activity_marker_review_policy_fields( + body: &mut String, + marker: &RunActivityMarkerRecord, +) { if let Some(review_policy_phase) = &marker.review_policy_phase { body.push_str(&format!("review_policy_phase={review_policy_phase}\n")); } @@ -1962,8 +2154,6 @@ fn serialize_run_activity_marker_record(marker: &RunActivityMarkerRecord) -> Str if let Some(review_policy_nonclean_rounds) = marker.review_policy_nonclean_rounds { body.push_str(&format!("review_policy_nonclean_rounds={review_policy_nonclean_rounds}\n")); } - - body } fn parse_marker_list(value: &str) -> Vec { diff --git a/apps/decodex/src/state/models.rs b/apps/decodex/src/state/models.rs index ce6385bf..b874cd58 100644 --- a/apps/decodex/src/state/models.rs +++ b/apps/decodex/src/state/models.rs @@ -388,6 +388,8 @@ pub(crate) struct RunActivityMarker { run_id: String, attempt_number: i64, process_id: Option, + host_boot_id: Option, + process_start_identity: Option, last_activity_unix_epoch: Option, last_protocol_activity_unix_epoch: Option, last_progress_unix_epoch: Option, @@ -429,6 +431,14 @@ impl RunActivityMarker { self.process_id } + pub(crate) fn host_boot_id(&self) -> Option<&str> { + self.host_boot_id.as_deref() + } + + pub(crate) fn process_start_identity(&self) -> Option<&str> { + self.process_start_identity.as_deref() + } + pub(crate) fn last_activity_unix_epoch(&self) -> Option { self.last_activity_unix_epoch } diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index b7024552..474088fd 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -1058,6 +1058,26 @@ fn assert_run_activity_marker_round_trips_clearable_auxiliary_fields() { assert_eq!(marker.run_id(), "run-1"); assert_eq!(marker.attempt_number(), 1); + + if let Some(host_boot_id) = state::current_host_boot_id() { + assert_eq!(marker.host_boot_id(), Some(host_boot_id.as_str())); + assert!( + fs::read_to_string(temp_dir.path().join(RUN_ACTIVITY_MARKER_FILE)) + .expect("activity marker body should load") + .contains(&format!("host_boot_id={host_boot_id}\n")), + "activity markers should record the host boot identity for reboot-safe liveness" + ); + } + if let Some(process_start_identity) = state::current_process_start_identity() { + assert_eq!(marker.process_start_identity(), Some(process_start_identity.as_str())); + assert!( + fs::read_to_string(temp_dir.path().join(RUN_ACTIVITY_MARKER_FILE)) + .expect("activity marker body should load") + .contains(&format!("process_start_identity={process_start_identity}\n")), + "activity markers should record the process start identity for PID-reuse-safe liveness" + ); + } + assert_eq!(marker.retry_kind(), Some("failure")); assert_eq!(marker.retry_ready_at_unix_epoch(), Some(12_345)); assert_eq!(marker.review_policy_phase(), Some("handoff")); diff --git a/docs/reference/operator-control-plane.md b/docs/reference/operator-control-plane.md index 721f74c7..88c0a280 100644 --- a/docs/reference/operator-control-plane.md +++ b/docs/reference/operator-control-plane.md @@ -63,7 +63,7 @@ Use `decodex diagnose --json` when an agent needs the current handoff index dire | Project `WORKFLOW.md` | repo policy, validation gate, state names, retry/review policy | runtime ownership, queue labels, credentials, model overrides | | Linear | team-visible issue state, queue/active/manual-attention labels, coarse execution ledger comments, progress/failure/handoff/closeout summaries | high-frequency runtime truth, heartbeat, token pressure, raw attempts, connector retry budgets | | GitHub | PR, checks, review comments, merge evidence, signed commit verification | queue selection or local lane ownership | -| `.decodex-run-activity` | short-lived child activity heartbeat for the active attempt | durable ownership, review handoff identity, cleanup authority | +| `.decodex-run-activity` | short-lived child activity heartbeat for the active attempt, including same-boot and same-process-start liveness | durable ownership, review handoff identity, cleanup authority | ## Operator Dashboard Sections @@ -124,9 +124,14 @@ outside the local operator surface. Worktree visibility follows the owning dashboard section: - `Running Lanes` means the runtime DB still has an active lease, active attempt, or - child process/thread/protocol relationship for the path. `active_lease` is queue - lease ownership only; `execution_liveness` explains why the lane is still visible - when the queue lease is not held. + child process/thread/protocol relationship for the path. Process liveness requires + an alive PID plus matching `.decodex-run-activity` `host_boot_id` and + `process_start_identity`; a previous-boot marker, same-boot PID reuse, missing + identity, or unavailable current host/process identity is recovery input, not proof + of active execution. `process_liveness_reason` explains which identity check failed + when `process_alive` is false. + `active_lease` is queue lease ownership only; `execution_liveness` explains why + the lane is still visible when the queue lease is not held. - Running lanes derive CLI and dashboard text from the same `OperatorRunStatus` object. `protocol_activity`, when present, summarizes app-server structured notifications for turn status, waiting reason, and recent protocol events. The @@ -210,7 +215,10 @@ map to an operator decision. snapshot fields. - Queue ownership and execution liveness are separate. `queue_lease_state` reports whether the local queue lease is held, while `execution_liveness` reports observed - process, app-server thread, or protocol activity. + process, app-server thread, or protocol activity. `process_alive` is true only + when `.decodex-run-activity` ties the PID to the current host boot identity and + current process start identity; `process_liveness_reason` keeps stopped process, + previous-boot, and PID-reuse cases distinguishable. - `status` is the operator-facing run status. If the raw attempt is still `starting` after app-server thread, model, or protocol evidence exists, `status` is shown as `running` and `attempt_status` preserves the raw persisted attempt value. diff --git a/docs/spec/runtime.md b/docs/spec/runtime.md index 56bc56f6..abc88f59 100644 --- a/docs/spec/runtime.md +++ b/docs/spec/runtime.md @@ -313,7 +313,7 @@ The runtime database stores at least: - tracker and PR cache rows needed to survive connector outages - typed connector health and external API backoff -For child supervision, the active lane may also carry a short-lived worktree heartbeat marker at `.worktrees//.decodex-run-activity`. That marker is advisory, keyed to the current `run_id` plus `attempt_number`, and exists so the control plane can observe child activity across process boundaries, surface active thread and protocol progress in operator status, and keep high-frequency telemetry out of Linear. The marker may also carry an additive `child_agent_activity` JSON summary for the current attempt; that summary is diagnostic state for operator snapshots, not durable scheduling authority. Operator snapshots must keep queue ownership separate from execution liveness: `active_lease` and `queue_lease_state` describe the local queue lease, while `execution_liveness` describes the observed process, app-server thread, or protocol marker that keeps an active lane visible. If a raw attempt is still `starting` after app-server thread, model, or protocol activity is observed, operator-facing `status` must report `running` and preserve the raw value in `attempt_status`. High-frequency heartbeat, child-agent buckets, token counts, idle ages, and other transient liveness details stay local/operator-only under the boundary defined by [`linear-execution-ledger.md`](./linear-execution-ledger.md). +For child supervision, the active lane may also carry a short-lived worktree heartbeat marker at `.worktrees//.decodex-run-activity`. That marker is advisory, keyed to the current `run_id` plus `attempt_number`, and exists so the control plane can observe child activity across process boundaries, surface active thread and protocol progress in operator status, and keep high-frequency telemetry out of Linear. When the marker records process liveness, it must pair `process_id` with both host boot identity (`host_boot_id`) and process start identity (`process_start_identity`). A marker from a previous boot, a marker missing either identity, a marker whose process start identity no longer matches the live PID, or a marker observed while Decodex cannot read the current host or process identity must not be treated as a live process even if that PID currently exists. Operator snapshots expose `process_liveness_reason` so operators can distinguish stopped processes, previous-boot markers, and same-boot PID reuse from genuine live execution. The marker may also carry an additive `child_agent_activity` JSON summary for the current attempt; that summary is diagnostic state for operator snapshots, not durable scheduling authority. Operator snapshots must keep queue ownership separate from execution liveness: `active_lease` and `queue_lease_state` describe the local queue lease, while `execution_liveness` describes the observed process, app-server thread, or protocol marker that keeps an active lane visible. If a raw attempt is still `starting` after app-server thread, model, or protocol activity is observed, operator-facing `status` must report `running` and preserve the raw value in `attempt_status`. High-frequency heartbeat, child-agent buckets, token counts, idle ages, and other transient liveness details stay local/operator-only under the boundary defined by [`linear-execution-ledger.md`](./linear-execution-ledger.md). Post-review ownership is stored in the runtime database. Retained handoff rows record the authoritative PR URL, branch lineage, validated PR head OID, run id, and attempt number that completed the `In Review` handoff. Retained orchestration rows record the current post-review phase for that exact handoff identity. If the matching database row is missing, post-review ownership must block as unresolved instead of rebinding from branch-name, current-head, Linear comments, or other heuristics. The only source-tree marker that clean-source checks may ignore is the untracked `.decodex-run-activity` heartbeat marker. Review handoff, orchestration, retry, phase timing, and retained PR state belong in the Decodex runtime database, not in root-level or worktree-local review marker files. @@ -367,7 +367,8 @@ After a process restart, recent-run history, active lease ownership, retained po - Retry recovery must bind retained lanes to issue identity and local runtime state rather than to Linear project membership. - While the control plane is running an active lane, every poll tick must refresh cached tracker state for the leased issue before considering any new selection. - While the control plane is running an active lane, that child must keep the workflow snapshot it started with; project `WORKFLOW.md` reloads affect later decisions without restarting the in-flight child. -- While the control plane is supervising an active child process, stall detection must consult the child-updated `.decodex-run-activity` marker for the current `run_id` plus `attempt_number` and the persisted runtime event journal. +- While the control plane is supervising an active child process, stall detection must consult the child-updated `.decodex-run-activity` marker for the current `run_id` plus `attempt_number` and the persisted runtime event journal. A retained marker only proves a live process when its PID is still alive on the current host boot and the process start identity still matches; after power loss, reboot, or same-boot PID reuse, recovery must clear the reconstructed lease and re-enter the retained lane through retry-style dispatch instead of preserving the old running state. +- Retry-style recovery prompts must tell the next agent to treat the current worktree, tracker state, protocol events, and marker files as durable truth, inspect the branch/diff/recent validation evidence first, and continue from partial work rather than assuming prior in-memory model/tool state survived. - While the control plane owns a queued retry entry, that queued claim must take priority over normal candidate selection for the affected project. - While the control plane is idle between lanes, it may reload the configured project `WORKFLOW.md` on each tick and immediately apply a newly valid document to future dispatch, retry, post-exit reconciliation, and prompt generation. - If that same configured `WORKFLOW.md` path becomes invalid after a successful load, the control plane must log the reload failure and keep the last known good document active instead of dropping the tick or clearing runtime policy.