Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 95 additions & 8 deletions apps/decodex/src/orchestrator/status.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
#[cfg(target_os = "macos")]
use std::mem;
#[cfg(target_os = "macos")]
use std::mem::MaybeUninit;

use records::LinearExecutionEventRecord;
#[cfg(target_os = "macos")]
use libc::PROC_PIDTBSDINFO;
#[cfg(target_os = "macos")]
use libc::SZOMB;
#[cfg(target_os = "macos")]
use libc::c_void;
#[cfg(target_os = "macos")]
use libc::proc_bsdinfo;

use crate::pull_request::{self, PullRequestLandingGateView};
use crate::worktree;
Expand Down Expand Up @@ -1811,6 +1824,12 @@ where
{
let tracker_policy = workflow.frontmatter().tracker();

if tracker_policy.terminal_states().iter().any(|state| state == &issue.state.name) {
return Ok(("closed", "terminal_state"));
}
if issue.has_label(tracker_policy.needs_attention_label()) {
return Ok(("blocked", "issue_needs_attention"));
}
if state_store.issue_has_active_shared_claim(project.service_id(), &issue.id)? {
return Ok(("claimed", "shared_claim_present"));
}
Expand All @@ -1821,18 +1840,12 @@ where
)? {
return Ok(("blocked", QUEUE_REASON_LINEAR_ACTIVE_LABEL_PRESENT));
}
if tracker_policy.terminal_states().iter().any(|state| state == &issue.state.name) {
return Ok(("closed", "terminal_state"));
}
if !tracker_policy.startable_states().iter().any(|state| state == &issue.state.name) {
return Ok(("blocked", "non_startable_state"));
}
if issue.has_label(tracker_policy.opt_out_label()) {
return Ok(("blocked", "issue_opted_out"));
}
if issue.has_label(tracker_policy.needs_attention_label()) {
return Ok(("blocked", "issue_needs_attention"));
}
if !todo_blocker_rule_passes(issue, workflow) {
return Ok(("blocked", "open_tracker_blockers"));
}
Expand Down Expand Up @@ -3942,12 +3955,86 @@ fn process_is_alive(process_id: u32) -> bool {
// Use the kernel liveness probe directly so recovery does not depend on a shell
// builtin or `kill` binary being present on PATH.
match unsafe { libc::kill(process_id, 0) } {
0 => true,
-1 => matches!(std::io::Error::last_os_error().raw_os_error(), Some(libc::EPERM)),
0 => !process_is_zombie_or_uninspectable_after_signalable_probe(process_id),
-1 => {
matches!(std::io::Error::last_os_error().raw_os_error(), Some(libc::EPERM))
&& !process_is_zombie(process_id)
},
_ => false,
}
}

fn process_is_zombie_or_uninspectable_after_signalable_probe(process_id: pid_t) -> bool {
process_is_zombie_or_uninspectable(process_id)
}

#[cfg(not(target_os = "macos"))]
fn process_is_zombie_or_uninspectable(process_id: pid_t) -> bool {
process_is_zombie(process_id)
}

#[cfg(target_os = "linux")]
fn process_is_zombie(process_id: pid_t) -> bool {
let Ok(stat) = fs::read_to_string(format!("/proc/{process_id}/stat")) else {
return false;
};
let Some(comm_end) = stat.rfind(')') else {
return false;
};
let Some(after_comm) = stat.get(comm_end + 2..) else {
return false;
};

after_comm.split_whitespace().next() == Some("Z")
}

#[cfg(target_os = "macos")]
fn process_is_zombie_or_uninspectable(process_id: pid_t) -> bool {
match macos_process_bsd_status(process_id) {
Some(status) => status == SZOMB,
None => true,
}
}

#[cfg(target_os = "macos")]
fn process_is_zombie(process_id: pid_t) -> bool {
macos_process_bsd_status(process_id) == Some(SZOMB)
}

#[cfg(target_os = "macos")]
fn macos_process_bsd_status(process_id: pid_t) -> Option<u32> {
if process_id <= 0 {
return None;
}

let mut info = MaybeUninit::<proc_bsdinfo>::zeroed();
let Ok(info_size) = i32::try_from(mem::size_of::<proc_bsdinfo>()) else {
return None;
};
let read_size = unsafe {
libc::proc_pidinfo(
process_id,
PROC_PIDTBSDINFO,
0,
info.as_mut_ptr().cast::<c_void>(),
info_size,
)
};

if read_size != info_size {
return None;
}

let info = unsafe { info.assume_init() };

Some(info.pbi_status)
}

#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn process_is_zombie(_process_id: pid_t) -> bool {
false
}

fn hydrate_status_snapshot_state(
_project: &ServiceConfig,
_state_store: &StateStore,
Expand Down
50 changes: 50 additions & 0 deletions apps/decodex/src/orchestrator/tests/operator/status/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,56 @@ fn live_operator_status_snapshot_excludes_claimed_candidates_from_waiting_intake
assert!(rendered.contains("Active queue echoes: 1"));
}

#[test]
fn live_operator_status_snapshot_prioritizes_needs_attention_over_shared_claim() {
let (_temp_dir, config, workflow) = temp_project_layout();
let state_store = StateStore::open_in_memory().expect("state store should open");
let issue = sample_issue_with_sort_fields(
"issue-attention-claimed",
"PUB-113",
"Todo",
&["decodex:needs-attention"],
Some(3),
"2026-03-13T06:16:17.133Z",
);
let tracker = FakeTracker::new(vec![issue.clone()]);

state_store
.record_run_attempt("run-attention-claimed", &issue.id, 1, "running")
.expect("active run should record");
state_store
.upsert_lease(config.service_id(), &issue.id, "run-attention-claimed", "In Progress")
.expect("active lease should record");

let snapshot = orchestrator::build_live_operator_status_snapshot(
&tracker,
&config,
&workflow,
&state_store,
10,
)
.expect("snapshot should build");
let project = snapshot.projects.first().expect("project summary should exist");
let candidate = snapshot
.queued_candidates
.iter()
.find(|candidate| candidate.issue_identifier == "PUB-113")
.expect("needs-attention claimed issue should remain visible");
let attention = candidate.attention.as_ref().expect("attention details should render");

assert_eq!(candidate.classification, "blocked");
assert_eq!(candidate.reason, "issue_needs_attention");
assert_eq!(
attention.auto_retry_blocked_reason.as_deref(),
Some("needs_attention_label")
);
assert_eq!(project.attention_count, 1);
assert_eq!(
project.queued_candidate_count, 1,
"needs-attention queue echoes remain in blocked intake while also counting as attention"
);
}

#[test]
fn live_operator_status_snapshot_blocks_active_plus_queued_label_without_local_claim() {
let (_temp_dir, config, workflow) = temp_project_layout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,72 @@ fn operator_status_snapshot_counts_stopped_active_process_as_attention_not_runni
assert_eq!(project.attention_count, 1);
}

#[cfg(unix)]
#[test]
fn operator_status_snapshot_counts_zombie_active_process_as_attention_not_running() {
let (_temp_dir, config, _workflow) = temp_project_layout();
let state_store = StateStore::open_in_memory().expect("state store should open");
let issue = sample_issue("Todo", &[]);
let worktree_path = config.worktree_root().join("PUB-101");
let mut child = Command::new("/bin/sh")
.arg("-c")
.arg("exit 0")
.spawn()
.expect("short-lived child process should spawn");
let child_pid = child.id();

state_store
.record_run_attempt("run-1", &issue.id, 1, "running")
.expect("run attempt should record");
state_store
.upsert_lease("pubfi", &issue.id, "run-1", "In Progress")
.expect("lease should record");
state_store
.upsert_worktree(
"pubfi",
&issue.id,
"x/pubfi-pub-101",
&worktree_path.display().to_string(),
)
.expect("worktree should record");

fs::create_dir_all(&worktree_path).expect("worktree path should exist");
state::write_run_activity_marker_for_process(&worktree_path, "run-1", 1, child_pid)
.expect("zombie process marker should write");

let deadline = Instant::now() + Duration::from_secs(5);
let mut observed_stopped = false;

while Instant::now() < deadline {
if !orchestrator::process_is_alive(child_pid) {
observed_stopped = true;

break;
}

thread::sleep(Duration::from_millis(10));
}

let snapshot = observed_stopped.then(|| {
orchestrator::build_operator_status_snapshot(&config, &state_store, 10)
.expect("snapshot should build")
});

child.wait().expect("short-lived child process should reap");

assert!(observed_stopped, "exited child process must not count as alive");

let snapshot = snapshot.expect("snapshot should be captured while child is unreaped");
let run = snapshot.active_runs.first().expect("active run should remain visible");
let project = snapshot.projects.first().expect("project summary should exist");

assert_eq!(run.phase, "executing");
assert_eq!(run.process_alive, Some(false));
assert_eq!(run.process_liveness_reason.as_deref(), Some("process_stopped"));
assert_eq!(project.active_run_count, 0);
assert_eq!(project.attention_count, 1);
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn operator_status_snapshot_counts_previous_boot_process_as_attention_not_running() {
Expand Down
9 changes: 5 additions & 4 deletions docs/reference/operator-control-plane.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,11 @@ Worktree visibility follows the owning dashboard section:
child process/thread/protocol relationship for the path. Process liveness requires
an alive PID plus matching `.decodex-run-activity` `host_boot_id` and
`process_start_identity`; a previous-boot marker, same-boot PID reuse, missing
identity, or unavailable current host/process identity is recovery input, not proof
of active execution. `execution_liveness = process_identity_mismatch` is the stable
summary for previous-boot or PID-reuse evidence, while `process_liveness_reason`
explains the exact failed identity check when `process_alive` is false.
identity, an unreaped zombie PID, or unavailable current host/process identity is
recovery input, not proof of active execution. `execution_liveness =
process_identity_mismatch` is the stable summary for previous-boot or PID-reuse
evidence, while `process_liveness_reason` explains the exact failed identity check
when `process_alive` is false.
`active_lease` is queue lease ownership only; `execution_liveness` explains why
the lane is still visible when the queue lease is not held.
- Running lanes derive CLI and dashboard text from the same `OperatorRunStatus`
Expand Down
2 changes: 1 addition & 1 deletion docs/spec/runtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ The runtime database stores at least:
- tracker and PR cache rows needed to survive connector outages
- typed connector health and external API backoff

For child supervision, the active lane may also carry a short-lived worktree heartbeat marker at `.worktrees/<ISSUE>/.decodex-run-activity`. That marker is advisory, keyed to the current `run_id` plus `attempt_number`, and exists so the control plane can observe child activity across process boundaries, surface active thread and protocol progress in operator status, and keep high-frequency telemetry out of Linear. When the marker records process liveness, it must pair `process_id` with both host boot identity (`host_boot_id`) and process start identity (`process_start_identity`). A marker from a previous boot, a marker missing either identity, a marker whose process start identity no longer matches the live PID, or a marker observed while Decodex cannot read the current host or process identity must not be treated as a live process even if that PID currently exists. Operator snapshots expose `process_liveness_reason` so operators can distinguish stopped processes, previous-boot markers, and same-boot PID reuse from genuine live execution. The marker may also carry an additive `child_agent_activity` JSON summary for the current attempt; that summary is diagnostic state for operator snapshots, not durable scheduling authority. Operator snapshots must keep queue ownership separate from execution liveness: `active_lease` and `queue_lease_state` describe the local queue lease, while `execution_liveness` describes the observed process, app-server thread, or protocol marker that keeps an active lane visible. If a raw attempt is still `starting` after app-server thread, model, or protocol activity is observed, operator-facing `status` must report `running` and preserve the raw value in `attempt_status`. High-frequency heartbeat, child-agent buckets, token counts, idle ages, and other transient liveness details stay local/operator-only under the boundary defined by [`linear-execution-ledger.md`](./linear-execution-ledger.md).
For child supervision, the active lane may also carry a short-lived worktree heartbeat marker at `.worktrees/<ISSUE>/.decodex-run-activity`. That marker is advisory, keyed to the current `run_id` plus `attempt_number`, and exists so the control plane can observe child activity across process boundaries, surface active thread and protocol progress in operator status, and keep high-frequency telemetry out of Linear. When the marker records process liveness, it must pair `process_id` with both host boot identity (`host_boot_id`) and process start identity (`process_start_identity`). A marker from a previous boot, a marker missing either identity, a marker whose process start identity no longer matches the live PID, a marker whose PID has exited into an unreaped zombie state, or a marker observed while Decodex cannot read the current host or process identity must not be treated as a live process even if that PID currently exists. Operator snapshots expose `process_liveness_reason` so operators can distinguish stopped processes, previous-boot markers, and same-boot PID reuse from genuine live execution. The marker may also carry an additive `child_agent_activity` JSON summary for the current attempt; that summary is diagnostic state for operator snapshots, not durable scheduling authority. Operator snapshots must keep queue ownership separate from execution liveness: `active_lease` and `queue_lease_state` describe the local queue lease, while `execution_liveness` describes the observed process, app-server thread, or protocol marker that keeps an active lane visible. If a raw attempt is still `starting` after app-server thread, model, or protocol activity is observed, operator-facing `status` must report `running` and preserve the raw value in `attempt_status`. High-frequency heartbeat, child-agent buckets, token counts, idle ages, and other transient liveness details stay local/operator-only under the boundary defined by [`linear-execution-ledger.md`](./linear-execution-ledger.md).
Post-review ownership is stored in the runtime database. Retained handoff rows record the authoritative PR URL, branch lineage, validated PR head OID, run id, and attempt number that completed the `In Review` handoff. Retained orchestration rows record the current post-review phase for that exact handoff identity. If the matching database row is missing, post-review ownership must block as unresolved instead of rebinding from branch-name, current-head, Linear comments, or other heuristics. If a retained review marker exists but a stored handoff or orchestration head no longer matches a clean retained worktree and matching PR head, operator status must keep the marker PR URL visible when known and recovery diagnosis must report the concrete mismatched field before any explicit rebind refresh.
The only source-tree marker that clean-source checks may ignore is the untracked `.decodex-run-activity` heartbeat marker. Review handoff, orchestration, retry, phase timing, and retained PR state belong in the Decodex runtime database, not in root-level or worktree-local review marker files.

Expand Down