Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions apps/decodex/src/orchestrator/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,17 @@ where
return Ok(Vec::new());
};
let worktree_mapping = state_store.worktree_for_issue(&issue.id)?;

if let Some(disposition) = superseded_run_disposition(state_store, &run_attempt)? {
return Ok(vec![ActiveRunReconciliation {
issue: issue.clone(),
run_attempt,
worktree_mapping,
disposition,
workflow: workflow.clone(),
}]);
}

let action_workflow = active_reconciliation_workflow_for_lease(
workflow,
Some(ActiveWorkflowOverride { child, workflow: child_context.workflow }),
Expand Down Expand Up @@ -1022,6 +1033,13 @@ where
let Some(run_attempt) = context.state_store.run_attempt(run_attempt.run_id())? else {
return Ok(());
};

if superseded_run_disposition(context.state_store, &run_attempt)?.is_some() {
clear_retry_schedule_and_release(context.retry_queue, context.state_store, child.issue_id)?;

return Ok(());
}

let issue_id = run_attempt.issue_id();
let Some(issue) = refresh_issue(context.tracker, issue_id)? else {
clear_retry_schedule_and_release(context.retry_queue, context.state_store, issue_id)?;
Expand Down
79 changes: 76 additions & 3 deletions apps/decodex/src/orchestrator/reconciliation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,17 @@ where
&issue,
&run_attempt,
);
let retained_closeout =
terminal_issue_keeps_retained_closeout(
let retained_closeout = terminal_issue_keeps_retained_closeout(
tracker,
&issue,
project,
action_workflow,
state_store,
)?;
let disposition = if !retained_closeout && is_terminal_issue(&issue, action_workflow) {
let disposition =
if let Some(disposition) = superseded_run_disposition(state_store, &run_attempt)? {
Some(disposition)
} else if !retained_closeout && is_terminal_issue(&issue, action_workflow) {
Some(ActiveRunDisposition::Terminal)
} else if !retained_closeout
&& is_issue_nonactive_for_run(&issue, action_workflow)
Expand Down Expand Up @@ -125,6 +127,16 @@ where
};
let worktree_mapping = state_store.worktree_for_issue(issue_id)?;

if let Some(disposition) = superseded_run_disposition(state_store, &run_attempt)? {
return Ok(vec![ActiveRunReconciliation {
issue,
run_attempt,
worktree_mapping,
disposition,
workflow: workflow.clone(),
}]);
}

if run_attempt.status() != "failed" || !is_issue_active_for_run(&issue, workflow) {
return Ok(Vec::new());
}
Expand Down Expand Up @@ -178,6 +190,18 @@ where
ActiveRunDisposition::RetainedReviewComplete => {
reconcile_retained_review_complete_active_run(project, state_store, &action)?;
},
ActiveRunDisposition::Superseded {
newer_run_id,
newer_attempt_number,
} => {
reconcile_superseded_active_run(
project,
state_store,
&action,
newer_run_id,
*newer_attempt_number,
)?;
},
ActiveRunDisposition::Terminal => {
tracing::info!(
project_id = project.service_id(),
Expand Down Expand Up @@ -226,6 +250,36 @@ where
Ok(())
}

fn reconcile_superseded_active_run(
project: &ServiceConfig,
state_store: &StateStore,
action: &ActiveRunReconciliation,
newer_run_id: &str,
newer_attempt_number: i64,
) -> Result<()> {
tracing::info!(
project_id = project.service_id(),
issue_id = action.issue.id,
issue = action.issue.identifier,
run_id = action.run_attempt.run_id(),
attempt = action.run_attempt.attempt_number(),
superseded_by_run_id = newer_run_id,
superseded_by_attempt = newer_attempt_number,
disposition = "superseded",
"Reconciling superseded active run without tracker writeback."
);

mark_run_attempt_if_active(state_store, action.run_attempt.run_id(), "interrupted")?;

if let Some(lease) = state_store.lease_for_issue(&action.issue.id)?
&& lease.run_id() == action.run_attempt.run_id()
{
state_store.clear_lease(&action.issue.id)?;
}

Ok(())
}

fn reconcile_retained_review_complete_active_run(
project: &ServiceConfig,
state_store: &StateStore,
Expand Down Expand Up @@ -423,6 +477,25 @@ fn retained_review_handoff_matches_run(
&& marker.branch_name() == worktree_mapping.branch_name())
}

fn superseded_run_disposition(
state_store: &StateStore,
run_attempt: &RunAttempt,
) -> Result<Option<ActiveRunDisposition>> {
let Some(latest_attempt) = state_store.latest_run_attempt_for_issue(run_attempt.issue_id())?
else {
return Ok(None);
};

if latest_attempt.attempt_number() <= run_attempt.attempt_number() {
return Ok(None);
}

Ok(Some(ActiveRunDisposition::Superseded {
newer_run_id: latest_attempt.run_id().to_owned(),
newer_attempt_number: latest_attempt.attempt_number(),
}))
}

fn stalled_idle_duration(
state_store: &StateStore,
run_attempt: &RunAttempt,
Expand Down
71 changes: 71 additions & 0 deletions apps/decodex/src/orchestrator/tests/recovery/reconciliation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,77 @@ fn active_run_reconciliation_detects_stalled_run_without_protocol_events() {
}));
}

#[test]
fn active_run_reconciliation_supersedes_stale_lease_for_newer_attempt() {
let (_temp_dir, config, workflow) = temp_project_layout();
let state_store = StateStore::open_in_memory().expect("state store should open");
let issue = sample_issue_with_sort_fields(
"issue-superseded-lease",
"PUB-207",
"In Progress",
&[],
Some(3),
"2026-03-13T04:16:17.133Z",
);
let tracker = FakeTracker::new(vec![issue.clone()]);
let stale_run_id = "run-superseded-lease-1";
let newer_run_id = "run-superseded-lease-2";
let worktree_manager =
WorktreeManager::new(config.service_id(), config.repo_root(), config.worktree_root());

state_store
.record_run_attempt(stale_run_id, &issue.id, 1, "running")
.expect("stale run should record");
state_store
.record_run_attempt(newer_run_id, &issue.id, 2, "succeeded")
.expect("newer run should record");
state_store
.upsert_lease(config.service_id(), &issue.id, stale_run_id, "In Progress")
.expect("stale lease should record");

let actions = orchestrator::inspect_active_run_reconciliation_at(
&tracker,
&config,
&workflow,
&state_store,
None,
OffsetDateTime::now_utc().unix_timestamp(),
)
.expect("active-run inspection should succeed");

assert_eq!(actions.len(), 1);
assert!(matches!(
&actions[0].disposition,
ActiveRunDisposition::Superseded {
newer_run_id: observed_run_id,
newer_attempt_number: 2,
} if observed_run_id == newer_run_id
));

orchestrator::apply_active_run_reconciliation(
&tracker,
&config,
&state_store,
&worktree_manager,
actions,
)
.expect("superseded reconciliation should succeed");

assert!(state_store.lease_for_issue(&issue.id).expect("lease lookup should succeed").is_none());
assert_eq!(
state_store
.run_attempt(stale_run_id)
.expect("run attempt lookup should succeed")
.expect("stale run should exist")
.status(),
"interrupted"
);
assert!(
tracker.comments.borrow().is_empty(),
"superseded stale lease must not write needs-attention comments"
);
}

#[test]
fn active_run_reconciliation_keeps_completed_closeout_lane_with_fresh_activity() {
let (_temp_dir, base_config, workflow) = temp_project_layout();
Expand Down
95 changes: 95 additions & 0 deletions apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,101 @@ fn exited_child_reconciliation_detects_stalled_failed_runs_from_protocol_idle()
}));
}

#[test]
fn exited_child_reconciliation_ignores_superseded_failed_run() {
let (_temp_dir, config, workflow) = temp_project_layout();
let state_store = StateStore::open_in_memory().expect("state store should open");
let issue = sample_issue_with_sort_fields(
"issue-superseded-after-exit",
"PUB-206",
"In Progress",
&[],
Some(3),
"2026-03-13T04:16:17.133Z",
);
let tracker = FakeTracker::new(vec![issue.clone()]);
let stale_run_id = "run-superseded-after-exit-1";
let newer_run_id = "run-superseded-after-exit-2";
let worktree_path = config.worktree_root().join(&issue.identifier);
let worktree_manager =
WorktreeManager::new(config.service_id(), config.repo_root(), config.worktree_root());

fs::create_dir_all(&worktree_path).expect("worktree path should exist");

state_store
.record_run_attempt(stale_run_id, &issue.id, 1, "failed")
.expect("stale run should record");
state_store
.record_run_attempt(newer_run_id, &issue.id, 2, "running")
.expect("newer run should record");
state_store
.upsert_lease(config.service_id(), &issue.id, newer_run_id, "In Progress")
.expect("newer lease should record");
state_store
.upsert_worktree(
config.service_id(),
&issue.id,
"x/pubfi-pub-206",
&worktree_path.display().to_string(),
)
.expect("worktree mapping should record");

state::write_run_protocol_activity_marker(
&worktree_path,
&ProtocolActivityMarker {
run_id: stale_run_id,
attempt_number: 1,
thread_id: None,
turn_id: None,
event_count: 1,
last_event_type: "thread/status/changed",
child_agent_activity: None,
protocol_activity: None,
},
)
.expect("protocol marker should write");

let actions = orchestrator::inspect_exited_daemon_child_reconciliation_at(
&tracker,
&config,
&workflow,
&state_store,
&issue.id,
stale_run_id,
OffsetDateTime::now_utc().unix_timestamp() + ACTIVE_RUN_IDLE_TIMEOUT.as_secs() as i64 + 1,
)
.expect("exited child inspection should succeed");

assert_eq!(actions.len(), 1);
assert!(matches!(
&actions[0].disposition,
ActiveRunDisposition::Superseded {
newer_run_id: observed_run_id,
newer_attempt_number: 2,
} if observed_run_id == newer_run_id
));

orchestrator::apply_active_run_reconciliation(
&tracker,
&config,
&state_store,
&worktree_manager,
actions,
)
.expect("superseded reconciliation should succeed");

let lease = state_store
.lease_for_issue(&issue.id)
.expect("lease lookup should succeed")
.expect("newer lease should remain");

assert_eq!(lease.run_id(), newer_run_id);
assert!(
tracker.comments.borrow().is_empty(),
"superseded stale child must not write needs-attention comments"
);
}

#[test]
fn run_project_once_prefers_recovered_in_progress_worktree_after_empty_state_startup() {
let (_temp_dir, config, workflow) = temp_project_layout();
Expand Down
4 changes: 4 additions & 0 deletions apps/decodex/src/orchestrator/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,10 @@ pub(crate) enum RetryDispatchDecision {
#[derive(Clone, Debug)]
pub(crate) enum ActiveRunDisposition {
RetainedReviewComplete,
Superseded {
newer_run_id: String,
newer_attempt_number: i64,
},
Terminal,
NonActive,
Stalled { idle_for: Duration },
Expand Down