From 02a8d8772edad14518107b28954031e007d7451a Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 19 May 2026 20:16:45 +0800 Subject: [PATCH 1/2] {"schema":"decodex/commit/1","summary":"Harden power-loss recovery","authority":"manual"} --- .../src/orchestrator/agent_evidence.rs | 10 +- .../src/orchestrator/operator_dashboard.html | 35 ++- apps/decodex/src/orchestrator/prompting.rs | 27 ++- apps/decodex/src/orchestrator/status.rs | 79 ++++++- apps/decodex/src/orchestrator/tests.rs | 51 +++++ .../tests/intake/run_and_prompting.rs | 48 ++++ .../tests/operator/status/running_lanes.rs | 87 ++++++++ .../tests/operator/status_support.rs | 9 +- .../tests/recovery/runtime_reentry.rs | 110 +++++++++ apps/decodex/src/orchestrator/types.rs | 2 + apps/decodex/src/state.rs | 2 +- apps/decodex/src/state/internal.rs | 211 +++++++++++++++--- apps/decodex/src/state/models.rs | 10 + apps/decodex/src/state/tests.rs | 18 ++ docs/reference/operator-control-plane.md | 18 +- docs/spec/runtime.md | 5 +- 16 files changed, 662 insertions(+), 60 deletions(-) diff --git a/apps/decodex/src/orchestrator/agent_evidence.rs b/apps/decodex/src/orchestrator/agent_evidence.rs index 423e0efa..1e825f46 100644 --- a/apps/decodex/src/orchestrator/agent_evidence.rs +++ b/apps/decodex/src/orchestrator/agent_evidence.rs @@ -149,6 +149,7 @@ struct AgentRunCapsule { interactive_requested: bool, process_id: Option, process_alive: Option, + process_liveness_reason: Option, event_count: i64, last_event_type: Option, last_event_at: Option, @@ -588,10 +589,11 @@ fn agent_run_capsule( turn_id: run.turn_id.clone(), thread_status: run.thread_status.clone(), thread_active_flags: run.thread_active_flags.clone(), - interactive_requested: run.interactive_requested, - process_id: run.process_id, - process_alive: run.process_alive, - event_count: run.event_count, + interactive_requested: run.interactive_requested, + process_id: run.process_id, + process_alive: run.process_alive, + process_liveness_reason: run.process_liveness_reason.clone(), + event_count: run.event_count, last_event_type: run.last_event_type.clone(), last_event_at: run.last_event_at.clone(), last_run_activity_at: run.last_run_activity_at.clone(), diff --git a/apps/decodex/src/orchestrator/operator_dashboard.html b/apps/decodex/src/orchestrator/operator_dashboard.html index 1c9a9067..88537416 100644 --- a/apps/decodex/src/orchestrator/operator_dashboard.html +++ b/apps/decodex/src/orchestrator/operator_dashboard.html @@ -4787,7 +4787,12 @@

Run History

facts.push(["Patch", "retained"]); } if (attention.process_alive != null) { - facts.push(["Process", attention.process_alive ? "alive" : "stopped"]); + facts.push([ + "Process", + attention.process_alive + ? "alive" + : processLivenessReasonLabel(attention.process_liveness_reason || "process_stopped"), + ]); } if (attention.worktree_path) { facts.push(["Worktree", attention.worktree_path]); @@ -5355,7 +5360,33 @@

Run History

if (run.process_alive == null) { return `${run.process_id} (unknown)`; } - return `${run.process_id} (${run.process_alive ? "alive" : "stopped"})`; + const reason = run.process_alive + ? "process_alive" + : run.process_liveness_reason || "process_stopped"; + return `${run.process_id} (${processLivenessReasonLabel(reason)})`; + } + + function processLivenessReasonLabel(reason) { + switch (reason) { + case "process_alive": + return "alive"; + case "process_stopped": + return "stopped"; + case "host_boot_id_missing": + return "boot identity missing"; + case "host_boot_id_unavailable": + return "boot identity unavailable"; + case "host_boot_id_mismatch": + return "previous boot"; + case "process_start_identity_missing": + return "start identity missing"; + case "process_start_identity_unavailable": + return "start identity unavailable"; + case "process_start_identity_mismatch": + return "process identity changed"; + default: + return reason ? reason.replaceAll("_", " ") : "unknown"; + } } function runExecutionLivenessSummary(run) { diff --git a/apps/decodex/src/orchestrator/prompting.rs b/apps/decodex/src/orchestrator/prompting.rs index 84124480..f36d58b5 100644 --- a/apps/decodex/src/orchestrator/prompting.rs +++ b/apps/decodex/src/orchestrator/prompting.rs @@ -1,6 +1,14 @@ const PROMPT_ONLY_INTERNAL_REVIEW_INSTRUCTION: &str = "Review your work repeatedly and fix any logic bugs until no new issues are found."; +fn build_retry_recovery_context(dispatch_mode: IssueDispatchMode) -> Option { + (dispatch_mode == IssueDispatchMode::Retry).then(|| { + String::from( + "Recovery context\n- This is retry-style re-entry after a prior attempt stopped or could not prove live execution.\n- Treat the current worktree, tracker state, protocol events, and marker files as the durable source of truth. Do not assume in-memory model output or tool results survived.\n- Before editing, inspect the current branch, diff, and recent validation evidence, reconcile partial work already present, and continue from that state instead of restarting from scratch.", + ) + }) +} + fn review_pull_request_title(issue: &TrackerIssue) -> String { let title = issue.title.trim(); let prefix = format!("{}:", issue.identifier); @@ -55,6 +63,9 @@ where sections.push(String::from( "Commit contract\n- When you create a local commit for this lane, use a single-line `decodex/commit/1` JSON commit message.\n- Required fields: `schema`, `summary`, and `authority`.\n- `authority` must be the authoritative Linear issue identifier for this lane.\n- Optional fields: `related` and `breaking`.\n- Do not encode landing mode, CI status, closeout state, or other process-state fields in the commit message.", )); + if let Some(recovery_context) = build_retry_recovery_context(issue_run.dispatch_mode) { + sections.push(recovery_context); + } let repair_architecture_guidance = build_external_repair_architecture_guidance(project, state_store, issue_run); @@ -150,6 +161,9 @@ where .tracker() .resolved_completed_state(); let internal_review_mode = project.codex().internal_review_mode(); + let recovery_context = build_retry_recovery_context(issue_run.dispatch_mode) + .map(|section| format!("{section}\n\n")) + .unwrap_or_default(); match issue_run.dispatch_mode { IssueDispatchMode::ReviewRepair => format!( @@ -188,12 +202,13 @@ where label_tool = ISSUE_LABEL_ADD_TOOL_NAME, continuation_guidance = continuation_guidance, ), - _ => format!( - "Resolve Linear issue {identifier}: {title}\n\nDescription:\n{description}\n\nExecution checklist:\n- Move the issue to `{in_progress}` with `{transition_tool}`. Decodex already records the run-start Linear ledger, so do not leave a separate start comment.\n- Update `{progress_checkpoint_tool}` whenever the execution phase, focus, next action, blockers, evidence, or verification state changes materially.\n- Keep discovery bounded to the minimal implementation files needed for this issue; defer broader docs or upstream reading unless a concrete ambiguity blocks the change.\n- Implement the fix in the current worktree.\n{internal_review_guidance}- Treat failures from repo-native `canonicalize_commands`, `verify_commands`, or tracked rewrites left by that repo gate as continued repair by default: keep fixing the lane and rerun the gate instead of taking `manual_attention` unless the blocker is clearly toolchain, environment, or operator-owned.\n- Run the repository validation needed to justify a reviewable PR.\n- Commit the lane, push branch `{branch}`, and create or update a non-draft PR titled `{pr_title}` for that branch.\n{completion_guidance}- If the issue needs manual attention, add label `{needs_attention}` with `{label_tool}`, explain why in a comment, and then call `{terminal_finalize_tool}` with path `manual_attention`. Do not call `{review_handoff_tool}` in that case; `decodex` will stop the lane as a human-required failure without automatic retry.\n- Do not move the issue directly to `{success}` with `{transition_tool}`; `decodex` will finish that writeback after its own validation passes.\n- Do not report the run as complete or treat `{progress_checkpoint_tool}` as terminal completion until `{terminal_finalize_tool}` succeeds.{continuation_guidance}", - identifier = issue.identifier, - title = issue.title, - description = description, - transition_tool = ISSUE_TRANSITION_TOOL_NAME, + _ => format!( + "Resolve Linear issue {identifier}: {title}\n\nDescription:\n{description}\n\n{recovery_context}Execution checklist:\n- Move the issue to `{in_progress}` with `{transition_tool}`. Decodex already records the run-start Linear ledger, so do not leave a separate start comment.\n- Update `{progress_checkpoint_tool}` whenever the execution phase, focus, next action, blockers, evidence, or verification state changes materially.\n- Keep discovery bounded to the minimal implementation files needed for this issue; defer broader docs or upstream reading unless a concrete ambiguity blocks the change.\n- Implement the fix in the current worktree.\n{internal_review_guidance}- Treat failures from repo-native `canonicalize_commands`, `verify_commands`, or tracked rewrites left by that repo gate as continued repair by default: keep fixing the lane and rerun the gate instead of taking `manual_attention` unless the blocker is clearly toolchain, environment, or operator-owned.\n- Run the repository validation needed to justify a reviewable PR.\n- Commit the lane, push branch `{branch}`, and create or update a non-draft PR titled `{pr_title}` for that branch.\n{completion_guidance}- If the issue needs manual attention, add label `{needs_attention}` with `{label_tool}`, explain why in a comment, and then call `{terminal_finalize_tool}` with path `manual_attention`. Do not call `{review_handoff_tool}` in that case; `decodex` will stop the lane as a human-required failure without automatic retry.\n- Do not move the issue directly to `{success}` with `{transition_tool}`; `decodex` will finish that writeback after its own validation passes.\n- Do not report the run as complete or treat `{progress_checkpoint_tool}` as terminal completion until `{terminal_finalize_tool}` succeeds.{continuation_guidance}", + identifier = issue.identifier, + title = issue.title, + description = description, + recovery_context = recovery_context, + transition_tool = ISSUE_TRANSITION_TOOL_NAME, label_tool = ISSUE_LABEL_ADD_TOOL_NAME, progress_checkpoint_tool = ISSUE_PROGRESS_CHECKPOINT_TOOL_NAME, review_handoff_tool = ISSUE_REVIEW_HANDOFF_TOOL_NAME, diff --git a/apps/decodex/src/orchestrator/status.rs b/apps/decodex/src/orchestrator/status.rs index 7d0c94d6..687d1cb8 100644 --- a/apps/decodex/src/orchestrator/status.rs +++ b/apps/decodex/src/orchestrator/status.rs @@ -61,6 +61,7 @@ impl PostReviewOrchestrationStatus { struct OperatorRunTiming { process_id: Option, process_alive: Option, + process_liveness_reason: Option, last_run_activity_unix_epoch: Option, last_protocol_activity_unix_epoch: Option, last_progress_unix_epoch: Option, @@ -68,6 +69,12 @@ struct OperatorRunTiming { protocol_idle_for_seconds: Option, } +#[derive(Clone, Copy)] +struct MarkerProcessLiveness { + alive: bool, + reason: &'static str, +} + struct OperatorRunAppServerState { thread_id: Option, turn_id: Option, @@ -1536,6 +1543,7 @@ where retry_budget_attempts, worktree_has_tracked_changes, ); + let process_liveness = marker.as_ref().and_then(marker_process_liveness_for_marker); Ok(Some(OperatorQueuedIssueAttentionStatus { summary, @@ -1572,7 +1580,8 @@ where .and_then(RunActivityMarker::last_event_type) .map(str::to_owned), event_count: marker.as_ref().map_or(0, RunActivityMarker::event_count), - process_alive: marker.as_ref().and_then(|marker| marker.process_id().map(process_is_alive)), + process_alive: process_liveness.map(|liveness| liveness.alive), + process_liveness_reason: process_liveness.map(|liveness| liveness.reason.to_owned()), worktree_path: worktree_path .exists() .then(|| relative_worktree_path_for_path(project, &worktree_path)), @@ -3274,13 +3283,61 @@ fn recoverable_worktree_identifiers(worktree_root: &Path) -> crate::prelude::Res } fn worktree_activity_marker_is_fresh(marker: &RunActivityMarker, now_unix_epoch: i64) -> bool { - marker.process_id().is_some_and(process_is_alive) + marker_process_is_alive(marker) && marker .last_activity_unix_epoch() .and_then(|last_activity| observed_idle_duration(last_activity, now_unix_epoch)) .is_some_and(|idle_for| idle_for < ACTIVE_RUN_IDLE_TIMEOUT) } +fn marker_process_is_alive(marker: &RunActivityMarker) -> bool { + marker_process_liveness(marker).alive +} + +fn marker_process_liveness_for_marker( + marker: &RunActivityMarker, +) -> Option { + marker.process_id().map(|_| marker_process_liveness(marker)) +} + +fn marker_process_liveness(marker: &RunActivityMarker) -> MarkerProcessLiveness { + let Some(process_id) = marker.process_id() else { + return MarkerProcessLiveness { alive: false, reason: "process_id_missing" }; + }; + + if !process_is_alive(process_id) { + return MarkerProcessLiveness { alive: false, reason: "process_stopped" }; + } + + let Some(marker_host_boot_id) = marker.host_boot_id() else { + return MarkerProcessLiveness { alive: false, reason: "host_boot_id_missing" }; + }; + let Some(current_host_boot_id) = state::current_host_boot_id() else { + return MarkerProcessLiveness { alive: false, reason: "host_boot_id_unavailable" }; + }; + if marker_host_boot_id != current_host_boot_id.as_str() { + return MarkerProcessLiveness { alive: false, reason: "host_boot_id_mismatch" }; + } + + let Some(marker_process_start_identity) = marker.process_start_identity() else { + return MarkerProcessLiveness { alive: false, reason: "process_start_identity_missing" }; + }; + let Some(current_process_start_identity) = state::process_start_identity(process_id) else { + return MarkerProcessLiveness { + alive: false, + reason: "process_start_identity_unavailable", + }; + }; + if marker_process_start_identity != current_process_start_identity.as_str() { + return MarkerProcessLiveness { + alive: false, + reason: "process_start_identity_mismatch", + }; + } + + MarkerProcessLiveness { alive: true, reason: "process_alive" } +} + fn process_is_alive(process_id: u32) -> bool { let Ok(process_id) = pid_t::try_from(process_id) else { return false; @@ -3409,11 +3466,12 @@ fn operator_run_status( suspected_stall, last_event_type: protocol_summary.last_event_type, last_event_at: protocol_summary.last_event_at, - event_count: protocol_summary.event_count, - process_id: timing.process_id, - process_alive: timing.process_alive, - retry_kind, - next_retry_at: format_optional_unix_timestamp(retry_ready_at_unix_epoch), + event_count: protocol_summary.event_count, + process_id: timing.process_id, + process_alive: timing.process_alive, + process_liveness_reason: timing.process_liveness_reason, + retry_kind, + next_retry_at: format_optional_unix_timestamp(retry_ready_at_unix_epoch), effective_model: app_server_state.effective_model, effective_model_provider: app_server_state.effective_model_provider, effective_cwd: app_server_state.effective_cwd, @@ -3460,9 +3518,11 @@ fn operator_run_timing( marker.and_then(RunActivityMarker::last_progress_unix_epoch), last_protocol_activity_unix_epoch, ); + let process_liveness = marker.and_then(marker_process_liveness_for_marker); OperatorRunTiming { - process_alive: process_id.map(process_is_alive), + process_alive: process_liveness.map(|liveness| liveness.alive), + process_liveness_reason: process_liveness.map(|liveness| liveness.reason.to_owned()), process_id, last_run_activity_unix_epoch, last_protocol_activity_unix_epoch, @@ -4696,7 +4756,7 @@ fn append_rendered_run(output: &mut String, run: &OperatorRunStatus) { let accounts = render_accounts_summary(&run.accounts); output.push_str(&format!( - "- run_id: {}\n project_id: {}\n issue_id: {}\n issue_identifier: {}\n title: {}\n attempt: {}\n status: {}\n attempt_status: {}\n phase: {}\n wait_reason: {}\n current_operation: {}\n active_lease: {}\n queue_lease_state: {}\n queue_lease: {}\n execution_liveness: {}\n freshness_at: {}\n freshness_source: {}\n timing: run_idle={} protocol_idle={} last_progress={} protocol_event={} events={}\n account: {}\n accounts: {}\n child_agent_activity: {}\n protocol_activity: {}\n context_pressure: {}\n thread_id: {}\n turn_id: {}\n thread_status: {}\n thread_active_flags: {}\n interactive_requested: {}\n continuation_pending: {}\n branch: {}\n worktree_path: {}\n updated_at: {}\n last_run_activity_at: {}\n last_protocol_activity_at: {}\n last_progress_at: {}\n idle_for_seconds: {}\n protocol_idle_for_seconds: {}\n suspected_stall: {}\n process_id: {}\n process_alive: {}\n retry_kind: {}\n next_retry_at: {}\n effective_model: {}\n effective_model_provider: {}\n effective_cwd: {}\n effective_approval_policy: {}\n effective_approvals_reviewer: {}\n effective_sandbox_mode: {}\n protocol_event: {}\n event_count: {}\n", + "- run_id: {}\n project_id: {}\n issue_id: {}\n issue_identifier: {}\n title: {}\n attempt: {}\n status: {}\n attempt_status: {}\n phase: {}\n wait_reason: {}\n current_operation: {}\n active_lease: {}\n queue_lease_state: {}\n queue_lease: {}\n execution_liveness: {}\n freshness_at: {}\n freshness_source: {}\n timing: run_idle={} protocol_idle={} last_progress={} protocol_event={} events={}\n account: {}\n accounts: {}\n child_agent_activity: {}\n protocol_activity: {}\n context_pressure: {}\n thread_id: {}\n turn_id: {}\n thread_status: {}\n thread_active_flags: {}\n interactive_requested: {}\n continuation_pending: {}\n branch: {}\n worktree_path: {}\n updated_at: {}\n last_run_activity_at: {}\n last_protocol_activity_at: {}\n last_progress_at: {}\n idle_for_seconds: {}\n protocol_idle_for_seconds: {}\n suspected_stall: {}\n process_id: {}\n process_alive: {}\n process_liveness_reason: {}\n retry_kind: {}\n next_retry_at: {}\n effective_model: {}\n effective_model_provider: {}\n effective_cwd: {}\n effective_approval_policy: {}\n effective_approvals_reviewer: {}\n effective_sandbox_mode: {}\n protocol_event: {}\n event_count: {}\n", run.run_id, run.project_id, run.issue_id, @@ -4744,6 +4804,7 @@ fn append_rendered_run(output: &mut String, run: &OperatorRunStatus) { || String::from("none"), |value| if value { String::from("yes") } else { String::from("no") }, ), + run.process_liveness_reason.as_deref().unwrap_or("none"), run.retry_kind.as_deref().unwrap_or("none"), run.next_retry_at.as_deref().unwrap_or("none"), run.effective_model.as_deref().unwrap_or("none"), diff --git a/apps/decodex/src/orchestrator/tests.rs b/apps/decodex/src/orchestrator/tests.rs index 61c7d551..eaaf580f 100644 --- a/apps/decodex/src/orchestrator/tests.rs +++ b/apps/decodex/src/orchestrator/tests.rs @@ -97,6 +97,57 @@ const TEST_NON_EXTERNAL_REVIEW_ACTOR_LOGIN: &str = "someone-else"; const TEST_SERVICE_ID: &str = "pubfi"; const TEST_PROJECT_CONFIG_FILE: &str = "project.toml"; +fn rewrite_run_activity_marker_host_boot_id(worktree_path: &Path, host_boot_id: &str) { + let marker_path = worktree_path.join(RUN_ACTIVITY_MARKER_FILE); + let marker_body = fs::read_to_string(&marker_path).expect("marker body should load"); + let mut host_boot_id_written = false; + let mut rewritten = marker_body + .lines() + .map(|line| { + if line.starts_with("host_boot_id=") { + host_boot_id_written = true; + + format!("host_boot_id={host_boot_id}") + } else { + line.to_owned() + } + }) + .collect::>(); + + if !host_boot_id_written { + rewritten.push(format!("host_boot_id={host_boot_id}")); + } + + fs::write(&marker_path, rewritten.join("\n") + "\n").expect("marker body should rewrite"); +} + +fn rewrite_run_activity_marker_process_start_identity( + worktree_path: &Path, + process_start_identity: &str, +) { + let marker_path = worktree_path.join(RUN_ACTIVITY_MARKER_FILE); + let marker_body = fs::read_to_string(&marker_path).expect("marker body should load"); + let mut process_start_identity_written = false; + let mut rewritten = marker_body + .lines() + .map(|line| { + if line.starts_with("process_start_identity=") { + process_start_identity_written = true; + + format!("process_start_identity={process_start_identity}") + } else { + line.to_owned() + } + }) + .collect::>(); + + if !process_start_identity_written { + rewritten.push(format!("process_start_identity={process_start_identity}")); + } + + fs::write(&marker_path, rewritten.join("\n") + "\n").expect("marker body should rewrite"); +} + struct FakeTracker { listed_issues: Vec, identifier_lookup_issues: Option>, diff --git a/apps/decodex/src/orchestrator/tests/intake/run_and_prompting.rs b/apps/decodex/src/orchestrator/tests/intake/run_and_prompting.rs index 74e59462..f5701897 100644 --- a/apps/decodex/src/orchestrator/tests/intake/run_and_prompting.rs +++ b/apps/decodex/src/orchestrator/tests/intake/run_and_prompting.rs @@ -623,6 +623,54 @@ fn normal_prompts_require_issue_prefixed_pull_request_title() { assert!(developer_instructions.contains("single-line `decodex/commit/1` JSON commit message")); } +#[test] +fn retry_prompts_include_recovery_context() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let issue = sample_issue("In Progress", &[]); + let tracker = FakeTracker::new(vec![issue.clone()]); + let issue_run = orchestrator::IssueRunPlan { + issue: issue.clone(), + issue_state: String::from("In Progress"), + initial_issue_state: String::from("In Progress"), + worktree: WorktreeSpec { + branch_name: String::from("x/pubfi-pub-101"), + issue_identifier: String::from("PUB-101"), + path: config.worktree_root().join("PUB-101"), + reused_existing: true, + }, + retry_project_slug: String::from("pubfi"), + dispatch_mode: IssueDispatchMode::Retry, + attempt_number: 2, + run_id: String::from("pub-101-attempt-2-123"), + retry_budget_base: 1, + }; + let state_store = StateStore::open_in_memory().expect("state store should open"); + let developer_instructions = orchestrator::build_developer_instructions( + &tracker, + &config, + &workflow, + &issue_run, + &state_store, + None, + ) + .expect("developer instructions should build"); + let user_input = orchestrator::build_user_input( + &tracker, + &config, + &issue, + &workflow, + &issue_run, + &state_store, + None, + ); + + for prompt in [&developer_instructions, &user_input] { + assert!(prompt.contains("Recovery context")); + assert!(prompt.contains("Treat the current worktree")); + assert!(prompt.contains("Do not assume in-memory model output or tool results survived")); + } +} + #[test] fn normal_prompts_respect_non_loop_internal_review_modes() { for (mode, expected, forbidden_checkpoint) in [ diff --git a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs index 3576a8cc..43656821 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs @@ -771,6 +771,92 @@ fn operator_status_snapshot_counts_stopped_active_process_as_attention_not_runni assert_eq!(run.phase, "executing"); assert_eq!(run.process_alive, Some(false)); + assert_eq!(run.process_liveness_reason.as_deref(), Some("process_stopped")); + assert_eq!(project.active_run_count, 0); + assert_eq!(project.attention_count, 1); +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test] +fn operator_status_snapshot_counts_previous_boot_process_as_attention_not_running() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue("Todo", &[]); + let worktree_path = config.worktree_root().join("PUB-101"); + + state_store + .record_run_attempt("run-1", &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .upsert_lease("pubfi", &issue.id, "run-1", "In Progress") + .expect("lease should record"); + state_store + .upsert_worktree( + "pubfi", + &issue.id, + "x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + + fs::create_dir_all(&worktree_path).expect("worktree path should exist"); + state::write_run_activity_marker_for_process(&worktree_path, "run-1", 1, process::id()) + .expect("live process marker should write"); + rewrite_run_activity_marker_host_boot_id(&worktree_path, "previous-boot"); + + let snapshot = orchestrator::build_operator_status_snapshot(&config, &state_store, 10) + .expect("snapshot should build"); + let run = snapshot.active_runs.first().expect("active run should remain visible"); + let project = snapshot.projects.first().expect("project summary should exist"); + + assert_eq!(run.phase, "executing"); + assert_eq!(run.process_id, Some(process::id())); + assert_eq!(run.process_alive, Some(false)); + assert_eq!(run.process_liveness_reason.as_deref(), Some("host_boot_id_mismatch")); + assert_eq!(project.active_run_count, 0); + assert_eq!(project.attention_count, 1); +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test] +fn operator_status_snapshot_counts_reused_pid_as_attention_not_running() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue("Todo", &[]); + let worktree_path = config.worktree_root().join("PUB-101"); + + state_store + .record_run_attempt("run-1", &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .upsert_lease("pubfi", &issue.id, "run-1", "In Progress") + .expect("lease should record"); + state_store + .upsert_worktree( + "pubfi", + &issue.id, + "x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + + fs::create_dir_all(&worktree_path).expect("worktree path should exist"); + state::write_run_activity_marker_for_process(&worktree_path, "run-1", 1, process::id()) + .expect("live process marker should write"); + rewrite_run_activity_marker_process_start_identity(&worktree_path, "previous-process-start"); + + let snapshot = orchestrator::build_operator_status_snapshot(&config, &state_store, 10) + .expect("snapshot should build"); + let run = snapshot.active_runs.first().expect("active run should remain visible"); + let project = snapshot.projects.first().expect("project summary should exist"); + + assert_eq!(run.phase, "executing"); + assert_eq!(run.process_id, Some(process::id())); + assert_eq!(run.process_alive, Some(false)); + assert_eq!( + run.process_liveness_reason.as_deref(), + Some("process_start_identity_mismatch") + ); assert_eq!(project.active_run_count, 0); assert_eq!(project.attention_count, 1); } @@ -811,6 +897,7 @@ fn operator_status_snapshot_keeps_unleased_live_process_in_running_lanes() { assert_eq!(run.queue_lease_state, "not_held"); assert_eq!(run.execution_liveness, "process_alive"); assert_eq!(run.process_alive, Some(true)); + assert_eq!(run.process_liveness_reason.as_deref(), Some("process_alive")); assert_eq!(project.active_run_count, 1); assert_eq!(project.retained_worktree_count, 0); } diff --git a/apps/decodex/src/orchestrator/tests/operator/status_support.rs b/apps/decodex/src/orchestrator/tests/operator/status_support.rs index 377c7ddc..1ac216f2 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status_support.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status_support.rs @@ -234,10 +234,11 @@ fn operator_status_text_active_run() -> orchestrator::OperatorRunStatus { suspected_stall: false, last_event_type: Some(String::from("turn/completed")), last_event_at: Some(String::from("2026-03-14 10:00:01")), - event_count: 4, - process_id: Some(1_234), - process_alive: Some(true), - retry_kind: None, + event_count: 4, + process_id: Some(1_234), + process_alive: Some(true), + process_liveness_reason: Some(String::from("process_alive")), + retry_kind: None, next_retry_at: None, effective_model: Some(String::from("gpt-5.4")), effective_model_provider: Some(String::from("openai")), diff --git a/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs b/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs index 4fe2a3fa..e77acbab 100644 --- a/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs +++ b/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs @@ -423,6 +423,116 @@ fn run_project_once_skips_recovered_worktree_with_fresh_activity_marker() { ); } +#[cfg(unix)] +#[test] +fn run_project_once_retries_recovered_worktree_after_marker_process_is_killed() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let issue = sample_active_issue("In Progress"); + let tracker = FakeTracker::with_refresh_snapshots( + vec![issue.clone()], + vec![vec![issue.clone()], vec![issue.clone()]], + ); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let worktree_manager = + WorktreeManager::new(config.service_id(), config.repo_root(), config.worktree_root()); + let worktree = worktree_manager + .ensure_worktree(&issue.identifier, false) + .expect("recovered worktree should exist"); + let mut child = process::Command::new("/bin/sh") + .args(["-c", "exec sleep 60"]) + .spawn() + .expect("kill-smoke child process should start"); + let child_process_id = child.id(); + + assert!( + orchestrator::process_is_alive(child_process_id), + "kill-smoke child process should be live before marker write" + ); + state::write_run_activity_marker_for_process(&worktree.path, "run-1", 1, child_process_id) + .expect("activity marker should write"); + child.kill().expect("kill-smoke child process should be killed"); + child.wait().expect("kill-smoke child process should be reaped"); + assert!( + !orchestrator::process_is_alive(child_process_id), + "kill-smoke child process should no longer be live after kill" + ); + + let summary = orchestrator::run_project_once(&tracker, &config, &workflow, &state_store, true) + .expect("kill-smoke recovery should succeed") + .expect("killed-process recovered lane should be selected for retry"); + + assert_eq!(summary.issue_id, issue.id); + assert_eq!(summary.dispatch_mode, orchestrator::IssueDispatchMode::Retry); + assert!( + state_store.lease_for_issue(&issue.id).expect("lease lookup should succeed").is_none(), + "killed marker processes must not reconstruct live leases" + ); +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test] +fn run_project_once_retries_recovered_worktree_from_previous_boot() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let issue = sample_active_issue("In Progress"); + let tracker = FakeTracker::with_refresh_snapshots( + vec![issue.clone()], + vec![vec![issue.clone()], vec![issue.clone()]], + ); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let worktree_manager = + WorktreeManager::new(config.service_id(), config.repo_root(), config.worktree_root()); + let worktree = worktree_manager + .ensure_worktree(&issue.identifier, false) + .expect("recovered worktree should exist"); + + state::write_run_activity_marker_for_process(&worktree.path, "run-1", 1, process::id()) + .expect("activity marker should write"); + rewrite_run_activity_marker_host_boot_id(&worktree.path, "previous-boot"); + + let summary = orchestrator::run_project_once(&tracker, &config, &workflow, &state_store, true) + .expect("previous-boot recovery should succeed") + .expect("previous-boot recovered lane should be selected for retry"); + + assert_eq!(summary.issue_id, issue.id); + assert_eq!(summary.dispatch_mode, orchestrator::IssueDispatchMode::Retry); + assert!( + state_store.lease_for_issue(&issue.id).expect("lease lookup should succeed").is_none(), + "previous-boot markers must not reconstruct live leases even when the PID exists" + ); +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test] +fn run_project_once_retries_recovered_worktree_from_reused_pid() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let issue = sample_active_issue("In Progress"); + let tracker = FakeTracker::with_refresh_snapshots( + vec![issue.clone()], + vec![vec![issue.clone()], vec![issue.clone()]], + ); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let worktree_manager = + WorktreeManager::new(config.service_id(), config.repo_root(), config.worktree_root()); + let worktree = worktree_manager + .ensure_worktree(&issue.identifier, false) + .expect("recovered worktree should exist"); + + state::write_run_activity_marker_for_process(&worktree.path, "run-1", 1, process::id()) + .expect("activity marker should write"); + rewrite_run_activity_marker_process_start_identity(&worktree.path, "previous-process-start"); + + let summary = orchestrator::run_project_once(&tracker, &config, &workflow, &state_store, true) + .expect("same-boot PID-reuse recovery should succeed") + .expect("same-boot PID-reuse recovered lane should be selected for retry"); + + assert_eq!(summary.issue_id, issue.id); + assert_eq!(summary.dispatch_mode, orchestrator::IssueDispatchMode::Retry); + assert!( + state_store.lease_for_issue(&issue.id).expect("lease lookup should succeed").is_none(), + "PID reuse must not reconstruct live leases when the process start identity changed" + ); +} + #[cfg(unix)] #[test] fn process_is_alive_handles_current_process_and_invalid_sentinel() { diff --git a/apps/decodex/src/orchestrator/types.rs b/apps/decodex/src/orchestrator/types.rs index e6d906bc..7f0f5a75 100644 --- a/apps/decodex/src/orchestrator/types.rs +++ b/apps/decodex/src/orchestrator/types.rs @@ -783,6 +783,7 @@ struct OperatorRunStatus { event_count: i64, process_id: Option, process_alive: Option, + process_liveness_reason: Option, retry_kind: Option, next_retry_at: Option, effective_model: Option, @@ -832,6 +833,7 @@ struct OperatorQueuedIssueAttentionStatus { last_event_type: Option, event_count: i64, process_alive: Option, + process_liveness_reason: Option, worktree_path: Option, worktree_has_tracked_changes: bool, } diff --git a/apps/decodex/src/state.rs b/apps/decodex/src/state.rs index c034359e..6e2c5f47 100644 --- a/apps/decodex/src/state.rs +++ b/apps/decodex/src/state.rs @@ -8,7 +8,7 @@ use std::{ io::{Error, ErrorKind, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, process, - sync::{Mutex, MutexGuard}, + sync::{Mutex, MutexGuard, OnceLock}, time::Duration, }; diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index 79e63f16..5e6e319f 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -936,6 +936,8 @@ struct RunActivityMarkerRecord { run_id: Option, attempt_number: Option, process_id: Option, + host_boot_id: Option, + process_start_identity: Option, last_activity_unix_epoch: Option, last_protocol_activity_unix_epoch: Option, last_progress_unix_epoch: Option, @@ -1026,7 +1028,7 @@ pub(crate) fn write_run_operation_marker_for_process( let existing_marker = read_run_activity_marker_record(worktree_path)?; let mut marker = run_activity_marker_record_for_attempt(existing_marker.as_ref(), run_id, attempt_number); - marker.process_id = Some(process_id); + set_run_activity_marker_process_identity(&mut marker, process_id); marker.last_activity_unix_epoch = Some(now); marker.last_progress_unix_epoch = Some(now); marker.current_operation = Some(current_operation.to_owned()); @@ -1066,7 +1068,7 @@ pub(crate) fn write_run_protocol_activity_marker( marker.run_id = Some(activity.run_id.to_owned()); marker.attempt_number = Some(activity.attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.last_activity_unix_epoch = Some(now); marker.last_protocol_activity_unix_epoch = Some(now); @@ -1095,7 +1097,7 @@ pub(crate) fn write_run_account_marker( marker.run_id = Some(account.run_id.to_owned()); marker.attempt_number = Some(account.attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.current_operation = Some(RUN_OPERATION_AGENT_RUN.to_owned()); marker.account = Some(account.account.clone()); @@ -1119,7 +1121,7 @@ pub(crate) fn write_run_thread_marker( marker.run_id = Some(run_id.to_owned()); marker.attempt_number = Some(attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.current_operation = Some(RUN_OPERATION_AGENT_RUN.to_owned()); marker.thread_id = Some(thread_id.to_owned()); @@ -1142,7 +1144,7 @@ pub(crate) fn write_run_turn_marker( marker.run_id = Some(run_id.to_owned()); marker.attempt_number = Some(attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.current_operation = Some(RUN_OPERATION_AGENT_RUN.to_owned()); marker.turn_id = Some(turn_id.to_owned()); @@ -1168,7 +1170,7 @@ pub(crate) fn write_run_thread_status_marker( marker.run_id = Some(run_id.to_owned()); marker.attempt_number = Some(attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.current_operation = Some(RUN_OPERATION_AGENT_RUN.to_owned()); marker.thread_id = thread_id.map(str::to_owned).or(marker.thread_id); @@ -1194,7 +1196,7 @@ pub(crate) fn write_run_effective_runtime_marker( marker.run_id = Some(run_id.to_owned()); marker.attempt_number = Some(attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.current_operation = Some(RUN_OPERATION_AGENT_RUN.to_owned()); marker.thread_id = runtime.thread_id.map(str::to_owned).or(marker.thread_id); @@ -1248,7 +1250,7 @@ pub(crate) fn write_run_retry_budget_attempt_count( marker.run_id = Some(run_id.to_owned()); marker.attempt_number = Some(attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.retry_budget_attempt_count = Some(retry_budget_attempt_count); @@ -1307,7 +1309,7 @@ pub(crate) fn write_run_review_policy_state( marker.run_id = Some(run_id.to_owned()); marker.attempt_number = Some(attempt_number); - marker.process_id.get_or_insert_with(process::id); + ensure_run_activity_marker_current_process_identity(&mut marker); marker.review_policy_phase = Some(review_policy_phase.to_owned()); marker.review_policy_status = Some(review_policy_status.to_owned()); @@ -1345,13 +1347,15 @@ pub(crate) fn read_run_activity_marker_snapshot( Ok(read_run_activity_marker_record(worktree_path)?.and_then(|marker| { let accounts = accounts_from_marker_record(&marker); - Some(RunActivityMarker { - run_id: marker.run_id?, - attempt_number: marker.attempt_number?, - process_id: marker.process_id, - last_activity_unix_epoch: marker.last_activity_unix_epoch, - last_protocol_activity_unix_epoch: marker.last_protocol_activity_unix_epoch, - last_progress_unix_epoch: marker.last_progress_unix_epoch, + Some(RunActivityMarker { + run_id: marker.run_id?, + attempt_number: marker.attempt_number?, + process_id: marker.process_id, + host_boot_id: marker.host_boot_id, + process_start_identity: marker.process_start_identity, + last_activity_unix_epoch: marker.last_activity_unix_epoch, + last_protocol_activity_unix_epoch: marker.last_protocol_activity_unix_epoch, + last_progress_unix_epoch: marker.last_progress_unix_epoch, current_operation: marker.current_operation, thread_id: marker.thread_id, turn_id: marker.turn_id, @@ -1409,6 +1413,148 @@ fn accounts_from_marker_record( } } +pub(crate) fn current_host_boot_id() -> Option { + static CURRENT_HOST_BOOT_ID: OnceLock> = OnceLock::new(); + + CURRENT_HOST_BOOT_ID.get_or_init(read_current_host_boot_id).clone() +} + +pub(crate) fn current_process_start_identity() -> Option { + static CURRENT_PROCESS_START_IDENTITY: OnceLock> = OnceLock::new(); + + CURRENT_PROCESS_START_IDENTITY + .get_or_init(|| process_start_identity(process::id())) + .clone() +} + +pub(crate) fn process_start_identity(process_id: u32) -> Option { + read_platform_process_start_identity(process_id) + .and_then(|identity| normalized_process_start_identity(&identity)) +} + +fn set_run_activity_marker_process_identity( + marker: &mut RunActivityMarkerRecord, + process_id: u32, +) { + marker.process_id = Some(process_id); + marker.host_boot_id = current_host_boot_id(); + marker.process_start_identity = if process_id == process::id() { + current_process_start_identity() + } else { + process_start_identity(process_id) + }; +} + +fn ensure_run_activity_marker_current_process_identity(marker: &mut RunActivityMarkerRecord) { + let current_process_id = process::id(); + + match marker.process_id { + None => set_run_activity_marker_process_identity(marker, current_process_id), + Some(process_id) + if process_id == current_process_id + && (marker.host_boot_id.is_none() || marker.process_start_identity.is_none()) => + { + if marker.host_boot_id.is_none() { + marker.host_boot_id = current_host_boot_id(); + } + if marker.process_start_identity.is_none() { + marker.process_start_identity = current_process_start_identity(); + } + }, + Some(_) => {}, + } +} + +fn read_current_host_boot_id() -> Option { + read_platform_host_boot_id().and_then(|boot_id| normalized_host_boot_id(&boot_id)) +} + +#[cfg(target_os = "linux")] +fn read_platform_host_boot_id() -> Option { + fs::read_to_string("/proc/sys/kernel/random/boot_id") + .ok() + .map(|boot_id| format!("linux:{boot_id}")) +} + +#[cfg(target_os = "macos")] +fn read_platform_host_boot_id() -> Option { + let output = process::Command::new("/usr/sbin/sysctl") + .args(["-n", "kern.boottime"]) + .output() + .ok()?; + + if !output.status.success() { + return None; + } + + String::from_utf8(output.stdout) + .ok() + .map(|boot_id| format!("macos:{boot_id}")) +} + +#[cfg(not(any(target_os = "linux", target_os = "macos")))] +fn read_platform_host_boot_id() -> Option { + None +} + +fn normalized_host_boot_id(boot_id: &str) -> Option { + let normalized = boot_id.split_whitespace().collect::>().join(" "); + + (!normalized.is_empty()).then_some(normalized) +} + +#[cfg(target_os = "linux")] +fn read_platform_process_start_identity(process_id: u32) -> Option { + let stat = fs::read_to_string(format!("/proc/{process_id}/stat")).ok()?; + let comm_end = stat.rfind(')')?; + let after_comm = stat.get(comm_end + 2..)?; + let start_time = after_comm.split_whitespace().nth(19)?; + + Some(format!("linux_starttime:{start_time}")) +} + +#[cfg(target_os = "macos")] +fn read_platform_process_start_identity(process_id: u32) -> Option { + let Ok(pid) = i32::try_from(process_id) else { + return None; + }; + if pid <= 0 { + return None; + } + + let mut info = std::mem::MaybeUninit::::zeroed(); + let Ok(info_size) = i32::try_from(std::mem::size_of::()) else { + return None; + }; + let read_size = unsafe { + libc::proc_pidinfo( + pid, + libc::PROC_PIDTBSDINFO, + 0, + info.as_mut_ptr().cast::(), + info_size, + ) + }; + if read_size != info_size { + return None; + } + + let info = unsafe { info.assume_init() }; + + Some(format!("macos_starttime:{}:{}", info.pbi_start_tvsec, info.pbi_start_tvusec)) +} + +#[cfg(not(any(target_os = "linux", target_os = "macos")))] +fn read_platform_process_start_identity(_process_id: u32) -> Option { + None +} + +fn normalized_process_start_identity(identity: &str) -> Option { + let normalized = identity.split_whitespace().collect::>().join(" "); + + (!normalized.is_empty()).then_some(normalized) +} + fn persist_projects(transaction: &Transaction<'_>, state: &StateData) -> Result<()> { for project in state.projects.values() { transaction.execute( @@ -1702,7 +1848,7 @@ fn write_run_activity_marker_at( .filter(|marker| marker.run_id.as_deref() == Some(run_id) && marker.attempt_number == Some(attempt_number)); let mut marker = run_activity_marker_record_for_attempt(existing_marker.as_ref(), run_id, attempt_number); - marker.process_id = Some(process_id); + set_run_activity_marker_process_identity(&mut marker, process_id); marker.last_activity_unix_epoch = Some(last_activity_unix_epoch); marker.last_protocol_activity_unix_epoch = last_protocol_activity_unix_epoch .or_else(|| same_run_marker.and_then(|marker| marker.last_protocol_activity_unix_epoch)); @@ -1726,12 +1872,15 @@ fn run_activity_marker_record_for_attempt( .filter(|marker| marker.run_id.as_deref() == Some(run_id) && marker.attempt_number == Some(attempt_number)); RunActivityMarkerRecord { - run_id: Some(run_id.to_owned()), - attempt_number: Some(attempt_number), - process_id: same_run_marker.and_then(|marker| marker.process_id), - last_activity_unix_epoch: same_run_marker.and_then(|marker| marker.last_activity_unix_epoch), - last_protocol_activity_unix_epoch: same_run_marker - .and_then(|marker| marker.last_protocol_activity_unix_epoch), + run_id: Some(run_id.to_owned()), + attempt_number: Some(attempt_number), + process_id: same_run_marker.and_then(|marker| marker.process_id), + host_boot_id: same_run_marker.and_then(|marker| marker.host_boot_id.clone()), + process_start_identity: same_run_marker + .and_then(|marker| marker.process_start_identity.clone()), + last_activity_unix_epoch: same_run_marker.and_then(|marker| marker.last_activity_unix_epoch), + last_protocol_activity_unix_epoch: same_run_marker + .and_then(|marker| marker.last_protocol_activity_unix_epoch), last_progress_unix_epoch: same_run_marker.and_then(|marker| marker.last_progress_unix_epoch), current_operation: same_run_marker.and_then(|marker| marker.current_operation.clone()), thread_id: same_run_marker.and_then(|marker| marker.thread_id.clone()), @@ -1786,10 +1935,12 @@ fn read_run_activity_marker_record( match key { "run_id" => marker.run_id = Some(value.to_owned()), - "attempt_number" => marker.attempt_number = value.parse::().ok(), - "process_id" => marker.process_id = value.parse::().ok(), - "last_activity_unix_epoch" => - marker.last_activity_unix_epoch = value.parse::().ok(), + "attempt_number" => marker.attempt_number = value.parse::().ok(), + "process_id" => marker.process_id = value.parse::().ok(), + "host_boot_id" => marker.host_boot_id = Some(value.to_owned()), + "process_start_identity" => marker.process_start_identity = Some(value.to_owned()), + "last_activity_unix_epoch" => + marker.last_activity_unix_epoch = value.parse::().ok(), "last_protocol_activity_unix_epoch" => marker.last_protocol_activity_unix_epoch = value.parse::().ok(), "last_progress_unix_epoch" => @@ -1860,6 +2011,12 @@ fn serialize_run_activity_marker_record(marker: &RunActivityMarkerRecord) -> Str if let Some(process_id) = marker.process_id { body.push_str(&format!("process_id={process_id}\n")); } + if let Some(host_boot_id) = &marker.host_boot_id { + body.push_str(&format!("host_boot_id={host_boot_id}\n")); + } + if let Some(process_start_identity) = &marker.process_start_identity { + body.push_str(&format!("process_start_identity={process_start_identity}\n")); + } if let Some(last_activity_unix_epoch) = marker.last_activity_unix_epoch { body.push_str(&format!("last_activity_unix_epoch={last_activity_unix_epoch}\n")); } diff --git a/apps/decodex/src/state/models.rs b/apps/decodex/src/state/models.rs index ce6385bf..b874cd58 100644 --- a/apps/decodex/src/state/models.rs +++ b/apps/decodex/src/state/models.rs @@ -388,6 +388,8 @@ pub(crate) struct RunActivityMarker { run_id: String, attempt_number: i64, process_id: Option, + host_boot_id: Option, + process_start_identity: Option, last_activity_unix_epoch: Option, last_protocol_activity_unix_epoch: Option, last_progress_unix_epoch: Option, @@ -429,6 +431,14 @@ impl RunActivityMarker { self.process_id } + pub(crate) fn host_boot_id(&self) -> Option<&str> { + self.host_boot_id.as_deref() + } + + pub(crate) fn process_start_identity(&self) -> Option<&str> { + self.process_start_identity.as_deref() + } + pub(crate) fn last_activity_unix_epoch(&self) -> Option { self.last_activity_unix_epoch } diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index b7024552..3527155e 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -1058,6 +1058,24 @@ fn assert_run_activity_marker_round_trips_clearable_auxiliary_fields() { assert_eq!(marker.run_id(), "run-1"); assert_eq!(marker.attempt_number(), 1); + if let Some(host_boot_id) = state::current_host_boot_id() { + assert_eq!(marker.host_boot_id(), Some(host_boot_id.as_str())); + assert!( + fs::read_to_string(temp_dir.path().join(RUN_ACTIVITY_MARKER_FILE)) + .expect("activity marker body should load") + .contains(&format!("host_boot_id={host_boot_id}\n")), + "activity markers should record the host boot identity for reboot-safe liveness" + ); + } + if let Some(process_start_identity) = state::current_process_start_identity() { + assert_eq!(marker.process_start_identity(), Some(process_start_identity.as_str())); + assert!( + fs::read_to_string(temp_dir.path().join(RUN_ACTIVITY_MARKER_FILE)) + .expect("activity marker body should load") + .contains(&format!("process_start_identity={process_start_identity}\n")), + "activity markers should record the process start identity for PID-reuse-safe liveness" + ); + } assert_eq!(marker.retry_kind(), Some("failure")); assert_eq!(marker.retry_ready_at_unix_epoch(), Some(12_345)); assert_eq!(marker.review_policy_phase(), Some("handoff")); diff --git a/docs/reference/operator-control-plane.md b/docs/reference/operator-control-plane.md index 721f74c7..88c0a280 100644 --- a/docs/reference/operator-control-plane.md +++ b/docs/reference/operator-control-plane.md @@ -63,7 +63,7 @@ Use `decodex diagnose --json` when an agent needs the current handoff index dire | Project `WORKFLOW.md` | repo policy, validation gate, state names, retry/review policy | runtime ownership, queue labels, credentials, model overrides | | Linear | team-visible issue state, queue/active/manual-attention labels, coarse execution ledger comments, progress/failure/handoff/closeout summaries | high-frequency runtime truth, heartbeat, token pressure, raw attempts, connector retry budgets | | GitHub | PR, checks, review comments, merge evidence, signed commit verification | queue selection or local lane ownership | -| `.decodex-run-activity` | short-lived child activity heartbeat for the active attempt | durable ownership, review handoff identity, cleanup authority | +| `.decodex-run-activity` | short-lived child activity heartbeat for the active attempt, including same-boot and same-process-start liveness | durable ownership, review handoff identity, cleanup authority | ## Operator Dashboard Sections @@ -124,9 +124,14 @@ outside the local operator surface. Worktree visibility follows the owning dashboard section: - `Running Lanes` means the runtime DB still has an active lease, active attempt, or - child process/thread/protocol relationship for the path. `active_lease` is queue - lease ownership only; `execution_liveness` explains why the lane is still visible - when the queue lease is not held. + child process/thread/protocol relationship for the path. Process liveness requires + an alive PID plus matching `.decodex-run-activity` `host_boot_id` and + `process_start_identity`; a previous-boot marker, same-boot PID reuse, missing + identity, or unavailable current host/process identity is recovery input, not proof + of active execution. `process_liveness_reason` explains which identity check failed + when `process_alive` is false. + `active_lease` is queue lease ownership only; `execution_liveness` explains why + the lane is still visible when the queue lease is not held. - Running lanes derive CLI and dashboard text from the same `OperatorRunStatus` object. `protocol_activity`, when present, summarizes app-server structured notifications for turn status, waiting reason, and recent protocol events. The @@ -210,7 +215,10 @@ map to an operator decision. snapshot fields. - Queue ownership and execution liveness are separate. `queue_lease_state` reports whether the local queue lease is held, while `execution_liveness` reports observed - process, app-server thread, or protocol activity. + process, app-server thread, or protocol activity. `process_alive` is true only + when `.decodex-run-activity` ties the PID to the current host boot identity and + current process start identity; `process_liveness_reason` keeps stopped process, + previous-boot, and PID-reuse cases distinguishable. - `status` is the operator-facing run status. If the raw attempt is still `starting` after app-server thread, model, or protocol evidence exists, `status` is shown as `running` and `attempt_status` preserves the raw persisted attempt value. diff --git a/docs/spec/runtime.md b/docs/spec/runtime.md index 56bc56f6..abc88f59 100644 --- a/docs/spec/runtime.md +++ b/docs/spec/runtime.md @@ -313,7 +313,7 @@ The runtime database stores at least: - tracker and PR cache rows needed to survive connector outages - typed connector health and external API backoff -For child supervision, the active lane may also carry a short-lived worktree heartbeat marker at `.worktrees//.decodex-run-activity`. That marker is advisory, keyed to the current `run_id` plus `attempt_number`, and exists so the control plane can observe child activity across process boundaries, surface active thread and protocol progress in operator status, and keep high-frequency telemetry out of Linear. The marker may also carry an additive `child_agent_activity` JSON summary for the current attempt; that summary is diagnostic state for operator snapshots, not durable scheduling authority. Operator snapshots must keep queue ownership separate from execution liveness: `active_lease` and `queue_lease_state` describe the local queue lease, while `execution_liveness` describes the observed process, app-server thread, or protocol marker that keeps an active lane visible. If a raw attempt is still `starting` after app-server thread, model, or protocol activity is observed, operator-facing `status` must report `running` and preserve the raw value in `attempt_status`. High-frequency heartbeat, child-agent buckets, token counts, idle ages, and other transient liveness details stay local/operator-only under the boundary defined by [`linear-execution-ledger.md`](./linear-execution-ledger.md). +For child supervision, the active lane may also carry a short-lived worktree heartbeat marker at `.worktrees//.decodex-run-activity`. That marker is advisory, keyed to the current `run_id` plus `attempt_number`, and exists so the control plane can observe child activity across process boundaries, surface active thread and protocol progress in operator status, and keep high-frequency telemetry out of Linear. When the marker records process liveness, it must pair `process_id` with both host boot identity (`host_boot_id`) and process start identity (`process_start_identity`). A marker from a previous boot, a marker missing either identity, a marker whose process start identity no longer matches the live PID, or a marker observed while Decodex cannot read the current host or process identity must not be treated as a live process even if that PID currently exists. Operator snapshots expose `process_liveness_reason` so operators can distinguish stopped processes, previous-boot markers, and same-boot PID reuse from genuine live execution. The marker may also carry an additive `child_agent_activity` JSON summary for the current attempt; that summary is diagnostic state for operator snapshots, not durable scheduling authority. Operator snapshots must keep queue ownership separate from execution liveness: `active_lease` and `queue_lease_state` describe the local queue lease, while `execution_liveness` describes the observed process, app-server thread, or protocol marker that keeps an active lane visible. If a raw attempt is still `starting` after app-server thread, model, or protocol activity is observed, operator-facing `status` must report `running` and preserve the raw value in `attempt_status`. High-frequency heartbeat, child-agent buckets, token counts, idle ages, and other transient liveness details stay local/operator-only under the boundary defined by [`linear-execution-ledger.md`](./linear-execution-ledger.md). Post-review ownership is stored in the runtime database. Retained handoff rows record the authoritative PR URL, branch lineage, validated PR head OID, run id, and attempt number that completed the `In Review` handoff. Retained orchestration rows record the current post-review phase for that exact handoff identity. If the matching database row is missing, post-review ownership must block as unresolved instead of rebinding from branch-name, current-head, Linear comments, or other heuristics. The only source-tree marker that clean-source checks may ignore is the untracked `.decodex-run-activity` heartbeat marker. Review handoff, orchestration, retry, phase timing, and retained PR state belong in the Decodex runtime database, not in root-level or worktree-local review marker files. @@ -367,7 +367,8 @@ After a process restart, recent-run history, active lease ownership, retained po - Retry recovery must bind retained lanes to issue identity and local runtime state rather than to Linear project membership. - While the control plane is running an active lane, every poll tick must refresh cached tracker state for the leased issue before considering any new selection. - While the control plane is running an active lane, that child must keep the workflow snapshot it started with; project `WORKFLOW.md` reloads affect later decisions without restarting the in-flight child. -- While the control plane is supervising an active child process, stall detection must consult the child-updated `.decodex-run-activity` marker for the current `run_id` plus `attempt_number` and the persisted runtime event journal. +- While the control plane is supervising an active child process, stall detection must consult the child-updated `.decodex-run-activity` marker for the current `run_id` plus `attempt_number` and the persisted runtime event journal. A retained marker only proves a live process when its PID is still alive on the current host boot and the process start identity still matches; after power loss, reboot, or same-boot PID reuse, recovery must clear the reconstructed lease and re-enter the retained lane through retry-style dispatch instead of preserving the old running state. +- Retry-style recovery prompts must tell the next agent to treat the current worktree, tracker state, protocol events, and marker files as durable truth, inspect the branch/diff/recent validation evidence first, and continue from partial work rather than assuming prior in-memory model/tool state survived. - While the control plane owns a queued retry entry, that queued claim must take priority over normal candidate selection for the affected project. - While the control plane is idle between lanes, it may reload the configured project `WORKFLOW.md` on each tick and immediately apply a newly valid document to future dispatch, retry, post-exit reconciliation, and prompt generation. - If that same configured `WORKFLOW.md` path becomes invalid after a successful load, the control plane must log the reload failure and keep the last known good document active instead of dropping the tick or clearing runtime policy. From 01ed9824f4950134103bec8a2143ea20d0efd075 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 19 May 2026 20:28:15 +0800 Subject: [PATCH 2/2] {"schema":"decodex/commit/1","summary":"Satisfy Rust style gate","authority":"manual"} --- apps/decodex/src/orchestrator/prompting.rs | 1 + apps/decodex/src/orchestrator/status.rs | 2 + apps/decodex/src/orchestrator/tests.rs | 132 +++++++++++------- .../tests/operator/status/running_lanes.rs | 2 + .../tests/recovery/runtime_reentry.rs | 7 +- apps/decodex/src/state/internal.rs | 107 +++++++++----- apps/decodex/src/state/tests.rs | 2 + 7 files changed, 164 insertions(+), 89 deletions(-) diff --git a/apps/decodex/src/orchestrator/prompting.rs b/apps/decodex/src/orchestrator/prompting.rs index f36d58b5..3dc2dcfd 100644 --- a/apps/decodex/src/orchestrator/prompting.rs +++ b/apps/decodex/src/orchestrator/prompting.rs @@ -63,6 +63,7 @@ where sections.push(String::from( "Commit contract\n- When you create a local commit for this lane, use a single-line `decodex/commit/1` JSON commit message.\n- Required fields: `schema`, `summary`, and `authority`.\n- `authority` must be the authoritative Linear issue identifier for this lane.\n- Optional fields: `related` and `breaking`.\n- Do not encode landing mode, CI status, closeout state, or other process-state fields in the commit message.", )); + if let Some(recovery_context) = build_retry_recovery_context(issue_run.dispatch_mode) { sections.push(recovery_context); } diff --git a/apps/decodex/src/orchestrator/status.rs b/apps/decodex/src/orchestrator/status.rs index 687d1cb8..a9db0614 100644 --- a/apps/decodex/src/orchestrator/status.rs +++ b/apps/decodex/src/orchestrator/status.rs @@ -3315,6 +3315,7 @@ fn marker_process_liveness(marker: &RunActivityMarker) -> MarkerProcessLiveness let Some(current_host_boot_id) = state::current_host_boot_id() else { return MarkerProcessLiveness { alive: false, reason: "host_boot_id_unavailable" }; }; + if marker_host_boot_id != current_host_boot_id.as_str() { return MarkerProcessLiveness { alive: false, reason: "host_boot_id_mismatch" }; } @@ -3328,6 +3329,7 @@ fn marker_process_liveness(marker: &RunActivityMarker) -> MarkerProcessLiveness reason: "process_start_identity_unavailable", }; }; + if marker_process_start_identity != current_process_start_identity.as_str() { return MarkerProcessLiveness { alive: false, diff --git a/apps/decodex/src/orchestrator/tests.rs b/apps/decodex/src/orchestrator/tests.rs index eaaf580f..794ee589 100644 --- a/apps/decodex/src/orchestrator/tests.rs +++ b/apps/decodex/src/orchestrator/tests.rs @@ -53,41 +53,71 @@ use crate::orchestrator::{ReviewHandoffMarker, ReviewOrchestrationMarker}; // Workflow reload, intake eligibility, prompting, and candidate selection. include!("tests/intake/workflow_reload.rs"); + include!("tests/intake/eligibility.rs"); + include!("tests/intake/run_and_prompting.rs"); + include!("tests/intake/prepare_issue_run.rs"); + include!("tests/intake/candidate_selection.rs"); // Retry scheduling, runtime failure classes, and recovery cleanup. include!("tests/retry/scheduling.rs"); + include!("tests/retry/selection.rs"); + include!("tests/runtime/repo_gate.rs"); + include!("tests/runtime/failure.rs"); + include!("tests/recovery/reconciliation.rs"); + include!("tests/recovery/terminal_support.rs"); + include!("tests/recovery/closeout/dispatch.rs"); + include!("tests/recovery/closeout/identity.rs"); + include!("tests/recovery/closeout/cleanup.rs"); + include!("tests/recovery/terminal_failures.rs"); + include!("tests/recovery/runtime_reentry.rs"); // Operator status plus retained post-review review/landing behavior. include!("tests/operator/status_support.rs"); + include!("tests/operator/status/control_plane.rs"); + include!("tests/operator/status/running_lanes.rs"); + include!("tests/operator/status/history.rs"); + include!("tests/operator/status/text.rs"); + include!("tests/operator/status/publishing.rs"); + include!("tests/operator/status/queue.rs"); + include!("tests/operator/status/agent_evidence.rs"); + include!("tests/operator/status/http.rs"); + include!("tests/operator/status/dashboard.rs"); + include!("tests/review_landing/status_support.rs"); + include!("tests/review_landing/status_rows.rs"); + include!("tests/review_landing/orchestration.rs"); + include!("tests/review_landing/status_markers.rs"); + include!("tests/review_landing/classification_review.rs"); + include!("tests/review_landing/classification_checks.rs"); + include!("tests/review_landing/review_state.rs"); const TEST_EXTERNAL_REVIEW_REQUEST_COMMENT_ID: i64 = 991; @@ -97,57 +127,6 @@ const TEST_NON_EXTERNAL_REVIEW_ACTOR_LOGIN: &str = "someone-else"; const TEST_SERVICE_ID: &str = "pubfi"; const TEST_PROJECT_CONFIG_FILE: &str = "project.toml"; -fn rewrite_run_activity_marker_host_boot_id(worktree_path: &Path, host_boot_id: &str) { - let marker_path = worktree_path.join(RUN_ACTIVITY_MARKER_FILE); - let marker_body = fs::read_to_string(&marker_path).expect("marker body should load"); - let mut host_boot_id_written = false; - let mut rewritten = marker_body - .lines() - .map(|line| { - if line.starts_with("host_boot_id=") { - host_boot_id_written = true; - - format!("host_boot_id={host_boot_id}") - } else { - line.to_owned() - } - }) - .collect::>(); - - if !host_boot_id_written { - rewritten.push(format!("host_boot_id={host_boot_id}")); - } - - fs::write(&marker_path, rewritten.join("\n") + "\n").expect("marker body should rewrite"); -} - -fn rewrite_run_activity_marker_process_start_identity( - worktree_path: &Path, - process_start_identity: &str, -) { - let marker_path = worktree_path.join(RUN_ACTIVITY_MARKER_FILE); - let marker_body = fs::read_to_string(&marker_path).expect("marker body should load"); - let mut process_start_identity_written = false; - let mut rewritten = marker_body - .lines() - .map(|line| { - if line.starts_with("process_start_identity=") { - process_start_identity_written = true; - - format!("process_start_identity={process_start_identity}") - } else { - line.to_owned() - } - }) - .collect::>(); - - if !process_start_identity_written { - rewritten.push(format!("process_start_identity={process_start_identity}")); - } - - fs::write(&marker_path, rewritten.join("\n") + "\n").expect("marker body should rewrite"); -} - struct FakeTracker { listed_issues: Vec, identifier_lookup_issues: Option>, @@ -358,6 +337,57 @@ impl PullRequestReviewStateInspector for FakePullRequestReviewStateInspector { } } +fn rewrite_run_activity_marker_host_boot_id(worktree_path: &Path, host_boot_id: &str) { + let marker_path = worktree_path.join(RUN_ACTIVITY_MARKER_FILE); + let marker_body = fs::read_to_string(&marker_path).expect("marker body should load"); + let mut host_boot_id_written = false; + let mut rewritten = marker_body + .lines() + .map(|line| { + if line.starts_with("host_boot_id=") { + host_boot_id_written = true; + + format!("host_boot_id={host_boot_id}") + } else { + line.to_owned() + } + }) + .collect::>(); + + if !host_boot_id_written { + rewritten.push(format!("host_boot_id={host_boot_id}")); + } + + fs::write(&marker_path, rewritten.join("\n") + "\n").expect("marker body should rewrite"); +} + +fn rewrite_run_activity_marker_process_start_identity( + worktree_path: &Path, + process_start_identity: &str, +) { + let marker_path = worktree_path.join(RUN_ACTIVITY_MARKER_FILE); + let marker_body = fs::read_to_string(&marker_path).expect("marker body should load"); + let mut process_start_identity_written = false; + let mut rewritten = marker_body + .lines() + .map(|line| { + if line.starts_with("process_start_identity=") { + process_start_identity_written = true; + + format!("process_start_identity={process_start_identity}") + } else { + line.to_owned() + } + }) + .collect::>(); + + if !process_start_identity_written { + rewritten.push(format!("process_start_identity={process_start_identity}")); + } + + fs::write(&marker_path, rewritten.join("\n") + "\n").expect("marker body should rewrite"); +} + fn install_fake_post_issue_comment_gh_response( temp_dir: &TempDir, comment_id: i64, diff --git a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs index 43656821..bdd3ec6f 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs @@ -802,6 +802,7 @@ fn operator_status_snapshot_counts_previous_boot_process_as_attention_not_runnin fs::create_dir_all(&worktree_path).expect("worktree path should exist"); state::write_run_activity_marker_for_process(&worktree_path, "run-1", 1, process::id()) .expect("live process marker should write"); + rewrite_run_activity_marker_host_boot_id(&worktree_path, "previous-boot"); let snapshot = orchestrator::build_operator_status_snapshot(&config, &state_store, 10) @@ -843,6 +844,7 @@ fn operator_status_snapshot_counts_reused_pid_as_attention_not_running() { fs::create_dir_all(&worktree_path).expect("worktree path should exist"); state::write_run_activity_marker_for_process(&worktree_path, "run-1", 1, process::id()) .expect("live process marker should write"); + rewrite_run_activity_marker_process_start_identity(&worktree_path, "previous-process-start"); let snapshot = orchestrator::build_operator_status_snapshot(&config, &state_store, 10) diff --git a/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs b/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs index e77acbab..e8b721e1 100644 --- a/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs +++ b/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs @@ -438,7 +438,7 @@ fn run_project_once_retries_recovered_worktree_after_marker_process_is_killed() let worktree = worktree_manager .ensure_worktree(&issue.identifier, false) .expect("recovered worktree should exist"); - let mut child = process::Command::new("/bin/sh") + let mut child = Command::new("/bin/sh") .args(["-c", "exec sleep 60"]) .spawn() .expect("kill-smoke child process should start"); @@ -448,10 +448,13 @@ fn run_project_once_retries_recovered_worktree_after_marker_process_is_killed() orchestrator::process_is_alive(child_process_id), "kill-smoke child process should be live before marker write" ); + state::write_run_activity_marker_for_process(&worktree.path, "run-1", 1, child_process_id) .expect("activity marker should write"); + child.kill().expect("kill-smoke child process should be killed"); child.wait().expect("kill-smoke child process should be reaped"); + assert!( !orchestrator::process_is_alive(child_process_id), "kill-smoke child process should no longer be live after kill" @@ -487,6 +490,7 @@ fn run_project_once_retries_recovered_worktree_from_previous_boot() { state::write_run_activity_marker_for_process(&worktree.path, "run-1", 1, process::id()) .expect("activity marker should write"); + rewrite_run_activity_marker_host_boot_id(&worktree.path, "previous-boot"); let summary = orchestrator::run_project_once(&tracker, &config, &workflow, &state_store, true) @@ -519,6 +523,7 @@ fn run_project_once_retries_recovered_worktree_from_reused_pid() { state::write_run_activity_marker_for_process(&worktree.path, "run-1", 1, process::id()) .expect("activity marker should write"); + rewrite_run_activity_marker_process_start_identity(&worktree.path, "previous-process-start"); let summary = orchestrator::run_project_once(&tracker, &config, &workflow, &state_store, true) diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index 5e6e319f..b60da18f 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -1,6 +1,17 @@ +#[cfg(target_os = "macos")] +use std::mem::MaybeUninit; + use libc::FD_CLOEXEC; use libc::F_GETFD; use libc::F_SETFD; +#[cfg(target_os = "macos")] +use libc::{ + c_void, + proc_bsdinfo, + PROC_PIDTBSDINFO, +}; +#[cfg(target_os = "macos")] +use process::Command; pub(crate) struct EffectiveRuntimeMarker<'a> { pub(crate) thread_id: Option<&'a str>, @@ -1029,6 +1040,7 @@ pub(crate) fn write_run_operation_marker_for_process( let mut marker = run_activity_marker_record_for_attempt(existing_marker.as_ref(), run_id, attempt_number); set_run_activity_marker_process_identity(&mut marker, process_id); + marker.last_activity_unix_epoch = Some(now); marker.last_progress_unix_epoch = Some(now); marker.current_operation = Some(current_operation.to_owned()); @@ -1384,6 +1396,25 @@ pub(crate) fn read_run_activity_marker_snapshot( })) } +pub(crate) fn current_host_boot_id() -> Option { + static CURRENT_HOST_BOOT_ID: OnceLock> = OnceLock::new(); + + CURRENT_HOST_BOOT_ID.get_or_init(read_current_host_boot_id).clone() +} + +pub(crate) fn current_process_start_identity() -> Option { + static CURRENT_PROCESS_START_IDENTITY: OnceLock> = OnceLock::new(); + + CURRENT_PROCESS_START_IDENTITY + .get_or_init(|| process_start_identity(process::id())) + .clone() +} + +pub(crate) fn process_start_identity(process_id: u32) -> Option { + read_platform_process_start_identity(process_id) + .and_then(|identity| normalized_process_start_identity(&identity)) +} + fn normalize_accounts( selected: &CodexAccountActivitySummary, accounts: &[CodexAccountActivitySummary], @@ -1413,25 +1444,6 @@ fn accounts_from_marker_record( } } -pub(crate) fn current_host_boot_id() -> Option { - static CURRENT_HOST_BOOT_ID: OnceLock> = OnceLock::new(); - - CURRENT_HOST_BOOT_ID.get_or_init(read_current_host_boot_id).clone() -} - -pub(crate) fn current_process_start_identity() -> Option { - static CURRENT_PROCESS_START_IDENTITY: OnceLock> = OnceLock::new(); - - CURRENT_PROCESS_START_IDENTITY - .get_or_init(|| process_start_identity(process::id())) - .clone() -} - -pub(crate) fn process_start_identity(process_id: u32) -> Option { - read_platform_process_start_identity(process_id) - .and_then(|identity| normalized_process_start_identity(&identity)) -} - fn set_run_activity_marker_process_identity( marker: &mut RunActivityMarkerRecord, process_id: u32, @@ -1478,7 +1490,7 @@ fn read_platform_host_boot_id() -> Option { #[cfg(target_os = "macos")] fn read_platform_host_boot_id() -> Option { - let output = process::Command::new("/usr/sbin/sysctl") + let output = Command::new("/usr/sbin/sysctl") .args(["-n", "kern.boottime"]) .output() .ok()?; @@ -1518,23 +1530,25 @@ fn read_platform_process_start_identity(process_id: u32) -> Option { let Ok(pid) = i32::try_from(process_id) else { return None; }; + if pid <= 0 { return None; } - let mut info = std::mem::MaybeUninit::::zeroed(); - let Ok(info_size) = i32::try_from(std::mem::size_of::()) else { + let mut info = MaybeUninit::::zeroed(); + let Ok(info_size) = i32::try_from(mem::size_of::()) else { return None; }; let read_size = unsafe { libc::proc_pidinfo( pid, - libc::PROC_PIDTBSDINFO, + PROC_PIDTBSDINFO, 0, - info.as_mut_ptr().cast::(), + info.as_mut_ptr().cast::(), info_size, ) }; + if read_size != info_size { return None; } @@ -1849,6 +1863,7 @@ fn write_run_activity_marker_at( let mut marker = run_activity_marker_record_for_attempt(existing_marker.as_ref(), run_id, attempt_number); set_run_activity_marker_process_identity(&mut marker, process_id); + marker.last_activity_unix_epoch = Some(last_activity_unix_epoch); marker.last_protocol_activity_unix_epoch = last_protocol_activity_unix_epoch .or_else(|| same_run_marker.and_then(|marker| marker.last_protocol_activity_unix_epoch)); @@ -2086,27 +2101,47 @@ fn serialize_run_activity_marker_record(marker: &RunActivityMarkerRecord) -> Str { body.push_str(&format!("protocol_activity={summary_json}\n")); } + + append_run_activity_marker_account_fields(&mut body, marker); + append_run_activity_marker_retry_fields(&mut body, marker); + append_run_activity_marker_review_policy_fields(&mut body, marker); + + body +} + +fn append_run_activity_marker_account_fields( + body: &mut String, + marker: &RunActivityMarkerRecord, +) { if let Some(account) = &marker.account && let Ok(summary_json) = serde_json::to_string(account) - { - body.push_str(&format!("account={summary_json}\n")); - } + { + body.push_str(&format!("account={summary_json}\n")); + } - if !marker.accounts.is_empty() - && let Ok(accounts_json) = serde_json::to_string(&marker.accounts) - { - body.push_str(&format!("accounts={accounts_json}\n")); - } + if !marker.accounts.is_empty() + && let Ok(accounts_json) = serde_json::to_string(&marker.accounts) + { + body.push_str(&format!("accounts={accounts_json}\n")); + } +} - if let Some(retry_budget_attempt_count) = marker.retry_budget_attempt_count { - body.push_str(&format!("retry_budget_attempt_count={retry_budget_attempt_count}\n")); - } +fn append_run_activity_marker_retry_fields(body: &mut String, marker: &RunActivityMarkerRecord) { + if let Some(retry_budget_attempt_count) = marker.retry_budget_attempt_count { + body.push_str(&format!("retry_budget_attempt_count={retry_budget_attempt_count}\n")); + } if let Some(retry_kind) = &marker.retry_kind { body.push_str(&format!("retry_kind={retry_kind}\n")); } if let Some(retry_ready_at_unix_epoch) = marker.retry_ready_at_unix_epoch { body.push_str(&format!("retry_ready_at_unix_epoch={retry_ready_at_unix_epoch}\n")); } +} + +fn append_run_activity_marker_review_policy_fields( + body: &mut String, + marker: &RunActivityMarkerRecord, +) { if let Some(review_policy_phase) = &marker.review_policy_phase { body.push_str(&format!("review_policy_phase={review_policy_phase}\n")); } @@ -2119,8 +2154,6 @@ fn serialize_run_activity_marker_record(marker: &RunActivityMarkerRecord) -> Str if let Some(review_policy_nonclean_rounds) = marker.review_policy_nonclean_rounds { body.push_str(&format!("review_policy_nonclean_rounds={review_policy_nonclean_rounds}\n")); } - - body } fn parse_marker_list(value: &str) -> Vec { diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index 3527155e..474088fd 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -1058,6 +1058,7 @@ fn assert_run_activity_marker_round_trips_clearable_auxiliary_fields() { assert_eq!(marker.run_id(), "run-1"); assert_eq!(marker.attempt_number(), 1); + if let Some(host_boot_id) = state::current_host_boot_id() { assert_eq!(marker.host_boot_id(), Some(host_boot_id.as_str())); assert!( @@ -1076,6 +1077,7 @@ fn assert_run_activity_marker_round_trips_clearable_auxiliary_fields() { "activity markers should record the process start identity for PID-reuse-safe liveness" ); } + assert_eq!(marker.retry_kind(), Some("failure")); assert_eq!(marker.retry_ready_at_unix_epoch(), Some(12_345)); assert_eq!(marker.review_policy_phase(), Some("handoff"));