diff --git a/apps/decodex/src/orchestrator/daemon.rs b/apps/decodex/src/orchestrator/daemon.rs index e50a30b2..10059692 100644 --- a/apps/decodex/src/orchestrator/daemon.rs +++ b/apps/decodex/src/orchestrator/daemon.rs @@ -367,6 +367,17 @@ where return Ok(Vec::new()); }; let worktree_mapping = state_store.worktree_for_issue(&issue.id)?; + + if let Some(disposition) = superseded_run_disposition(state_store, &run_attempt)? { + return Ok(vec![ActiveRunReconciliation { + issue: issue.clone(), + run_attempt, + worktree_mapping, + disposition, + workflow: workflow.clone(), + }]); + } + let action_workflow = active_reconciliation_workflow_for_lease( workflow, Some(ActiveWorkflowOverride { child, workflow: child_context.workflow }), @@ -1022,6 +1033,13 @@ where let Some(run_attempt) = context.state_store.run_attempt(run_attempt.run_id())? else { return Ok(()); }; + + if superseded_run_disposition(context.state_store, &run_attempt)?.is_some() { + clear_retry_schedule_and_release(context.retry_queue, context.state_store, child.issue_id)?; + + return Ok(()); + } + let issue_id = run_attempt.issue_id(); let Some(issue) = refresh_issue(context.tracker, issue_id)? else { clear_retry_schedule_and_release(context.retry_queue, context.state_store, issue_id)?; diff --git a/apps/decodex/src/orchestrator/reconciliation.rs b/apps/decodex/src/orchestrator/reconciliation.rs index af29f592..7fece890 100644 --- a/apps/decodex/src/orchestrator/reconciliation.rs +++ b/apps/decodex/src/orchestrator/reconciliation.rs @@ -36,15 +36,17 @@ where &issue, &run_attempt, ); - let retained_closeout = - terminal_issue_keeps_retained_closeout( + let retained_closeout = terminal_issue_keeps_retained_closeout( tracker, &issue, project, action_workflow, state_store, )?; - let disposition = if !retained_closeout && is_terminal_issue(&issue, action_workflow) { + let disposition = + if let Some(disposition) = superseded_run_disposition(state_store, &run_attempt)? { + Some(disposition) + } else if !retained_closeout && is_terminal_issue(&issue, action_workflow) { Some(ActiveRunDisposition::Terminal) } else if !retained_closeout && is_issue_nonactive_for_run(&issue, action_workflow) @@ -125,6 +127,16 @@ where }; let worktree_mapping = state_store.worktree_for_issue(issue_id)?; + if let Some(disposition) = superseded_run_disposition(state_store, &run_attempt)? { + return Ok(vec![ActiveRunReconciliation { + issue, + run_attempt, + worktree_mapping, + disposition, + workflow: workflow.clone(), + }]); + } + if run_attempt.status() != "failed" || !is_issue_active_for_run(&issue, workflow) { return Ok(Vec::new()); } @@ -178,6 +190,18 @@ where ActiveRunDisposition::RetainedReviewComplete => { reconcile_retained_review_complete_active_run(project, state_store, &action)?; }, + ActiveRunDisposition::Superseded { + newer_run_id, + newer_attempt_number, + } => { + reconcile_superseded_active_run( + project, + state_store, + &action, + newer_run_id, + *newer_attempt_number, + )?; + }, ActiveRunDisposition::Terminal => { tracing::info!( project_id = project.service_id(), @@ -226,6 +250,36 @@ where Ok(()) } +fn reconcile_superseded_active_run( + project: &ServiceConfig, + state_store: &StateStore, + action: &ActiveRunReconciliation, + newer_run_id: &str, + newer_attempt_number: i64, +) -> Result<()> { + tracing::info!( + project_id = project.service_id(), + issue_id = action.issue.id, + issue = action.issue.identifier, + run_id = action.run_attempt.run_id(), + attempt = action.run_attempt.attempt_number(), + superseded_by_run_id = newer_run_id, + superseded_by_attempt = newer_attempt_number, + disposition = "superseded", + "Reconciling superseded active run without tracker writeback." + ); + + mark_run_attempt_if_active(state_store, action.run_attempt.run_id(), "interrupted")?; + + if let Some(lease) = state_store.lease_for_issue(&action.issue.id)? + && lease.run_id() == action.run_attempt.run_id() + { + state_store.clear_lease(&action.issue.id)?; + } + + Ok(()) +} + fn reconcile_retained_review_complete_active_run( project: &ServiceConfig, state_store: &StateStore, @@ -423,6 +477,25 @@ fn retained_review_handoff_matches_run( && marker.branch_name() == worktree_mapping.branch_name()) } +fn superseded_run_disposition( + state_store: &StateStore, + run_attempt: &RunAttempt, +) -> Result> { + let Some(latest_attempt) = state_store.latest_run_attempt_for_issue(run_attempt.issue_id())? + else { + return Ok(None); + }; + + if latest_attempt.attempt_number() <= run_attempt.attempt_number() { + return Ok(None); + } + + Ok(Some(ActiveRunDisposition::Superseded { + newer_run_id: latest_attempt.run_id().to_owned(), + newer_attempt_number: latest_attempt.attempt_number(), + })) +} + fn stalled_idle_duration( state_store: &StateStore, run_attempt: &RunAttempt, diff --git a/apps/decodex/src/orchestrator/tests/recovery/reconciliation.rs b/apps/decodex/src/orchestrator/tests/recovery/reconciliation.rs index ff1a1560..e6051c05 100644 --- a/apps/decodex/src/orchestrator/tests/recovery/reconciliation.rs +++ b/apps/decodex/src/orchestrator/tests/recovery/reconciliation.rs @@ -139,6 +139,77 @@ fn active_run_reconciliation_detects_stalled_run_without_protocol_events() { })); } +#[test] +fn active_run_reconciliation_supersedes_stale_lease_for_newer_attempt() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue_with_sort_fields( + "issue-superseded-lease", + "PUB-207", + "In Progress", + &[], + Some(3), + "2026-03-13T04:16:17.133Z", + ); + let tracker = FakeTracker::new(vec![issue.clone()]); + let stale_run_id = "run-superseded-lease-1"; + let newer_run_id = "run-superseded-lease-2"; + let worktree_manager = + WorktreeManager::new(config.service_id(), config.repo_root(), config.worktree_root()); + + state_store + .record_run_attempt(stale_run_id, &issue.id, 1, "running") + .expect("stale run should record"); + state_store + .record_run_attempt(newer_run_id, &issue.id, 2, "succeeded") + .expect("newer run should record"); + state_store + .upsert_lease(config.service_id(), &issue.id, stale_run_id, "In Progress") + .expect("stale lease should record"); + + let actions = orchestrator::inspect_active_run_reconciliation_at( + &tracker, + &config, + &workflow, + &state_store, + None, + OffsetDateTime::now_utc().unix_timestamp(), + ) + .expect("active-run inspection should succeed"); + + assert_eq!(actions.len(), 1); + assert!(matches!( + &actions[0].disposition, + ActiveRunDisposition::Superseded { + newer_run_id: observed_run_id, + newer_attempt_number: 2, + } if observed_run_id == newer_run_id + )); + + orchestrator::apply_active_run_reconciliation( + &tracker, + &config, + &state_store, + &worktree_manager, + actions, + ) + .expect("superseded reconciliation should succeed"); + + assert!(state_store.lease_for_issue(&issue.id).expect("lease lookup should succeed").is_none()); + assert_eq!( + state_store + .run_attempt(stale_run_id) + .expect("run attempt lookup should succeed") + .expect("stale run should exist") + .status(), + "interrupted" + ); + assert!( + tracker.comments.borrow().is_empty(), + "superseded stale lease must not write needs-attention comments" + ); +} + #[test] fn active_run_reconciliation_keeps_completed_closeout_lane_with_fresh_activity() { let (_temp_dir, base_config, workflow) = temp_project_layout(); diff --git a/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs b/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs index e8b721e1..d2669cc0 100644 --- a/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs +++ b/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs @@ -71,6 +71,101 @@ fn exited_child_reconciliation_detects_stalled_failed_runs_from_protocol_idle() })); } +#[test] +fn exited_child_reconciliation_ignores_superseded_failed_run() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue_with_sort_fields( + "issue-superseded-after-exit", + "PUB-206", + "In Progress", + &[], + Some(3), + "2026-03-13T04:16:17.133Z", + ); + let tracker = FakeTracker::new(vec![issue.clone()]); + let stale_run_id = "run-superseded-after-exit-1"; + let newer_run_id = "run-superseded-after-exit-2"; + let worktree_path = config.worktree_root().join(&issue.identifier); + let worktree_manager = + WorktreeManager::new(config.service_id(), config.repo_root(), config.worktree_root()); + + fs::create_dir_all(&worktree_path).expect("worktree path should exist"); + + state_store + .record_run_attempt(stale_run_id, &issue.id, 1, "failed") + .expect("stale run should record"); + state_store + .record_run_attempt(newer_run_id, &issue.id, 2, "running") + .expect("newer run should record"); + state_store + .upsert_lease(config.service_id(), &issue.id, newer_run_id, "In Progress") + .expect("newer lease should record"); + state_store + .upsert_worktree( + config.service_id(), + &issue.id, + "x/pubfi-pub-206", + &worktree_path.display().to_string(), + ) + .expect("worktree mapping should record"); + + state::write_run_protocol_activity_marker( + &worktree_path, + &ProtocolActivityMarker { + run_id: stale_run_id, + attempt_number: 1, + thread_id: None, + turn_id: None, + event_count: 1, + last_event_type: "thread/status/changed", + child_agent_activity: None, + protocol_activity: None, + }, + ) + .expect("protocol marker should write"); + + let actions = orchestrator::inspect_exited_daemon_child_reconciliation_at( + &tracker, + &config, + &workflow, + &state_store, + &issue.id, + stale_run_id, + OffsetDateTime::now_utc().unix_timestamp() + ACTIVE_RUN_IDLE_TIMEOUT.as_secs() as i64 + 1, + ) + .expect("exited child inspection should succeed"); + + assert_eq!(actions.len(), 1); + assert!(matches!( + &actions[0].disposition, + ActiveRunDisposition::Superseded { + newer_run_id: observed_run_id, + newer_attempt_number: 2, + } if observed_run_id == newer_run_id + )); + + orchestrator::apply_active_run_reconciliation( + &tracker, + &config, + &state_store, + &worktree_manager, + actions, + ) + .expect("superseded reconciliation should succeed"); + + let lease = state_store + .lease_for_issue(&issue.id) + .expect("lease lookup should succeed") + .expect("newer lease should remain"); + + assert_eq!(lease.run_id(), newer_run_id); + assert!( + tracker.comments.borrow().is_empty(), + "superseded stale child must not write needs-attention comments" + ); +} + #[test] fn run_project_once_prefers_recovered_in_progress_worktree_after_empty_state_startup() { let (_temp_dir, config, workflow) = temp_project_layout(); diff --git a/apps/decodex/src/orchestrator/types.rs b/apps/decodex/src/orchestrator/types.rs index de193cfa..2361bf76 100644 --- a/apps/decodex/src/orchestrator/types.rs +++ b/apps/decodex/src/orchestrator/types.rs @@ -1378,6 +1378,10 @@ pub(crate) enum RetryDispatchDecision { #[derive(Clone, Debug)] pub(crate) enum ActiveRunDisposition { RetainedReviewComplete, + Superseded { + newer_run_id: String, + newer_attempt_number: i64, + }, Terminal, NonActive, Stalled { idle_for: Duration },