From c9a03c359bc4240a1ebd627f822ec562f6a9b6ab Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Fri, 29 May 2026 10:29:59 +0800 Subject: [PATCH 1/2] {"schema":"decodex/commit/1","summary":"Fix active run liveness readback","authority":"XY-633"} --- apps/decodex/src/orchestrator/status.rs | 97 +++++++++++++++++-- .../tests/operator/status/queue.rs | 50 ++++++++++ .../tests/operator/status/running_lanes.rs | 66 +++++++++++++ docs/reference/operator-control-plane.md | 9 +- docs/spec/runtime.md | 2 +- 5 files changed, 211 insertions(+), 13 deletions(-) diff --git a/apps/decodex/src/orchestrator/status.rs b/apps/decodex/src/orchestrator/status.rs index 2b8a859b..624bbfe4 100644 --- a/apps/decodex/src/orchestrator/status.rs +++ b/apps/decodex/src/orchestrator/status.rs @@ -1,4 +1,11 @@ +use std::mem; +use std::mem::MaybeUninit; + use records::LinearExecutionEventRecord; +use libc::PROC_PIDTBSDINFO; +use libc::SZOMB; +use libc::c_void; +use libc::proc_bsdinfo; use crate::pull_request::{self, PullRequestLandingGateView}; use crate::worktree; @@ -1811,6 +1818,12 @@ where { let tracker_policy = workflow.frontmatter().tracker(); + if tracker_policy.terminal_states().iter().any(|state| state == &issue.state.name) { + return Ok(("closed", "terminal_state")); + } + if issue.has_label(tracker_policy.needs_attention_label()) { + return Ok(("blocked", "issue_needs_attention")); + } if state_store.issue_has_active_shared_claim(project.service_id(), &issue.id)? { return Ok(("claimed", "shared_claim_present")); } @@ -1821,18 +1834,12 @@ where )? { return Ok(("blocked", QUEUE_REASON_LINEAR_ACTIVE_LABEL_PRESENT)); } - if tracker_policy.terminal_states().iter().any(|state| state == &issue.state.name) { - return Ok(("closed", "terminal_state")); - } if !tracker_policy.startable_states().iter().any(|state| state == &issue.state.name) { return Ok(("blocked", "non_startable_state")); } if issue.has_label(tracker_policy.opt_out_label()) { return Ok(("blocked", "issue_opted_out")); } - if issue.has_label(tracker_policy.needs_attention_label()) { - return Ok(("blocked", "issue_needs_attention")); - } if !todo_blocker_rule_passes(issue, workflow) { return Ok(("blocked", "open_tracker_blockers")); } @@ -3942,12 +3949,86 @@ fn process_is_alive(process_id: u32) -> bool { // Use the kernel liveness probe directly so recovery does not depend on a shell // builtin or `kill` binary being present on PATH. match unsafe { libc::kill(process_id, 0) } { - 0 => true, - -1 => matches!(std::io::Error::last_os_error().raw_os_error(), Some(libc::EPERM)), + 0 => !process_is_zombie_or_uninspectable_after_signalable_probe(process_id), + -1 => { + matches!(std::io::Error::last_os_error().raw_os_error(), Some(libc::EPERM)) + && !process_is_zombie(process_id) + }, _ => false, } } +fn process_is_zombie_or_uninspectable_after_signalable_probe(process_id: pid_t) -> bool { + process_is_zombie_or_uninspectable(process_id) +} + +#[cfg(not(target_os = "macos"))] +fn process_is_zombie_or_uninspectable(process_id: pid_t) -> bool { + process_is_zombie(process_id) +} + +#[cfg(target_os = "linux")] +fn process_is_zombie(process_id: pid_t) -> bool { + let Ok(stat) = fs::read_to_string(format!("/proc/{process_id}/stat")) else { + return false; + }; + let Some(comm_end) = stat.rfind(')') else { + return false; + }; + let Some(after_comm) = stat.get(comm_end + 2..) else { + return false; + }; + + after_comm.split_whitespace().next() == Some("Z") +} + +#[cfg(target_os = "macos")] +fn process_is_zombie_or_uninspectable(process_id: pid_t) -> bool { + match macos_process_bsd_status(process_id) { + Some(status) => status == SZOMB, + None => true, + } +} + +#[cfg(target_os = "macos")] +fn process_is_zombie(process_id: pid_t) -> bool { + macos_process_bsd_status(process_id) == Some(SZOMB) +} + +#[cfg(target_os = "macos")] +fn macos_process_bsd_status(process_id: pid_t) -> Option { + if process_id <= 0 { + return None; + } + + let mut info = MaybeUninit::::zeroed(); + let Ok(info_size) = i32::try_from(mem::size_of::()) else { + return None; + }; + let read_size = unsafe { + libc::proc_pidinfo( + process_id, + PROC_PIDTBSDINFO, + 0, + info.as_mut_ptr().cast::(), + info_size, + ) + }; + + if read_size != info_size { + return None; + } + + let info = unsafe { info.assume_init() }; + + Some(info.pbi_status) +} + +#[cfg(not(any(target_os = "linux", target_os = "macos")))] +fn process_is_zombie(_process_id: pid_t) -> bool { + false +} + fn hydrate_status_snapshot_state( _project: &ServiceConfig, _state_store: &StateStore, diff --git a/apps/decodex/src/orchestrator/tests/operator/status/queue.rs b/apps/decodex/src/orchestrator/tests/operator/status/queue.rs index f3190979..8b21036d 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/queue.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/queue.rs @@ -253,6 +253,56 @@ fn live_operator_status_snapshot_excludes_claimed_candidates_from_waiting_intake assert!(rendered.contains("Active queue echoes: 1")); } +#[test] +fn live_operator_status_snapshot_prioritizes_needs_attention_over_shared_claim() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue_with_sort_fields( + "issue-attention-claimed", + "PUB-113", + "Todo", + &["decodex:needs-attention"], + Some(3), + "2026-03-13T06:16:17.133Z", + ); + let tracker = FakeTracker::new(vec![issue.clone()]); + + state_store + .record_run_attempt("run-attention-claimed", &issue.id, 1, "running") + .expect("active run should record"); + state_store + .upsert_lease(config.service_id(), &issue.id, "run-attention-claimed", "In Progress") + .expect("active lease should record"); + + let snapshot = orchestrator::build_live_operator_status_snapshot( + &tracker, + &config, + &workflow, + &state_store, + 10, + ) + .expect("snapshot should build"); + let project = snapshot.projects.first().expect("project summary should exist"); + let candidate = snapshot + .queued_candidates + .iter() + .find(|candidate| candidate.issue_identifier == "PUB-113") + .expect("needs-attention claimed issue should remain visible"); + let attention = candidate.attention.as_ref().expect("attention details should render"); + + assert_eq!(candidate.classification, "blocked"); + assert_eq!(candidate.reason, "issue_needs_attention"); + assert_eq!( + attention.auto_retry_blocked_reason.as_deref(), + Some("needs_attention_label") + ); + assert_eq!(project.attention_count, 1); + assert_eq!( + project.queued_candidate_count, 1, + "needs-attention queue echoes remain in blocked intake while also counting as attention" + ); +} + #[test] fn live_operator_status_snapshot_blocks_active_plus_queued_label_without_local_claim() { let (_temp_dir, config, workflow) = temp_project_layout(); diff --git a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs index 492f8069..6e618694 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs @@ -814,6 +814,72 @@ fn operator_status_snapshot_counts_stopped_active_process_as_attention_not_runni assert_eq!(project.attention_count, 1); } +#[cfg(unix)] +#[test] +fn operator_status_snapshot_counts_zombie_active_process_as_attention_not_running() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue("Todo", &[]); + let worktree_path = config.worktree_root().join("PUB-101"); + let mut child = Command::new("/bin/sh") + .arg("-c") + .arg("exit 0") + .spawn() + .expect("short-lived child process should spawn"); + let child_pid = child.id(); + + state_store + .record_run_attempt("run-1", &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .upsert_lease("pubfi", &issue.id, "run-1", "In Progress") + .expect("lease should record"); + state_store + .upsert_worktree( + "pubfi", + &issue.id, + "x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + + fs::create_dir_all(&worktree_path).expect("worktree path should exist"); + state::write_run_activity_marker_for_process(&worktree_path, "run-1", 1, child_pid) + .expect("zombie process marker should write"); + + let deadline = Instant::now() + Duration::from_secs(5); + let mut observed_stopped = false; + + while Instant::now() < deadline { + if !orchestrator::process_is_alive(child_pid) { + observed_stopped = true; + + break; + } + + thread::sleep(Duration::from_millis(10)); + } + + let snapshot = observed_stopped.then(|| { + orchestrator::build_operator_status_snapshot(&config, &state_store, 10) + .expect("snapshot should build") + }); + + child.wait().expect("short-lived child process should reap"); + + assert!(observed_stopped, "exited child process must not count as alive"); + + let snapshot = snapshot.expect("snapshot should be captured while child is unreaped"); + let run = snapshot.active_runs.first().expect("active run should remain visible"); + let project = snapshot.projects.first().expect("project summary should exist"); + + assert_eq!(run.phase, "executing"); + assert_eq!(run.process_alive, Some(false)); + assert_eq!(run.process_liveness_reason.as_deref(), Some("process_stopped")); + assert_eq!(project.active_run_count, 0); + assert_eq!(project.attention_count, 1); +} + #[cfg(any(target_os = "linux", target_os = "macos"))] #[test] fn operator_status_snapshot_counts_previous_boot_process_as_attention_not_running() { diff --git a/docs/reference/operator-control-plane.md b/docs/reference/operator-control-plane.md index a3bf2e1f..a525b9b1 100644 --- a/docs/reference/operator-control-plane.md +++ b/docs/reference/operator-control-plane.md @@ -202,10 +202,11 @@ Worktree visibility follows the owning dashboard section: child process/thread/protocol relationship for the path. Process liveness requires an alive PID plus matching `.decodex-run-activity` `host_boot_id` and `process_start_identity`; a previous-boot marker, same-boot PID reuse, missing - identity, or unavailable current host/process identity is recovery input, not proof - of active execution. `execution_liveness = process_identity_mismatch` is the stable - summary for previous-boot or PID-reuse evidence, while `process_liveness_reason` - explains the exact failed identity check when `process_alive` is false. + identity, an unreaped zombie PID, or unavailable current host/process identity is + recovery input, not proof of active execution. `execution_liveness = + process_identity_mismatch` is the stable summary for previous-boot or PID-reuse + evidence, while `process_liveness_reason` explains the exact failed identity check + when `process_alive` is false. `active_lease` is queue lease ownership only; `execution_liveness` explains why the lane is still visible when the queue lease is not held. - Running lanes derive CLI and dashboard text from the same `OperatorRunStatus` diff --git a/docs/spec/runtime.md b/docs/spec/runtime.md index 7a022339..bb31ab38 100644 --- a/docs/spec/runtime.md +++ b/docs/spec/runtime.md @@ -384,7 +384,7 @@ The runtime database stores at least: - tracker and PR cache rows needed to survive connector outages - typed connector health and external API backoff -For child supervision, the active lane may also carry a short-lived worktree heartbeat marker at `.worktrees//.decodex-run-activity`. That marker is advisory, keyed to the current `run_id` plus `attempt_number`, and exists so the control plane can observe child activity across process boundaries, surface active thread and protocol progress in operator status, and keep high-frequency telemetry out of Linear. When the marker records process liveness, it must pair `process_id` with both host boot identity (`host_boot_id`) and process start identity (`process_start_identity`). A marker from a previous boot, a marker missing either identity, a marker whose process start identity no longer matches the live PID, or a marker observed while Decodex cannot read the current host or process identity must not be treated as a live process even if that PID currently exists. Operator snapshots expose `process_liveness_reason` so operators can distinguish stopped processes, previous-boot markers, and same-boot PID reuse from genuine live execution. The marker may also carry an additive `child_agent_activity` JSON summary for the current attempt; that summary is diagnostic state for operator snapshots, not durable scheduling authority. Operator snapshots must keep queue ownership separate from execution liveness: `active_lease` and `queue_lease_state` describe the local queue lease, while `execution_liveness` describes the observed process, app-server thread, or protocol marker that keeps an active lane visible. If a raw attempt is still `starting` after app-server thread, model, or protocol activity is observed, operator-facing `status` must report `running` and preserve the raw value in `attempt_status`. High-frequency heartbeat, child-agent buckets, token counts, idle ages, and other transient liveness details stay local/operator-only under the boundary defined by [`linear-execution-ledger.md`](./linear-execution-ledger.md). +For child supervision, the active lane may also carry a short-lived worktree heartbeat marker at `.worktrees//.decodex-run-activity`. That marker is advisory, keyed to the current `run_id` plus `attempt_number`, and exists so the control plane can observe child activity across process boundaries, surface active thread and protocol progress in operator status, and keep high-frequency telemetry out of Linear. When the marker records process liveness, it must pair `process_id` with both host boot identity (`host_boot_id`) and process start identity (`process_start_identity`). A marker from a previous boot, a marker missing either identity, a marker whose process start identity no longer matches the live PID, a marker whose PID has exited into an unreaped zombie state, or a marker observed while Decodex cannot read the current host or process identity must not be treated as a live process even if that PID currently exists. Operator snapshots expose `process_liveness_reason` so operators can distinguish stopped processes, previous-boot markers, and same-boot PID reuse from genuine live execution. The marker may also carry an additive `child_agent_activity` JSON summary for the current attempt; that summary is diagnostic state for operator snapshots, not durable scheduling authority. Operator snapshots must keep queue ownership separate from execution liveness: `active_lease` and `queue_lease_state` describe the local queue lease, while `execution_liveness` describes the observed process, app-server thread, or protocol marker that keeps an active lane visible. If a raw attempt is still `starting` after app-server thread, model, or protocol activity is observed, operator-facing `status` must report `running` and preserve the raw value in `attempt_status`. High-frequency heartbeat, child-agent buckets, token counts, idle ages, and other transient liveness details stay local/operator-only under the boundary defined by [`linear-execution-ledger.md`](./linear-execution-ledger.md). Post-review ownership is stored in the runtime database. Retained handoff rows record the authoritative PR URL, branch lineage, validated PR head OID, run id, and attempt number that completed the `In Review` handoff. Retained orchestration rows record the current post-review phase for that exact handoff identity. If the matching database row is missing, post-review ownership must block as unresolved instead of rebinding from branch-name, current-head, Linear comments, or other heuristics. If a retained review marker exists but a stored handoff or orchestration head no longer matches a clean retained worktree and matching PR head, operator status must keep the marker PR URL visible when known and recovery diagnosis must report the concrete mismatched field before any explicit rebind refresh. The only source-tree marker that clean-source checks may ignore is the untracked `.decodex-run-activity` heartbeat marker. Review handoff, orchestration, retry, phase timing, and retained PR state belong in the Decodex runtime database, not in root-level or worktree-local review marker files. From d358da99e3150da0a85e8d2b1c7eb403f82adac3 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Fri, 29 May 2026 10:36:55 +0800 Subject: [PATCH 2/2] {"schema":"decodex/commit/1","summary":"Gate macOS process liveness imports","authority":"XY-633"} --- apps/decodex/src/orchestrator/status.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/decodex/src/orchestrator/status.rs b/apps/decodex/src/orchestrator/status.rs index 624bbfe4..d9e8119c 100644 --- a/apps/decodex/src/orchestrator/status.rs +++ b/apps/decodex/src/orchestrator/status.rs @@ -1,10 +1,16 @@ +#[cfg(target_os = "macos")] use std::mem; +#[cfg(target_os = "macos")] use std::mem::MaybeUninit; use records::LinearExecutionEventRecord; +#[cfg(target_os = "macos")] use libc::PROC_PIDTBSDINFO; +#[cfg(target_os = "macos")] use libc::SZOMB; +#[cfg(target_os = "macos")] use libc::c_void; +#[cfg(target_os = "macos")] use libc::proc_bsdinfo; use crate::pull_request::{self, PullRequestLandingGateView};