Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/decodex/src/orchestrator/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ where
worktree_mapping.as_ref(),
)? {
Some(ActiveRunDisposition::RetainedReviewComplete)
} else if stalled_run_has_retained_partial_progress(worktree_mapping.as_ref()) {
Some(ActiveRunDisposition::StalledRetainedPartialProgress { idle_for })
} else {
Some(ActiveRunDisposition::Stalled { idle_for })
}
Expand Down
31 changes: 24 additions & 7 deletions apps/decodex/src/orchestrator/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,8 @@ fn terminal_failure_lifecycle_event(
issue_run: &IssueRunPlan,
failure: TerminalFailureLifecycle<'_>,
) -> records::LinearExecutionEventRecord {
let event_type = if failure.manual_attention_requested {
let retained_partial_progress = failure.error_class == "partial_progress_retained";
let event_type = if failure.manual_attention_requested || retained_partial_progress {
"needs_attention"
} else {
"terminal_failure"
Expand All @@ -447,12 +448,26 @@ fn terminal_failure_lifecycle_event(
record.worktree_path = Some(failure.worktree_path.to_owned());
record.error_class = Some(failure.error_class.to_owned());
record.next_action = Some(failure.next_action.to_owned());
record.blockers = Some(vec![format!("Run failed with `{}`.", failure.error_class)]);
record.evidence = Some(vec![format!(
"Attempt {} reached terminal failure handling.",
issue_run.attempt_number
)]);
record.summary = Some(String::from("Decodex run failed and needs attention."));

if retained_partial_progress {
record.blockers = Some(vec![String::from(
"Retained tracked worktree changes require operator recovery.",
)]);
record.evidence = Some(vec![format!(
"Attempt {} stopped with tracked worktree changes retained.",
issue_run.attempt_number
)]);
record.summary = Some(String::from("Decodex retained partial progress and needs attention."));
record.terminal_path = Some(String::from("retained_partial_progress"));
} else {
record.blockers = Some(vec![format!("Run failed with `{}`.", failure.error_class)]);
record.evidence = Some(vec![format!(
"Attempt {} reached terminal failure handling.",
issue_run.attempt_number
)]);
record.summary = Some(String::from("Decodex run failed and needs attention."));
}

record.pr_url = failure.pr_url.map(ToOwned::to_owned);
record.target_state = Some(failure.target_state.to_owned());

Expand Down Expand Up @@ -1448,6 +1463,7 @@ fn run_failure_requires_terminal_attention(error: &Report) -> bool {
error.downcast_ref::<ManualAttentionRequested>().is_some()
|| error.downcast_ref::<AppServerZeroEvidenceStartFailure>().is_some()
|| error.downcast_ref::<ReviewHandoffNeedsAttention>().is_some()
|| error.downcast_ref::<RetainedPartialProgress>().is_some()
|| error.downcast_ref::<StalledRunNeedsAttention>().is_some()
|| error.downcast_ref::<AppServerCapabilityPreflightFailure>().is_some()
|| error.downcast_ref::<AppServerHomePreflightFailure>().is_some()
Expand Down Expand Up @@ -1626,6 +1642,7 @@ fn retained_partial_progress_error(
fn terminal_failure_has_specific_error_class(error: &Report) -> bool {
error.downcast_ref::<ManualAttentionRequested>().is_some()
|| error.downcast_ref::<ReviewHandoffNeedsAttention>().is_some()
|| error.downcast_ref::<RetainedPartialProgress>().is_some()
|| error.downcast_ref::<AgentGitCredentialsUnavailable>().is_some()
|| error.downcast_ref::<AppServerCapabilityPreflightFailure>().is_some()
|| error.downcast_ref::<AppServerHomePreflightFailure>().is_some()
Expand Down
132 changes: 108 additions & 24 deletions apps/decodex/src/orchestrator/reconciliation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ where
worktree_mapping.as_ref(),
)? {
Some(ActiveRunDisposition::RetainedReviewComplete)
} else if stalled_run_has_retained_partial_progress(worktree_mapping.as_ref()) {
Some(ActiveRunDisposition::StalledRetainedPartialProgress { idle_for })
} else {
Some(ActiveRunDisposition::Stalled { idle_for })
}
Expand Down Expand Up @@ -150,12 +152,17 @@ where
else {
return Ok(Vec::new());
};
let disposition = if stalled_run_has_retained_partial_progress(worktree_mapping.as_ref()) {
ActiveRunDisposition::StalledRetainedPartialProgress { idle_for }
} else {
ActiveRunDisposition::Stalled { idle_for }
};

Ok(vec![ActiveRunReconciliation {
issue,
run_attempt,
worktree_mapping,
disposition: ActiveRunDisposition::Stalled { idle_for },
disposition,
workflow: workflow.clone(),
}])
}
Expand Down Expand Up @@ -241,6 +248,16 @@ where
*idle_for,
)?;
},
ActiveRunDisposition::StalledRetainedPartialProgress { idle_for } => {
reconcile_stalled_retained_partial_progress_run(
tracker,
project,
state_store,
worktree_manager,
&action,
*idle_for,
)?;
},
ActiveRunDisposition::StalledAlreadyNeedsAttention { idle_for } => {
reconcile_stalled_attention_run(project, state_store, &action, *idle_for)?;
},
Expand Down Expand Up @@ -361,6 +378,84 @@ where
state_store.update_run_status(action.run_attempt.run_id(), "stalled")?;
state_store.clear_lease(&action.issue.id)?;

let issue_run = stalled_reconciliation_issue_run(state_store, worktree_manager, action)?;

write_reconciliation_operation_marker_best_effort(
&issue_run.worktree.path,
&issue_run.run_id,
issue_run.attempt_number,
RUN_OPERATION_RECONCILIATION,
);
handle_failure(
tracker,
project,
&action.workflow,
state_store,
&issue_run,
&Report::new(StalledRunNeedsAttention {
issue_identifier: action.issue.identifier.clone(),
run_id: action.run_attempt.run_id().to_owned(),
idle_for,
}),
)?;

Ok(())
}

fn reconcile_stalled_retained_partial_progress_run<T>(
tracker: &T,
project: &ServiceConfig,
state_store: &StateStore,
worktree_manager: &WorktreeManager,
action: &ActiveRunReconciliation,
idle_for: Duration,
) -> Result<()>
where
T: IssueTracker,
{
tracing::warn!(
project_id = project.service_id(),
issue_id = action.issue.id,
issue = action.issue.identifier,
run_id = action.run_attempt.run_id(),
disposition = "stalled_retained_partial_progress",
idle_for_s = idle_for.as_secs(),
"Reconciling stalled run with retained partial progress."
);

state_store.update_run_status(action.run_attempt.run_id(), "stalled")?;
state_store.clear_lease(&action.issue.id)?;

let issue_run = stalled_reconciliation_issue_run(state_store, worktree_manager, action)?;
let worktree_path = relative_worktree_path(project, &issue_run.worktree);

write_reconciliation_operation_marker_best_effort(
&issue_run.worktree.path,
&issue_run.run_id,
issue_run.attempt_number,
RUN_OPERATION_RECONCILIATION,
);
handle_failure(
tracker,
project,
&action.workflow,
state_store,
&issue_run,
&Report::new(RetainedPartialProgress {
issue_identifier: action.issue.identifier.clone(),
run_id: action.run_attempt.run_id().to_owned(),
worktree_path,
}),
)?;

Ok(())
}

fn stalled_reconciliation_issue_run(
state_store: &StateStore,
worktree_manager: &WorktreeManager,
action: &ActiveRunReconciliation,
) -> Result<IssueRunPlan> {
let worktree = action.worktree_mapping.as_ref().map_or_else(
|| worktree_manager.plan_for_issue(&action.issue.identifier),
|mapping| WorktreeSpec {
Expand All @@ -372,7 +467,8 @@ where
);
let retry_budget_base =
retry_budget_base_for_issue_worktree(state_store, &action.issue.id, &worktree.path)?;
let issue_run = IssueRunPlan {

Ok(IssueRunPlan {
issue: action.issue.clone(),
issue_state: planned_issue_state_for_dispatch(
&action.workflow,
Expand All @@ -388,28 +484,7 @@ where
attempt_number: action.run_attempt.attempt_number(),
run_id: action.run_attempt.run_id().to_owned(),
retry_budget_base,
};

write_reconciliation_operation_marker_best_effort(
&issue_run.worktree.path,
&issue_run.run_id,
issue_run.attempt_number,
RUN_OPERATION_RECONCILIATION,
);
handle_failure(
tracker,
project,
&action.workflow,
state_store,
&issue_run,
&Report::new(StalledRunNeedsAttention {
issue_identifier: action.issue.identifier.clone(),
run_id: action.run_attempt.run_id().to_owned(),
idle_for,
}),
)?;

Ok(())
})
}

fn reconcile_stalled_attention_run(
Expand Down Expand Up @@ -456,6 +531,15 @@ fn write_reconciliation_operation_marker_best_effort(
}
}

fn stalled_run_has_retained_partial_progress(
worktree_mapping: Option<&WorktreeMapping>,
) -> bool {
match worktree_mapping {
Some(mapping) => worktree_has_tracked_changes(mapping.worktree_path()),
None => false,
}
}

fn retained_review_handoff_matches_run(
state_store: &StateStore,
run_attempt: &RunAttempt,
Expand Down
4 changes: 4 additions & 0 deletions apps/decodex/src/orchestrator/run_cycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2786,6 +2786,10 @@ where
};
let disposition = if needs_attention {
ActiveRunDisposition::StalledAlreadyNeedsAttention { idle_for }
} else if is_issue_active_for_run(issue, context.workflow)
&& worktree_has_tracked_changes(worktree_mapping.worktree_path())
{
ActiveRunDisposition::StalledRetainedPartialProgress { idle_for }
} else if is_issue_active_for_run(issue, context.workflow) {
ActiveRunDisposition::Stalled { idle_for }
} else {
Expand Down
16 changes: 14 additions & 2 deletions apps/decodex/src/orchestrator/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,22 @@ fn format_terminal_failure_comment(
next_action: &str,
) -> String {
let pr_url_line = pr_url.map_or_else(String::new, |pr_url| format!("\n- pr_url: `{pr_url}`"));
let retained_partial_progress = error_class == "partial_progress_retained";
let heading = if retained_partial_progress {
"decodex retained partial progress and needs attention"
} else {
"decodex run failed and needs attention"
};
let timestamp_label = if retained_partial_progress { "recorded_at" } else { "failed_at" };
let error_summary = if retained_partial_progress {
"Sensitive runtime details were withheld from the tracker comment; inspect the retained lane for the full recovery context."
} else {
"Sensitive runtime details were withheld from the tracker comment; inspect the local lane for the full failure context."
};

format!(
"decodex run failed and needs attention\n\n- run_id: `{run_id}`\n- attempt: `{attempt_number}`\n- failed_at: `{failed_at}`\n- branch: `{branch}`{pr_url_line}\n- worktree_path: `{worktree}`\n- error_class: `{error_class}`\n- next_action: `{next_action}`\n- error_summary: `Sensitive runtime details were withheld from the tracker comment; inspect the local lane for the full failure context.`",
failed_at = current_timestamp(),
"{heading}\n\n- run_id: `{run_id}`\n- attempt: `{attempt_number}`\n- {timestamp_label}: `{timestamp}`\n- branch: `{branch}`{pr_url_line}\n- worktree_path: `{worktree}`\n- error_class: `{error_class}`\n- next_action: `{next_action}`\n- error_summary: `{error_summary}`",
timestamp = current_timestamp(),
branch = branch_name,
worktree = worktree_path
)
Expand Down
31 changes: 17 additions & 14 deletions apps/decodex/src/orchestrator/tests/operator/status/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,20 +813,23 @@ fn live_operator_status_snapshot_surfaces_stalled_retained_partial_progress() {

tracker.issue_comments.borrow_mut().insert(
issue.id.clone(),
vec![linear_execution_history_comment(
&issue,
"terminal_failure",
"2026-03-13T09:20:00Z",
"stalled-retained-partial-progress",
|record| {
record.error_class = Some(String::from("partial_progress_retained"));
record.next_action = Some(String::from(
"inspect retained worktree `.worktrees/PUB-110`, finish validation and PR handoff or reset the patch manually",
));
record.summary = Some(String::from("Decodex run retained partial progress."));
record.blockers = Some(vec![String::from(
"tracked worktree changes were retained after stalled reconciliation",
)]);
vec![linear_execution_history_comment(
&issue,
"needs_attention",
"2026-03-13T09:20:00Z",
"stalled-retained-partial-progress",
|record| {
record.error_class = Some(String::from("partial_progress_retained"));
record.next_action = Some(String::from(
"inspect retained worktree `.worktrees/PUB-110`, finish validation and PR handoff or reset the patch manually",
));
record.terminal_path = Some(String::from("retained_partial_progress"));
record.summary = Some(String::from(
"Decodex retained partial progress and needs attention.",
));
record.blockers = Some(vec![String::from(
"tracked worktree changes were retained after stalled reconciliation",
)]);
record.evidence = Some(vec![String::from(
"worktree `.worktrees/PUB-110` has tracked changes",
)]);
Expand Down
Loading