diff --git a/apps/decodex/src/agent.rs b/apps/decodex/src/agent.rs index 1193561e..45178226 100644 --- a/apps/decodex/src/agent.rs +++ b/apps/decodex/src/agent.rs @@ -4,13 +4,14 @@ mod decodex_tool_bridge; mod json_rpc; mod tracker_tool_bridge; +#[cfg(test)] pub(crate) use self::app_server::MODEL_EXECUTION_IDLE_TIMEOUT; #[cfg(test)] pub(crate) use self::tracker_tool_bridge::DynamicToolHandler; pub(crate) use self::{ app_server::{ ACTIVE_RUN_IDLE_TIMEOUT, AppServerCapabilityPreflightFailure, AppServerDynamicToolFailure, AppServerRunRequest, AppServerRunResult, AppServerThreadArchiveRequest, AppServerTurnFailure, TurnContinuationGuard, archive_app_server_thread_after_success, - execute_app_server_run, probe_app_server, + execute_app_server_run, probe_app_server, protocol_activity_idle_timeout, }, codex_accounts::{CodexAccountPool, CodexAccountProvider}, decodex_tool_bridge::{DecodexRunContext, DecodexToolBridge}, diff --git a/apps/decodex/src/agent/app_server.rs b/apps/decodex/src/agent/app_server.rs index 4fd1fb12..54c8b7eb 100644 --- a/apps/decodex/src/agent/app_server.rs +++ b/apps/decodex/src/agent/app_server.rs @@ -54,6 +54,7 @@ use crate::{ }; pub(crate) const ACTIVE_RUN_IDLE_TIMEOUT: Duration = Duration::from_secs(300); +pub(crate) const MODEL_EXECUTION_IDLE_TIMEOUT: Duration = Duration::from_secs(30 * 60); const PROBE_TIMEOUT: Duration = Duration::from_secs(30); const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); @@ -80,6 +81,7 @@ const PREFLIGHT_CHECK_MCP: &str = "mcp"; const PREFLIGHT_PLUGIN_MARKETPLACE_KIND: &str = "local"; const JSONRPC_METHOD_NOT_FOUND: i64 = -32_601; const CHILD_BUCKET_MODEL: &str = "Model"; +const WAITING_REASON_MODEL_EXECUTION: &str = "model_execution"; const CHILD_BUCKET_PROTOCOL: &str = "Protocol"; const CHILD_BUCKET_TOOL: &str = "Tool"; const CHILD_BUCKET_SHELL: &str = "Shell"; @@ -969,6 +971,17 @@ impl RequestWaitPhase { } } +pub(crate) fn protocol_activity_idle_timeout( + protocol_activity: Option<&state::ProtocolActivitySummary>, + base_timeout: Duration, +) -> Duration { + if protocol_activity.is_some_and(running_model_execution_protocol_activity) { + return base_timeout.max(MODEL_EXECUTION_IDLE_TIMEOUT); + } + + base_timeout +} + pub(crate) fn execute_app_server_run( request: &AppServerRunRequest<'_>, state_store: &StateStore, @@ -1050,6 +1063,13 @@ pub(crate) fn probe_app_server(listen: &str) -> crate::prelude::Result bool { + protocol_activity.turn_status.as_deref() == Some("running") + && protocol_activity.waiting_reason.as_deref() == Some(WAITING_REASON_MODEL_EXECUTION) +} + fn preflight_check_blocker_summary(check: &AppServerCapabilityPreflightCheck) -> String { let first_error_path = check.details.get("first_error_path"); let first_error = check.details.get("first_error"); @@ -3135,10 +3155,12 @@ fn wait_for_turn_completion( let mut latest_turn_failure: Option = None; loop { + let idle_timeout = + protocol_activity_idle_timeout(Some(&recorder.protocol_activity.summary), timeout); let wire_message = next_turn_wire_message( client, last_activity_at, - timeout, + idle_timeout, target_thread_id, target_turn_id, latest_turn_failure.as_ref(), diff --git a/apps/decodex/src/agent/app_server/tests.rs b/apps/decodex/src/agent/app_server/tests.rs index defbab7b..12a53ec6 100644 --- a/apps/decodex/src/agent/app_server/tests.rs +++ b/apps/decodex/src/agent/app_server/tests.rs @@ -33,7 +33,7 @@ use crate::{ }, }, prelude::{Result, eyre}, - state::{self, StateStore}, + state::{self, ProtocolActivitySummary, StateStore}, test_support::TestEnvVarGuard, }; @@ -1006,6 +1006,40 @@ fn remaining_idle_budget_expires_after_idle_timeout() { assert!(super::remaining_idle_budget(last_activity_at, now, timeout).is_none()); } +#[test] +fn protocol_activity_idle_timeout_extends_running_model_execution() { + let protocol_activity = ProtocolActivitySummary { + turn_status: Some(String::from("running")), + waiting_reason: Some(String::from("model_execution")), + ..ProtocolActivitySummary::default() + }; + + assert_eq!( + super::protocol_activity_idle_timeout( + Some(&protocol_activity), + super::ACTIVE_RUN_IDLE_TIMEOUT + ), + super::MODEL_EXECUTION_IDLE_TIMEOUT + ); +} + +#[test] +fn protocol_activity_idle_timeout_keeps_base_timeout_for_other_waits() { + let protocol_activity = ProtocolActivitySummary { + turn_status: Some(String::from("running")), + waiting_reason: Some(String::from("tool_execution")), + ..ProtocolActivitySummary::default() + }; + + assert_eq!( + super::protocol_activity_idle_timeout( + Some(&protocol_activity), + super::ACTIVE_RUN_IDLE_TIMEOUT + ), + super::ACTIVE_RUN_IDLE_TIMEOUT + ); +} + #[test] fn run_recorder_keeps_events_when_marker_write_fails() { let temp_dir = TempDir::new().expect("tempdir should create"); diff --git a/apps/decodex/src/agent/json_rpc.rs b/apps/decodex/src/agent/json_rpc.rs index ca80ba2c..5a869b96 100644 --- a/apps/decodex/src/agent/json_rpc.rs +++ b/apps/decodex/src/agent/json_rpc.rs @@ -21,6 +21,7 @@ use serde_json::{self, Value}; use crate::git_credentials::{GitCredentialEnvironment, GitSigningConfig}; const APP_SERVER_STDERR_TAIL_LINES: usize = 20; +const CODEX_APP_SERVER_BINARY: &str = "codex"; const CODEX_HOME_ENV_VAR: &str = "CODEX_HOME"; const CODEX_SQLITE_HOME_ENV_VAR: &str = "CODEX_SQLITE_HOME"; const CODEX_HOME_DIR_NAME: &str = ".codex"; @@ -234,7 +235,7 @@ impl JsonRpcConnection { listen: &str, process_env: &AppServerProcessEnv, ) -> crate::prelude::Result { - let mut command = Command::new("codex"); + let mut command = Command::new(app_server_command_program()); let _codex_home_env = configure_app_server_command(&mut command, listen, process_env)?; let mut child = command.spawn()?; let stdin = @@ -691,11 +692,48 @@ fn configure_app_server_command( process_env.apply_to(command) } +fn app_server_command_program() -> PathBuf { + app_server_command_program_from_env(env::var_os("PATH"), env::var_os("HOME")) +} + +fn app_server_command_program_from_env( + path_env: Option, + home: Option, +) -> PathBuf { + if let Some(path_env) = path_env { + for path_entry in env::split_paths(&path_env) { + let candidate = path_entry.join(CODEX_APP_SERVER_BINARY); + + if candidate.is_file() { + return candidate; + } + } + } + if let Some(home) = home { + let home = PathBuf::from(home); + + for relative_candidate in + [[".local", "bin", CODEX_APP_SERVER_BINARY], [".cargo", "bin", CODEX_APP_SERVER_BINARY]] + { + let candidate = relative_candidate + .iter() + .fold(home.clone(), |path, component| path.join(*component)); + + if candidate.is_file() { + return candidate; + } + } + } + + PathBuf::from(CODEX_APP_SERVER_BINARY) +} + #[cfg(test)] mod tests { use std::{ collections::{HashMap, VecDeque}, ffi::OsString, + fs, path::PathBuf, process::{Command, Stdio}, sync::{Arc, Mutex, mpsc}, @@ -810,6 +848,25 @@ mod tests { assert_eq!(envs.get("GIT_CONFIG_VALUE_9").map(String::as_str), Some("")); } + #[test] + fn app_server_program_falls_back_to_home_local_codex_when_path_is_sparse() { + let temp_dir = tempfile::tempdir().expect("tempdir should create"); + let local_bin = temp_dir.path().join(".local/bin"); + + fs::create_dir_all(&local_bin).expect("local bin should create"); + + let codex_path = local_bin.join("codex"); + + fs::write(&codex_path, "#!/bin/sh\n").expect("codex fixture should write"); + + let resolved = super::app_server_command_program_from_env( + Some(OsString::from("/usr/bin:/bin")), + Some(temp_dir.path().as_os_str().to_owned()), + ); + + assert_eq!(resolved, codex_path); + } + #[test] fn app_server_command_does_not_rewrite_git_urls_without_credentials() { let mut command = Command::new("codex"); diff --git a/apps/decodex/src/orchestrator/execution.rs b/apps/decodex/src/orchestrator/execution.rs index 87090fcc..c43fa4a0 100644 --- a/apps/decodex/src/orchestrator/execution.rs +++ b/apps/decodex/src/orchestrator/execution.rs @@ -2,9 +2,44 @@ use git_credentials::GitSigningConfig; use agent::CodexAccountPool; use agent::CodexAccountProvider; use agent::AppServerThreadArchiveRequest; +use records::LinearExecutionEventPublicProjection; use crate::tracker::privacy_classifier::PublicProjectionPrivacyClassifier; +#[derive(Debug)] +pub(crate) struct AppServerZeroEvidenceStartFailure { + issue_identifier: String, + run_id: String, +} +impl AppServerZeroEvidenceStartFailure { + fn new(issue_identifier: String, run_id: String) -> Self { + Self { issue_identifier, run_id } + } + + fn error_class(&self) -> &'static str { + "app_server_zero_evidence_start_failed" + } + + fn terminal_next_action(&self, recovery_gate: &str) -> String { + format!( + "inspect local app-server startup logs and Decodex account/runtime state for run `{}`, verify `decodex probe stdio://`, restart `decodex serve` if needed, {recovery_gate}", + self.run_id + ) + } +} + +impl Display for AppServerZeroEvidenceStartFailure { + fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result { + write!( + formatter, + "App-server run `{}` for issue `{}` failed before Decodex recorded a thread, turn, protocol event, or private execution event.", + self.run_id, self.issue_identifier + ) + } +} + +impl Error for AppServerZeroEvidenceStartFailure {} + struct AgentGitCredentialEnvironment { process_env: AppServerProcessEnv, askpass_path: PathBuf, @@ -44,12 +79,23 @@ struct RunStartedLifecycleFields<'a> { privacy_classifier: &'a dyn PublicProjectionPrivacyClassifier, } +#[derive(Clone, Copy)] struct TerminalFailureWritebackRuntime<'a> { service_id: &'a str, state_store: Option<&'a StateStore>, privacy_classifier: &'a dyn PublicProjectionPrivacyClassifier, } +struct PreparedTerminalFailureWriteback { + failure_state_id: String, + needs_attention_label: String, + needs_attention_label_id: Option, + terminal_failure_state_name: String, + projection: LinearExecutionEventPublicProjection, + error_class: &'static str, + retry_guarded_by_state: bool, +} + struct CompletedAppServerRun<'a, T> where T: IssueTracker, @@ -72,6 +118,25 @@ struct ThreadArchiveCandidate { sequence_number: i64, } +struct ZeroEvidenceAppServerStartFailureContext { + protocol_event_count: i64, + private_event_count: usize, + thread_recorded: bool, + turn_recorded: bool, +} + +struct ZeroEvidenceAppServerStartFailureDiagnostic { + source_error_summary: String, + source_error_chain: Vec, +} + +#[derive(Clone, Copy, Eq, PartialEq)] +enum TerminalFailureEventRecordStatus { + Recorded, + Duplicate, + NoLocalStore, +} + fn execute_issue_run( tracker: &T, project: &ServiceConfig, @@ -472,19 +537,14 @@ where let closeout_review_state_inspector = GhPullRequestReviewStateInspector { github_token_env_var: Some(project.github().token_env_var().to_owned()), }; - let continuation_guard = IssueTurnContinuationGuard { + let continuation_guard = build_issue_turn_continuation_guard( tracker, - tracker_tool_bridge: &tracker_tool_bridge, + &tracker_tool_bridge, workflow, - service_id: project.service_id(), - issue_id: &issue_run.issue.id, - issue_identifier: &issue_run.issue.identifier, - initial_issue_state: &issue_run.initial_issue_state, - #[cfg(test)] - retry_project_slug: "", - dispatch_mode: issue_run.dispatch_mode, - review_state_inspector: Some(&closeout_review_state_inspector), - }; + project, + issue_run, + Some(&closeout_review_state_inspector), + ); let decodex_tool_bridge = DecodexToolBridge::new(&tracker_tool_bridge, build_decodex_run_context(workflow, issue_run)); let run_result = agent::execute_app_server_run( @@ -534,10 +594,12 @@ where state_store, ) .map_err(|error| { - preserve_manual_attention_request( - tracker_tool_bridge.completion_disposition(), + preserve_and_promote_app_server_run_failure( + project, + state_store, issue_run, workflow, + tracker_tool_bridge.completion_disposition(), error, ) })?; @@ -559,6 +621,32 @@ where }) } +fn build_issue_turn_continuation_guard<'a, T>( + tracker: &'a T, + tracker_tool_bridge: &'a TrackerToolBridge<'a>, + workflow: &'a WorkflowDocument, + project: &'a ServiceConfig, + issue_run: &'a IssueRunPlan, + review_state_inspector: Option<&'a dyn PullRequestReviewStateInspector>, +) -> IssueTurnContinuationGuard<'a, T> +where + T: IssueTracker, +{ + IssueTurnContinuationGuard { + tracker, + tracker_tool_bridge, + workflow, + service_id: project.service_id(), + issue_id: &issue_run.issue.id, + issue_identifier: &issue_run.issue.identifier, + initial_issue_state: &issue_run.initial_issue_state, + #[cfg(test)] + retry_project_slug: "", + dispatch_mode: issue_run.dispatch_mode, + review_state_inspector, + } +} + fn finalize_completed_app_server_run(run: CompletedAppServerRun<'_, T>) -> Result where T: IssueTracker, @@ -1144,8 +1232,200 @@ fn preserve_manual_attention_request( error } +fn preserve_and_promote_app_server_run_failure( + project: &ServiceConfig, + state_store: &StateStore, + issue_run: &IssueRunPlan, + workflow: &WorkflowDocument, + completion_disposition: Result, + error: Report, +) -> Report { + let error = + preserve_manual_attention_request(completion_disposition, issue_run, workflow, error); + + promote_zero_evidence_app_server_start_failure(project, state_store, issue_run, error) +} + +fn promote_zero_evidence_app_server_start_failure( + project: &ServiceConfig, + state_store: &StateStore, + issue_run: &IssueRunPlan, + error: Report, +) -> Report { + if run_failure_requires_terminal_attention(&error) { + return error; + } + + match zero_evidence_app_server_start_failure_context(project, state_store, issue_run) { + Ok(Some(context)) => { + let diagnostic = zero_evidence_app_server_start_failure_diagnostic(&error); + + if let Err(record_error) = record_zero_evidence_app_server_start_failure( + project, + state_store, + issue_run, + &context, + &diagnostic, + ) { + tracing::warn!( + ?record_error, + project_id = project.service_id(), + issue_id = issue_run.issue.id, + issue = issue_run.issue.identifier, + run_id = issue_run.run_id, + attempt = issue_run.attempt_number, + "Failed to record zero-evidence app-server start failure evidence." + ); + } + + Report::new(AppServerZeroEvidenceStartFailure::new( + issue_run.issue.identifier.clone(), + issue_run.run_id.clone(), + )) + .wrap_err(error) + }, + Ok(None) => error, + Err(context_error) => { + tracing::warn!( + ?context_error, + project_id = project.service_id(), + issue_id = issue_run.issue.id, + issue = issue_run.issue.identifier, + run_id = issue_run.run_id, + attempt = issue_run.attempt_number, + "Failed to classify app-server start failure evidence." + ); + + error + }, + } +} + +fn zero_evidence_app_server_start_failure_context( + project: &ServiceConfig, + state_store: &StateStore, + issue_run: &IssueRunPlan, +) -> Result> { + let protocol_event_count = state_store.event_count(&issue_run.run_id)?; + let private_event_count = state_store + .list_private_execution_events( + project.service_id(), + &issue_run.issue.id, + &issue_run.run_id, + issue_run.attempt_number, + )? + .len(); + let run_attempt = state_store.run_attempt(&issue_run.run_id)?; + let thread_recorded = run_attempt.as_ref().and_then(|attempt| attempt.thread_id()).is_some(); + let turn_recorded = run_attempt.as_ref().and_then(|attempt| attempt.turn_id()).is_some(); + + if protocol_event_count == 0 && private_event_count == 0 && !thread_recorded && !turn_recorded { + Ok(Some(ZeroEvidenceAppServerStartFailureContext { + protocol_event_count, + private_event_count, + thread_recorded, + turn_recorded, + })) + } else { + Ok(None) + } +} + +fn record_zero_evidence_app_server_start_failure( + project: &ServiceConfig, + state_store: &StateStore, + issue_run: &IssueRunPlan, + context: &ZeroEvidenceAppServerStartFailureContext, + diagnostic: &ZeroEvidenceAppServerStartFailureDiagnostic, +) -> Result<()> { + state_store + .append_private_execution_event( + project.service_id(), + &issue_run.issue.id, + &issue_run.run_id, + issue_run.attempt_number, + "app_server_zero_evidence_start_failure", + json!({ + "error_class": "app_server_zero_evidence_start_failed", + "summary": "App-server dispatch failed before Decodex recorded a thread, turn, protocol event, or private execution event.", + "issue_identifier": issue_run.issue.identifier.as_str(), + "attempt_number": issue_run.attempt_number, + "branch": issue_run.worktree.branch_name.as_str(), + "worktree_path": issue_run.worktree.path.display().to_string(), + "protocol_event_count": context.protocol_event_count, + "private_event_count": context.private_event_count, + "thread_recorded": context.thread_recorded, + "turn_recorded": context.turn_recorded, + "source_error_summary": diagnostic.source_error_summary.as_str(), + "source_error_chain": &diagnostic.source_error_chain, + }), + ) + .map(|_| ()) +} + +fn zero_evidence_app_server_start_failure_diagnostic( + error: &Report, +) -> ZeroEvidenceAppServerStartFailureDiagnostic { + let source_error_chain = error + .chain() + .map(|cause| sanitize_private_diagnostic_text(&cause.to_string())) + .collect::>(); + let source_error_summary = source_error_chain + .first() + .cloned() + .unwrap_or_else(|| String::from("unknown app-server startup failure")); + + ZeroEvidenceAppServerStartFailureDiagnostic { source_error_summary, source_error_chain } +} + +fn sanitize_private_diagnostic_text(text: &str) -> String { + let mut sanitized = text.to_owned(); + + for (name, value) in env::vars() { + if !diagnostic_env_var_name_is_sensitive(&name) || value.len() < 6 { + continue; + } + + let replacement = format!(""); + + sanitized = sanitized.replace(&value, &replacement); + } + + truncate_private_diagnostic_text(&sanitized) +} + +fn diagnostic_env_var_name_is_sensitive(name: &str) -> bool { + let normalized = name.to_ascii_lowercase(); + + normalized.contains("token") + || normalized.contains("secret") + || normalized.contains("password") + || normalized.contains("credential") + || normalized.contains("api_key") + || normalized.contains("apikey") + || normalized.ends_with("_pat") + || normalized.starts_with("pat_") + || normalized.contains("_pat_") + || normalized.contains("auth") +} + +fn truncate_private_diagnostic_text(text: &str) -> String { + const MAX_PRIVATE_DIAGNOSTIC_TEXT_CHARS: usize = 2_000; + + if text.chars().count() <= MAX_PRIVATE_DIAGNOSTIC_TEXT_CHARS { + return text.to_owned(); + } + + let mut truncated = text.chars().take(MAX_PRIVATE_DIAGNOSTIC_TEXT_CHARS).collect::(); + + truncated.push_str("..."); + + truncated +} + fn run_failure_requires_terminal_attention(error: &Report) -> bool { error.downcast_ref::().is_some() + || error.downcast_ref::().is_some() || error.downcast_ref::().is_some() || error.downcast_ref::().is_some() || error.downcast_ref::().is_some() @@ -1371,6 +1651,46 @@ fn apply_terminal_failure_writeback( manual_attention_requested: bool, error: &Report, ) -> Result +where + T: IssueTracker, +{ + let writeback = prepare_terminal_failure_writeback( + tracker, + runtime, + workflow, + issue_run, + worktree_path, + manual_attention_requested, + error, + )?; + let event_status = + record_terminal_failure_writeback_event(tracker, runtime, issue_run, &writeback)?; + + if event_status == TerminalFailureEventRecordStatus::Duplicate { + return Ok(terminal_failure_outcome(&writeback)); + } + + let writeback_result = + apply_terminal_failure_tracker_writeback(tracker, runtime, issue_run, &writeback); + + if let Err(error) = writeback_result { + forget_terminal_failure_writeback_event(runtime, event_status, &writeback)?; + + return Err(error); + } + + Ok(terminal_failure_outcome(&writeback)) +} + +fn prepare_terminal_failure_writeback( + tracker: &T, + runtime: TerminalFailureWritebackRuntime<'_>, + workflow: &WorkflowDocument, + issue_run: &IssueRunPlan, + worktree_path: &str, + manual_attention_requested: bool, + error: &Report, +) -> Result where T: IssueTracker, { @@ -1384,9 +1704,8 @@ where let failure_state_name = tracker_policy.failure_state(); let failure_state_is_startable = tracker_policy.startable_states().iter().any(|state| state == failure_state_name); - let guard_with_nonstartable_state = - needs_attention_label_id.is_none() && failure_state_is_startable; - let terminal_failure_state_name = if guard_with_nonstartable_state { + let retry_guarded_by_state = needs_attention_label_id.is_none() && failure_state_is_startable; + let terminal_failure_state_name = if retry_guarded_by_state { tracker_policy.in_progress_state() } else { failure_state_name @@ -1399,21 +1718,10 @@ where issue_run.issue.identifier ) })?; - - tracker.update_issue_state(&issue_run.issue.id, failure_state_id)?; - - let needs_attention_label_available = apply_needs_attention_label( - tracker, - issue_run, - runtime.service_id, - needs_attention_label, - needs_attention_label_id, - terminal_failure_state_name, - )?; let recovery_gate = terminal_failure_recovery_gate( needs_attention_label, - needs_attention_label_available, - guard_with_nonstartable_state, + needs_attention_label_id.is_some(), + retry_guarded_by_state, tracker_policy.in_progress_state(), ); let (error_class, next_action) = @@ -1446,30 +1754,148 @@ where runtime.privacy_classifier, )?; - if let Some(state_store) = runtime.state_store { - if state_store.record_linear_execution_event(&projection.record)? - && let Err(error) = tracker::create_prepared_linear_execution_event_comment_without_remote_scan( - tracker, - &issue_run.issue.id, - &projection, - ) - { - state_store.forget_linear_execution_event(&projection.record.idempotency_key)?; + Ok(PreparedTerminalFailureWriteback { + failure_state_id: failure_state_id.to_owned(), + needs_attention_label: needs_attention_label.to_owned(), + needs_attention_label_id, + terminal_failure_state_name: terminal_failure_state_name.to_owned(), + projection, + error_class, + retry_guarded_by_state, + }) +} - return Err(error); +fn record_terminal_failure_writeback_event( + tracker: &T, + runtime: TerminalFailureWritebackRuntime<'_>, + issue_run: &IssueRunPlan, + writeback: &PreparedTerminalFailureWriteback, +) -> Result +where + T: IssueTracker, +{ + let event_status = if let Some(state_store) = runtime.state_store { + if !state_store.record_linear_execution_event(&writeback.projection.record)? { + return Ok(TerminalFailureEventRecordStatus::Duplicate); } + + TerminalFailureEventRecordStatus::Recorded + } else { + TerminalFailureEventRecordStatus::NoLocalStore + }; + + if remote_terminal_failure_writeback_exists( + tracker, + runtime, + issue_run, + writeback, + event_status, + )? { + return Ok(TerminalFailureEventRecordStatus::Duplicate); + } + + Ok(event_status) +} + +fn remote_terminal_failure_writeback_exists( + tracker: &T, + runtime: TerminalFailureWritebackRuntime<'_>, + issue_run: &IssueRunPlan, + writeback: &PreparedTerminalFailureWriteback, + event_status: TerminalFailureEventRecordStatus, +) -> Result +where + T: IssueTracker, +{ + let comments = match tracker.list_comments(&issue_run.issue.id) { + Ok(comments) => comments, + Err(error) => { + forget_terminal_failure_writeback_event(runtime, event_status, writeback)?; + + return Err(error); + }, + }; + + if !records::has_linear_execution_event_record( + &comments, + &writeback.projection.record.service_id, + &writeback.projection.record.issue_id, + &writeback.projection.record.idempotency_key, + ) { + return Ok(false); + } + + tracing::debug!( + service_id = writeback.projection.record.service_id, + issue_id = issue_run.issue.id, + issue = issue_run.issue.identifier, + run_id = issue_run.run_id, + attempt = issue_run.attempt_number, + event_type = writeback.projection.record.event_type, + "Skipping terminal failure writeback already present in remote Linear ledger." + ); + + Ok(true) +} + +fn apply_terminal_failure_tracker_writeback( + tracker: &T, + runtime: TerminalFailureWritebackRuntime<'_>, + issue_run: &IssueRunPlan, + writeback: &PreparedTerminalFailureWriteback, +) -> Result<()> +where + T: IssueTracker, +{ + tracker.update_issue_state(&issue_run.issue.id, &writeback.failure_state_id)?; + + apply_needs_attention_label( + tracker, + issue_run, + runtime.service_id, + &writeback.needs_attention_label, + writeback.needs_attention_label_id.clone(), + &writeback.terminal_failure_state_name, + )?; + + if runtime.state_store.is_some() { + tracker::create_prepared_linear_execution_event_comment_without_remote_scan( + tracker, + &issue_run.issue.id, + &writeback.projection, + )?; } else { tracker::create_prepared_linear_execution_event_comment( tracker, &issue_run.issue.id, - &projection, + &writeback.projection, )?; } - Ok(TerminalFailureOutcome { - error_class, - retry_guarded_by_state: guard_with_nonstartable_state, - }) + Ok(()) +} + +fn forget_terminal_failure_writeback_event( + runtime: TerminalFailureWritebackRuntime<'_>, + event_status: TerminalFailureEventRecordStatus, + writeback: &PreparedTerminalFailureWriteback, +) -> Result<()> { + if event_status == TerminalFailureEventRecordStatus::Recorded + && let Some(state_store) = runtime.state_store + { + state_store.forget_linear_execution_event(&writeback.projection.record.idempotency_key)?; + } + + Ok(()) +} + +fn terminal_failure_outcome( + writeback: &PreparedTerminalFailureWriteback, +) -> TerminalFailureOutcome { + TerminalFailureOutcome { + error_class: writeback.error_class, + retry_guarded_by_state: writeback.retry_guarded_by_state, + } } fn apply_needs_attention_label( diff --git a/apps/decodex/src/orchestrator/reconciliation.rs b/apps/decodex/src/orchestrator/reconciliation.rs index 5261f028..af29f592 100644 --- a/apps/decodex/src/orchestrator/reconciliation.rs +++ b/apps/decodex/src/orchestrator/reconciliation.rs @@ -217,6 +217,9 @@ where *idle_for, )?; }, + ActiveRunDisposition::StalledAlreadyNeedsAttention { idle_for } => { + reconcile_stalled_attention_run(project, state_store, &action, *idle_for)?; + }, } } @@ -355,6 +358,27 @@ where Ok(()) } +fn reconcile_stalled_attention_run( + project: &ServiceConfig, + state_store: &StateStore, + action: &ActiveRunReconciliation, + idle_for: Duration, +) -> Result<()> { + tracing::warn!( + project_id = project.service_id(), + issue_id = action.issue.id, + issue = action.issue.identifier, + run_id = action.run_attempt.run_id(), + disposition = "stalled_already_needs_attention", + idle_for_s = idle_for.as_secs(), + "Reconciling stalled run that is already blocked for operator attention." + ); + + state_store.update_run_status(action.run_attempt.run_id(), "stalled")?; + + state_store.clear_lease(&action.issue.id) +} + fn write_reconciliation_operation_marker_best_effort( worktree_path: &Path, run_id: &str, @@ -417,14 +441,39 @@ fn stalled_idle_duration( let Some(idle_for) = observed_idle_duration(last_activity, now_unix_epoch) else { return Ok(None); }; + let idle_timeout = active_run_idle_timeout(run_attempt, worktree_mapping)?; - if idle_for >= ACTIVE_RUN_IDLE_TIMEOUT { + if idle_for >= idle_timeout { return Ok(Some(idle_for)); } Ok(None) } +fn active_run_idle_timeout( + run_attempt: &RunAttempt, + worktree_mapping: Option<&WorktreeMapping>, +) -> Result { + let Some(worktree_mapping) = worktree_mapping else { + return Ok(ACTIVE_RUN_IDLE_TIMEOUT); + }; + let Some(marker) = state::read_run_activity_marker_snapshot(worktree_mapping.worktree_path())? + else { + return Ok(ACTIVE_RUN_IDLE_TIMEOUT); + }; + + if marker.run_id() != run_attempt.run_id() + || marker.attempt_number() != run_attempt.attempt_number() + { + return Ok(ACTIVE_RUN_IDLE_TIMEOUT); + } + + Ok(agent::protocol_activity_idle_timeout( + marker.protocol_activity(), + ACTIVE_RUN_IDLE_TIMEOUT, + )) +} + fn last_observed_run_activity_unix_epoch( state_store: &StateStore, run_attempt: &RunAttempt, diff --git a/apps/decodex/src/orchestrator/run_cycle.rs b/apps/decodex/src/orchestrator/run_cycle.rs index d39243ac..8274f8e2 100644 --- a/apps/decodex/src/orchestrator/run_cycle.rs +++ b/apps/decodex/src/orchestrator/run_cycle.rs @@ -20,6 +20,27 @@ struct RetainedReviewRuntime<'a, T> { now_unix_epoch: i64, } +struct PassiveRetainedAttentionRuntime<'a, T> { + tracker: &'a T, + project: &'a ServiceConfig, + workflow: &'a WorkflowDocument, + state_store: &'a StateStore, +} +impl Clone for PassiveRetainedAttentionRuntime<'_, T> { + fn clone(&self) -> Self { + *self + } +} +impl Copy for PassiveRetainedAttentionRuntime<'_, T> {} + +struct ProjectStateReconciliationContext<'a, T> { + tracker: &'a T, + project: &'a ServiceConfig, + workflow: &'a WorkflowDocument, + state_store: &'a StateStore, + worktree_manager: &'a WorktreeManager, +} + #[derive(Clone, Copy)] struct RetainedReviewOrchestrationMarkerFields { request_comment_database_id: Option, @@ -219,9 +240,7 @@ where RetainedReviewLaneLoad::Skip => continue, RetainedReviewLaneLoad::Blocked(blocked) => { apply_passive_retained_manual_attention_with_run_identity( - tracker, - project, - workflow, + PassiveRetainedAttentionRuntime { tracker, project, workflow, state_store }, &blocked.issue, &blocked.worktree, &blocked.run_identity, @@ -239,9 +258,7 @@ where &lane.orchestration_marker, ) { apply_passive_retained_manual_attention( - tracker, - project, - workflow, + PassiveRetainedAttentionRuntime { tracker, project, workflow, state_store }, &lane.snapshot.issue, &lane.snapshot.worktree, &lane.orchestration_marker, @@ -503,6 +520,7 @@ where tracker, project, workflow, + state_store, lane, now_unix_epoch, "external_review_merge_visibility_timeout", @@ -530,6 +548,7 @@ where tracker, project, workflow, + state_store, lane, now_unix_epoch, "internal_review_only_merge_visibility_timeout", @@ -608,9 +627,7 @@ where }, ExternalReviewRequestCiGate::ManualAttention(reason) => { return apply_passive_retained_manual_attention( - tracker, - project, - workflow, + PassiveRetainedAttentionRuntime { tracker, project, workflow, state_store }, &lane.snapshot.issue, &lane.snapshot.worktree, &lane.orchestration_marker, @@ -695,9 +712,7 @@ where } apply_passive_retained_manual_attention( - tracker, - project, - workflow, + PassiveRetainedAttentionRuntime { tracker, project, workflow, state_store }, &lane.snapshot.issue, &lane.snapshot.worktree, &lane.orchestration_marker, @@ -775,9 +790,7 @@ where } if external_review_result_arrived(&lane.review_state, &lane.orchestration_marker) { return apply_passive_retained_manual_attention( - runtime.tracker, - runtime.project, - runtime.workflow, + passive_attention_runtime(runtime), &lane.snapshot.issue, &lane.snapshot.worktree, &lane.orchestration_marker, @@ -801,6 +814,7 @@ fn handle_waiting_for_merge_phase( tracker: &T, project: &ServiceConfig, workflow: &WorkflowDocument, + state_store: &StateStore, lane: &RetainedReviewLane, now_unix_epoch: i64, timeout_reason: &str, @@ -821,9 +835,7 @@ where } apply_passive_retained_manual_attention( - tracker, - project, - workflow, + PassiveRetainedAttentionRuntime { tracker, project, workflow, state_store }, &lane.snapshot.issue, &lane.snapshot.worktree, &lane.orchestration_marker, @@ -894,9 +906,7 @@ where { if !lane.review_state.merge_commit_allowed { return apply_passive_retained_manual_attention( - runtime.tracker, - runtime.project, - runtime.workflow, + passive_attention_runtime(runtime), &lane.snapshot.issue, &lane.snapshot.worktree, &lane.orchestration_marker, @@ -916,9 +926,7 @@ where ); return apply_passive_retained_manual_attention( - runtime.tracker, - runtime.project, - runtime.workflow, + passive_attention_runtime(runtime), &lane.snapshot.issue, &lane.snapshot.worktree, &lane.orchestration_marker, @@ -960,9 +968,7 @@ where } apply_passive_retained_manual_attention( - runtime.tracker, - runtime.project, - runtime.workflow, + passive_attention_runtime(runtime), &lane.snapshot.issue, &lane.snapshot.worktree, &lane.orchestration_marker, @@ -1057,9 +1063,7 @@ fn ensure_review_orchestration_marker( } fn apply_passive_retained_manual_attention( - tracker: &T, - project: &ServiceConfig, - workflow: &WorkflowDocument, + runtime: PassiveRetainedAttentionRuntime<'_, T>, issue: &TrackerIssue, worktree: &WorktreeMapping, orchestration_marker: &ReviewOrchestrationMarker, @@ -1069,9 +1073,7 @@ where T: IssueTracker, { apply_passive_retained_manual_attention_with_run_identity( - tracker, - project, - workflow, + runtime, issue, worktree, &RetainedReviewRunIdentity { @@ -1083,9 +1085,7 @@ where } fn apply_passive_retained_manual_attention_with_run_identity( - tracker: &T, - project: &ServiceConfig, - workflow: &WorkflowDocument, + runtime: PassiveRetainedAttentionRuntime<'_, T>, issue: &TrackerIssue, worktree: &WorktreeMapping, run_identity: &RetainedReviewRunIdentity, @@ -1112,16 +1112,16 @@ where retry_budget_base: 0, }; let worktree_path = - relative_worktree_path_for_path(project, synthetic_issue_run.worktree.path.as_path()); - let privacy_classifier = configured_public_projection_privacy_classifier(project)?; + relative_worktree_path_for_path(runtime.project, synthetic_issue_run.worktree.path.as_path()); + let privacy_classifier = configured_public_projection_privacy_classifier(runtime.project)?; let _ = apply_terminal_failure_writeback( - tracker, + runtime.tracker, TerminalFailureWritebackRuntime { - service_id: project.service_id(), - state_store: None, + service_id: runtime.project.service_id(), + state_store: Some(runtime.state_store), privacy_classifier: &privacy_classifier, }, - workflow, + runtime.workflow, &synthetic_issue_run, &worktree_path, true, @@ -1131,6 +1131,17 @@ where Ok(()) } +fn passive_attention_runtime<'a, T>( + runtime: &'a RetainedReviewRuntime<'_, T>, +) -> PassiveRetainedAttentionRuntime<'a, T> { + PassiveRetainedAttentionRuntime { + tracker: runtime.tracker, + project: runtime.project, + workflow: runtime.workflow, + state_store: runtime.state_store, + } +} + fn plan_project_issue_run_with_exclusions( tracker: &T, project: &ServiceConfig, @@ -2448,92 +2459,243 @@ where .into_iter() .map(|issue| (issue.id.clone(), issue)) .collect::>(); + let reconciliation_context = ProjectStateReconciliationContext { + tracker, + project, + workflow, + state_store, + worktree_manager, + }; let now_unix_epoch = OffsetDateTime::now_utc().unix_timestamp(); let mut cleared_terminal_lane_issue_ids = HashSet::new(); - for lease in &leases { - if let Some(issue) = issues_by_id.get(lease.issue_id()) - && issue.state.name == workflow.frontmatter().tracker().success_state() - && retained_review_lease_matches_run(state_store, lease)? - { - mark_run_attempt_if_active(state_store, lease.run_id(), "succeeded")?; + reconcile_active_project_leases( + &reconciliation_context, + &leases, + &issues_by_id, + now_unix_epoch, + &mut cleared_terminal_lane_issue_ids, + )?; + reconcile_orphaned_active_worktree_runs( + &reconciliation_context, + &leases, + &worktrees, + &issues_by_id, + now_unix_epoch, + )?; + cleanup_terminal_project_worktrees( + &reconciliation_context, + &worktrees, + &issues_by_id, + &mut cleared_terminal_lane_issue_ids, + )?; - state_store.clear_lease(lease.issue_id())?; + Ok(()) +} +fn reconcile_active_project_leases( + context: &ProjectStateReconciliationContext<'_, T>, + leases: &[IssueLease], + issues_by_id: &HashMap, + now_unix_epoch: i64, + cleared_terminal_lane_issue_ids: &mut HashSet, +) -> Result<()> +where + T: IssueTracker, +{ + for lease in leases { + if reconcile_success_retained_review_lease(context, lease, issues_by_id)? { + continue; + } + if reconcile_terminal_retained_closeout_lease( + context, + lease, + issues_by_id, + now_unix_epoch, + cleared_terminal_lane_issue_ids, + )? { continue; } - if let Some(issue) = issues_by_id.get(lease.issue_id()) - && terminal_issue_keeps_retained_closeout( - tracker, - issue, - project, - workflow, - state_store, - )? - { - if retained_closeout_lease_has_fresh_activity( - lease, - issue, - project, - now_unix_epoch, - )? { - continue; - } - clear_terminal_lane_labels_once( - tracker, - project, - issue, - &mut cleared_terminal_lane_issue_ids, - )?; - mark_run_attempt_if_active(state_store, lease.run_id(), "interrupted")?; + reconcile_stale_project_lease( + context, + lease, + issues_by_id, + cleared_terminal_lane_issue_ids, + )?; + } + + Ok(()) +} + +fn reconcile_success_retained_review_lease( + context: &ProjectStateReconciliationContext<'_, T>, + lease: &IssueLease, + issues_by_id: &HashMap, +) -> Result +where + T: IssueTracker, +{ + if let Some(issue) = issues_by_id.get(lease.issue_id()) + && issue.state.name == context.workflow.frontmatter().tracker().success_state() + && retained_review_lease_matches_run(context.state_store, lease)? + { + mark_run_attempt_if_active(context.state_store, lease.run_id(), "succeeded")?; + + context.state_store.clear_lease(lease.issue_id())?; + + return Ok(true); + } + + Ok(false) +} + +fn reconcile_terminal_retained_closeout_lease( + context: &ProjectStateReconciliationContext<'_, T>, + lease: &IssueLease, + issues_by_id: &HashMap, + now_unix_epoch: i64, + cleared_terminal_lane_issue_ids: &mut HashSet, +) -> Result +where + T: IssueTracker, +{ + let Some(issue) = issues_by_id.get(lease.issue_id()) else { + return Ok(false); + }; + + if !terminal_issue_keeps_retained_closeout( + context.tracker, + issue, + context.project, + context.workflow, + context.state_store, + )? { + return Ok(false); + } + if retained_closeout_lease_has_fresh_activity( + lease, + issue, + context.project, + now_unix_epoch, + )? { + return Ok(true); + } + + clear_terminal_lane_labels_once( + context.tracker, + context.project, + issue, + cleared_terminal_lane_issue_ids, + )?; + mark_run_attempt_if_active(context.state_store, lease.run_id(), "interrupted")?; + + context.state_store.clear_lease(lease.issue_id())?; + + Ok(true) +} - state_store.clear_lease(lease.issue_id())?; +fn reconcile_stale_project_lease( + context: &ProjectStateReconciliationContext<'_, T>, + lease: &IssueLease, + issues_by_id: &HashMap, + cleared_terminal_lane_issue_ids: &mut HashSet, +) -> Result<()> +where + T: IssueTracker, +{ + let reconciled_status = match issues_by_id.get(lease.issue_id()) { + Some(issue) if is_terminal_issue(issue, context.workflow) => "terminated", + Some(_) | None => "interrupted", + }; + if let Some(issue) = issues_by_id.get(lease.issue_id()) + && is_terminal_issue(issue, context.workflow) + { + clear_terminal_lane_labels_once( + context.tracker, + context.project, + issue, + cleared_terminal_lane_issue_ids, + )?; + } + + mark_run_attempt_if_active(context.state_store, lease.run_id(), reconciled_status)?; + + context.state_store.clear_lease(lease.issue_id()) +} + +fn reconcile_orphaned_active_worktree_runs( + context: &ProjectStateReconciliationContext<'_, T>, + leases: &[IssueLease], + worktrees: &[WorktreeMapping], + issues_by_id: &HashMap, + now_unix_epoch: i64, +) -> Result<()> +where + T: IssueTracker, +{ + let mut orphaned_actions = Vec::new(); + + for mapping in worktrees { + if leases.iter().any(|lease| lease.issue_id() == mapping.issue_id()) { continue; } - let reconciled_status = match issues_by_id.get(lease.issue_id()) { - Some(issue) if is_terminal_issue(issue, workflow) => "terminated", - Some(_) | None => "interrupted", + let Some(issue) = issues_by_id.get(mapping.issue_id()) else { + continue; + }; + let Some(action) = inspect_orphaned_active_worktree_reconciliation( + context, + issue, + mapping, + now_unix_epoch, + )? else { + continue; }; - if let Some(issue) = issues_by_id.get(lease.issue_id()) - && is_terminal_issue(issue, workflow) - { - clear_terminal_lane_labels_once( - tracker, - project, - issue, - &mut cleared_terminal_lane_issue_ids, - )?; - } + orphaned_actions.push(action); + } - mark_run_attempt_if_active(state_store, lease.run_id(), reconciled_status)?; + apply_active_run_reconciliation( + context.tracker, + context.project, + context.state_store, + context.worktree_manager, + orphaned_actions, + ) +} - state_store.clear_lease(lease.issue_id())?; - } - for mapping in &worktrees { +fn cleanup_terminal_project_worktrees( + context: &ProjectStateReconciliationContext<'_, T>, + worktrees: &[WorktreeMapping], + issues_by_id: &HashMap, + cleared_terminal_lane_issue_ids: &mut HashSet, +) -> Result<()> +where + T: IssueTracker, +{ + for mapping in worktrees { if let Some(issue) = issues_by_id.get(mapping.issue_id()) - && is_terminal_issue(issue, workflow) - && !terminal_issue_keeps_retained_closeout( - tracker, - issue, - project, - workflow, - state_store, - )? + && is_terminal_issue(issue, context.workflow) + && !terminal_issue_keeps_retained_closeout( + context.tracker, + issue, + context.project, + context.workflow, + context.state_store, + )? { clear_terminal_lane_labels_once( - tracker, - project, + context.tracker, + context.project, issue, - &mut cleared_terminal_lane_issue_ids, + cleared_terminal_lane_issue_ids, )?; cleanup_worktree_mapping( - state_store, - worktree_manager, - workflow, + context.state_store, + context.worktree_manager, + context.workflow, &issue.identifier, mapping, )?; @@ -2543,6 +2705,93 @@ where Ok(()) } +fn inspect_orphaned_active_worktree_reconciliation( + context: &ProjectStateReconciliationContext<'_, T>, + issue: &TrackerIssue, + worktree_mapping: &WorktreeMapping, + now_unix_epoch: i64, +) -> Result> +where + T: IssueTracker, +{ + let has_service_ownership = + issue_has_service_ownership(context.tracker, issue, context.project.service_id())?; + let needs_attention = issue + .has_label(context.workflow.frontmatter().tracker().needs_attention_label()); + + if !has_service_ownership && !needs_attention { + return Ok(None); + } + + let Some(run_attempt) = context.state_store.latest_run_attempt_for_issue(&issue.id)? else { + return Ok(None); + }; + let Some(idle_for) = + orphaned_active_run_idle_duration( + context.state_store, + &run_attempt, + worktree_mapping, + now_unix_epoch, + )? + else { + return Ok(None); + }; + let disposition = if needs_attention { + ActiveRunDisposition::StalledAlreadyNeedsAttention { idle_for } + } else if is_issue_active_for_run(issue, context.workflow) { + ActiveRunDisposition::Stalled { idle_for } + } else { + return Ok(None); + }; + + Ok(Some(ActiveRunReconciliation { + issue: issue.clone(), + run_attempt, + worktree_mapping: Some(worktree_mapping.clone()), + disposition, + workflow: context.workflow.clone(), + })) +} + +fn orphaned_active_run_idle_duration( + state_store: &StateStore, + run_attempt: &RunAttempt, + worktree_mapping: &WorktreeMapping, + now_unix_epoch: i64, +) -> Result> { + if !matches!(run_attempt.status(), "starting" | "running") { + return Ok(None); + } + + let marker = state::read_run_activity_marker_snapshot(worktree_mapping.worktree_path())? + .filter(|marker| { + marker.run_id() == run_attempt.run_id() + && marker.attempt_number() == run_attempt.attempt_number() + }); + + if let Some(marker) = marker.as_ref() + && marker.process_id().is_some() + { + if marker_process_is_alive(marker) { + return Ok(None); + } + + return Ok(Some( + marker + .last_activity_unix_epoch() + .and_then(|last_activity| observed_idle_duration(last_activity, now_unix_epoch)) + .unwrap_or(Duration::ZERO), + )); + } + + stalled_idle_duration( + state_store, + run_attempt, + Some(worktree_mapping), + now_unix_epoch, + ) +} + fn clear_terminal_lane_labels_once( tracker: &T, project: &ServiceConfig, diff --git a/apps/decodex/src/orchestrator/selection.rs b/apps/decodex/src/orchestrator/selection.rs index e80681ce..5d1272b8 100644 --- a/apps/decodex/src/orchestrator/selection.rs +++ b/apps/decodex/src/orchestrator/selection.rs @@ -207,6 +207,10 @@ fn terminal_failure_comment_details( partial_progress.worktree_path ), ) + } else if let Some(app_server_failure) = + error.downcast_ref::() + { + (app_server_failure.error_class(), app_server_failure.terminal_next_action(recovery_gate)) } else if error.downcast_ref::().is_some() { ( "stalled_run_detected", diff --git a/apps/decodex/src/orchestrator/status.rs b/apps/decodex/src/orchestrator/status.rs index 4ae19f37..5f7bf9f9 100644 --- a/apps/decodex/src/orchestrator/status.rs +++ b/apps/decodex/src/orchestrator/status.rs @@ -3357,7 +3357,14 @@ fn worktree_activity_marker_is_fresh(marker: &RunActivityMarker, now_unix_epoch: && marker .last_activity_unix_epoch() .and_then(|last_activity| observed_idle_duration(last_activity, now_unix_epoch)) - .is_some_and(|idle_for| idle_for < ACTIVE_RUN_IDLE_TIMEOUT) + .is_some_and(|idle_for| idle_for < run_activity_idle_timeout(Some(marker))) +} + +fn run_activity_idle_timeout(marker: Option<&RunActivityMarker>) -> Duration { + agent::protocol_activity_idle_timeout( + marker.and_then(RunActivityMarker::protocol_activity), + ACTIVE_RUN_IDLE_TIMEOUT, + ) } fn marker_process_is_alive(marker: &RunActivityMarker) -> bool { @@ -3475,8 +3482,12 @@ fn operator_run_status( &phase, marker.as_ref().and_then(RunActivityMarker::current_operation), ); - let suspected_stall = - operator_run_is_suspected_stall(&phase, timing.last_progress_unix_epoch, now_unix_epoch); + let suspected_stall = operator_run_is_suspected_stall( + &phase, + timing.last_progress_unix_epoch, + now_unix_epoch, + run_activity_idle_timeout(marker.as_ref()), + ); let child_agent_activity = operator_run_child_agent_activity(marker.as_ref(), now_unix_epoch); let protocol_activity = operator_run_protocol_activity( marker.as_ref(), @@ -3502,9 +3513,7 @@ fn operator_run_status( append_primary_account_if_missing(&mut accounts, account.as_ref()); let branch_name = run.branch_name().map(str::to_owned); - let worktree_path = run - .worktree_path() - .map(|path| relative_worktree_path_for_path(project, path)); + let worktree_path = operator_run_relative_worktree_path(project, &run); let issue_identifier = operator_run_issue_identifier_from_fields( run.run_id(), branch_name.as_deref(), @@ -3570,6 +3579,14 @@ fn operator_run_status( }) } +fn operator_run_relative_worktree_path( + project: &ServiceConfig, + run: &ProjectRunStatus, +) -> Option { + run.worktree_path() + .map(|path| relative_worktree_path_for_path(project, path)) +} + fn operator_run_private_evidence( project: &ServiceConfig, run: &ProjectRunStatus, @@ -3965,6 +3982,7 @@ fn operator_run_is_suspected_stall( phase: &str, last_progress_unix_epoch: Option, now_unix_epoch: i64, + idle_timeout: Duration, ) -> bool { if phase != "executing" { return false; @@ -3973,13 +3991,13 @@ fn operator_run_is_suspected_stall( last_progress_unix_epoch .and_then(|last_progress| observed_idle_duration(last_progress, now_unix_epoch)) .is_some_and(|idle_for| { - idle_for >= suspected_operator_run_stall_threshold() - && idle_for < ACTIVE_RUN_IDLE_TIMEOUT + idle_for >= suspected_operator_run_stall_threshold(idle_timeout) + && idle_for < idle_timeout }) } -fn suspected_operator_run_stall_threshold() -> Duration { - Duration::from_secs((ACTIVE_RUN_IDLE_TIMEOUT.as_secs() / 2).max(1)) +fn suspected_operator_run_stall_threshold(idle_timeout: Duration) -> Duration { + Duration::from_secs((idle_timeout.as_secs() / 2).max(1)) } fn visible_operator_run_retry_schedule( diff --git a/apps/decodex/src/orchestrator/tests.rs b/apps/decodex/src/orchestrator/tests.rs index bd5380f0..54b54e83 100644 --- a/apps/decodex/src/orchestrator/tests.rs +++ b/apps/decodex/src/orchestrator/tests.rs @@ -22,7 +22,8 @@ use time::OffsetDateTime; use crate::tracker::records; #[rustfmt::skip] use crate::agent::{ - ACTIVE_RUN_IDLE_TIMEOUT, AppServerCapabilityPreflightFailure, + ACTIVE_RUN_IDLE_TIMEOUT, MODEL_EXECUTION_IDLE_TIMEOUT, + AppServerCapabilityPreflightFailure, AppServerHomePreflightFailure, AppServerTransportFailure, AppServerTurnFailure, DynamicToolHandler, ReviewPolicyStopReason, ReviewPolicyStopRequested, TrackerToolBridge, TurnContinuationGuard, diff --git a/apps/decodex/src/orchestrator/tests/recovery/reconciliation.rs b/apps/decodex/src/orchestrator/tests/recovery/reconciliation.rs index fe410df8..1cdf4087 100644 --- a/apps/decodex/src/orchestrator/tests/recovery/reconciliation.rs +++ b/apps/decodex/src/orchestrator/tests/recovery/reconciliation.rs @@ -660,6 +660,82 @@ fn active_run_reconciliation_uses_worktree_activity_marker_from_child_process() ); } +#[test] +fn active_run_reconciliation_allows_running_model_execution_until_model_timeout() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let issue = sample_issue("In Progress", &[]); + let tracker = FakeTracker::new(vec![issue.clone()]); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let run_id = "run-model-execution-idle"; + let worktree_path = config.worktree_root().join("PUB-101-model"); + + fs::create_dir_all(&worktree_path).expect("worktree path should exist"); + + state_store + .record_run_attempt(run_id, &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .upsert_lease("pubfi", &issue.id, run_id, "In Progress") + .expect("lease should record"); + state_store + .upsert_worktree( + "pubfi", + &issue.id, + "x/pubfi-pub-101-model", + &worktree_path.display().to_string(), + ) + .expect("worktree mapping should record"); + + let last_activity = state_store + .last_run_activity_unix_epoch(run_id) + .expect("last activity lookup should succeed") + .expect("run activity should exist"); + let protocol_activity = + r#"{"turn_status":"running","waiting_reason":"model_execution","rate_limit_status":null,"recent_events":[]}"#; + + fs::write( + worktree_path.join(RUN_ACTIVITY_MARKER_FILE), + format!( + "run_id={run_id}\nattempt_number=1\nlast_activity_unix_epoch={last_activity}\nlast_protocol_activity_unix_epoch={last_activity}\nlast_progress_unix_epoch={last_activity}\nprotocol_activity={protocol_activity}\n" + ), + ) + .expect("activity marker should write"); + + let actions = orchestrator::inspect_active_run_reconciliation_at( + &tracker, + &config, + &workflow, + &state_store, + None, + last_activity + ACTIVE_RUN_IDLE_TIMEOUT.as_secs() as i64 + 1, + ) + .expect("active run inspection should succeed"); + + assert!( + actions.is_empty(), + "running model execution should not stall on the generic active idle timeout" + ); + + let actions = orchestrator::inspect_active_run_reconciliation_at( + &tracker, + &config, + &workflow, + &state_store, + None, + last_activity + MODEL_EXECUTION_IDLE_TIMEOUT.as_secs() as i64 + 1, + ) + .expect("active run inspection should succeed"); + + assert!(actions.iter().any(|action| { + action.issue.id == issue.id + && matches!( + action.disposition, + ActiveRunDisposition::Stalled{ idle_for } + if idle_for >= MODEL_EXECUTION_IDLE_TIMEOUT + ) + })); +} + #[test] fn stalled_protocol_idle_duration_ignores_future_protocol_activity() { let state_store = StateStore::open_in_memory().expect("state store should open"); @@ -923,6 +999,129 @@ fn stalled_run_reconciliation_routes_to_needs_attention_without_cleanup() { })); } +#[test] +fn project_reconciliation_routes_orphaned_active_worktree_run_to_needs_attention() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let issue = reconciliation_sample_service_owned_issue("In Progress"); + let tracker = FakeTracker::new(vec![issue.clone()]); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let worktree_manager = + WorktreeManager::new("pubfi", config.repo_root(), config.worktree_root()); + let run_id = "run-orphaned-active"; + let worktree_path = config.worktree_root().join(&issue.identifier); + + fs::create_dir_all(&worktree_path).expect("worktree path should exist"); + + state_store + .record_run_attempt(run_id, &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .upsert_worktree( + "pubfi", + &issue.id, + "x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree mapping should record"); + + state::write_run_activity_marker_for_process(&worktree_path, run_id, 1, u32::MAX) + .expect("stopped process marker should write"); + orchestrator::reconcile_project_state( + &tracker, + &config, + &workflow, + &state_store, + &worktree_manager, + ) + .expect("project reconciliation should succeed"); + + assert!(state_store.lease_for_issue(&issue.id).expect("lease lookup should succeed").is_none()); + assert!( + state_store + .worktree_for_issue(&issue.id) + .expect("worktree lookup should succeed") + .is_some(), + "orphaned active worktree must stay available for operator recovery" + ); + assert_eq!( + state_store + .run_attempt(run_id) + .expect("run attempt lookup should succeed") + .expect("run attempt should exist") + .status(), + "stalled" + ); + assert_eq!( + tracker.label_additions.borrow().as_slice(), + [(issue.id.clone(), vec![String::from("label-needs-attention")])] + ); + assert_eq!( + tracker.label_removals.borrow().as_slice(), + [(issue.id.clone(), vec![String::from("label-active")])] + ); + assert!(tracker.comments.borrow().iter().any(|comment| { + comment.contains("stalled_run_detected") + && comment.contains("needs attention") + && comment.contains("run-orphaned-active") + })); +} + +#[test] +fn project_reconciliation_marks_orphaned_attention_worktree_run_stalled_without_tracker_writes() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let issue = sample_issue("Todo", &["decodex:needs-attention"]); + let tracker = FakeTracker::new(vec![issue.clone()]); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let worktree_manager = + WorktreeManager::new("pubfi", config.repo_root(), config.worktree_root()); + let run_id = "run-attention-orphan"; + let worktree_path = config.worktree_root().join(&issue.identifier); + + fs::create_dir_all(&worktree_path).expect("worktree path should exist"); + + state_store + .record_run_attempt(run_id, &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .upsert_worktree( + "pubfi", + &issue.id, + "x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree mapping should record"); + + state::write_run_activity_marker_for_process(&worktree_path, run_id, 1, u32::MAX) + .expect("stopped process marker should write"); + orchestrator::reconcile_project_state( + &tracker, + &config, + &workflow, + &state_store, + &worktree_manager, + ) + .expect("project reconciliation should succeed"); + + assert_eq!( + state_store + .run_attempt(run_id) + .expect("run attempt lookup should succeed") + .expect("run attempt should exist") + .status(), + "stalled" + ); + assert!( + state_store + .worktree_for_issue(&issue.id) + .expect("worktree lookup should succeed") + .is_some(), + "attention worktree must stay available for operator recovery" + ); + assert!(tracker.label_additions.borrow().is_empty()); + assert!(tracker.label_removals.borrow().is_empty()); + assert!(tracker.comments.borrow().is_empty()); +} + #[test] fn stalled_run_reconciliation_preserves_retry_budget_marker_from_retained_worktree() { let (_temp_dir, config, workflow) = temp_project_layout(); diff --git a/apps/decodex/src/orchestrator/tests/recovery/terminal_failures.rs b/apps/decodex/src/orchestrator/tests/recovery/terminal_failures.rs index 107408dd..ea636844 100644 --- a/apps/decodex/src/orchestrator/tests/recovery/terminal_failures.rs +++ b/apps/decodex/src/orchestrator/tests/recovery/terminal_failures.rs @@ -1,3 +1,6 @@ +use orchestrator::ReviewHandoffNeedsAttention; +use orchestrator::PassiveRetainedAttentionRuntime; + #[test] fn terminal_failures_without_needs_attention_label_use_nonstartable_guard_state() { let (_temp_dir, config, workflow) = temp_project_layout(); @@ -158,6 +161,213 @@ fn terminal_failures_apply_incremental_label_mutations_when_issue_labels_paginat assert_eq!(ledger_event.terminal_path.as_deref(), Some("manual_attention")); } +#[test] +fn duplicate_terminal_failure_event_does_not_reapply_tracker_writeback() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let active_label = tracker::automation_active_label(TEST_SERVICE_ID); + let issue = sample_issue("In Review", &[active_label.as_str()]); + let tracker = FakeTracker::new(vec![issue.clone()]); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue_run = IssueRunPlan { + issue: issue.clone(), + issue_state: issue.state.name.clone(), + initial_issue_state: issue.state.name.clone(), + worktree: WorktreeSpec { + branch_name: String::from("x/pubfi-pub-101"), + issue_identifier: issue.identifier.clone(), + path: config.worktree_root().join("PUB-101"), + reused_existing: true, + }, + retry_project_slug: issue.project_slug.clone().expect("sample issue should carry a project slug"), + dispatch_mode: IssueDispatchMode::ReviewRepair, + attempt_number: 2, + run_id: String::from("pub-101-attempt-2-123"), + retry_budget_base: 0, + }; + let error = Report::new(ReviewHandoffNeedsAttention { + issue_identifier: issue.identifier.clone(), + pr_url: String::from("https://github.com/helixbox/pubfi-mono-v2/pull/307"), + run_id: issue_run.run_id.clone(), + }); + + fs::create_dir_all(&issue_run.worktree.path).expect("worktree path should exist"); + + state_store + .record_run_attempt(&issue_run.run_id, &issue.id, issue_run.attempt_number, "failed") + .expect("run attempt should record"); + + orchestrator::handle_failure(&tracker, &config, &workflow, &state_store, &issue_run, &error) + .expect("first terminal failure writeback should apply"); + orchestrator::handle_failure(&tracker, &config, &workflow, &state_store, &issue_run, &error) + .expect("duplicate terminal failure writeback should no-op"); + + assert_eq!(tracker.state_updates.borrow().len(), 1); + assert_eq!(tracker.label_additions.borrow().len(), 1); + assert_eq!(tracker.label_removals.borrow().len(), 1); + assert_eq!( + tracker + .comments + .borrow() + .iter() + .filter(|comment| comment.contains("review_handoff_writeback_failed")) + .count(), + 1 + ); + assert_eq!( + state_store + .list_linear_execution_events(config.service_id(), &issue.id) + .expect("linear execution events should list") + .len(), + 1 + ); +} + +#[test] +fn duplicate_remote_terminal_failure_event_does_not_reapply_tracker_writeback() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let active_label = tracker::automation_active_label(TEST_SERVICE_ID); + let issue = sample_issue("In Review", &[active_label.as_str()]); + let tracker = FakeTracker::new(vec![issue.clone()]); + let first_state_store = StateStore::open_in_memory().expect("state store should open"); + let second_state_store = StateStore::open_in_memory().expect("state store should open"); + let issue_run = IssueRunPlan { + issue: issue.clone(), + issue_state: issue.state.name.clone(), + initial_issue_state: issue.state.name.clone(), + worktree: WorktreeSpec { + branch_name: String::from("x/pubfi-pub-101"), + issue_identifier: issue.identifier.clone(), + path: config.worktree_root().join("PUB-101"), + reused_existing: true, + }, + retry_project_slug: issue.project_slug.clone().expect("sample issue should carry a project slug"), + dispatch_mode: IssueDispatchMode::ReviewRepair, + attempt_number: 2, + run_id: String::from("pub-101-attempt-2-123"), + retry_budget_base: 0, + }; + let error = Report::new(ReviewHandoffNeedsAttention { + issue_identifier: issue.identifier.clone(), + pr_url: String::from("https://github.com/helixbox/pubfi-mono-v2/pull/307"), + run_id: issue_run.run_id.clone(), + }); + + fs::create_dir_all(&issue_run.worktree.path).expect("worktree path should exist"); + orchestrator::handle_failure( + &tracker, + &config, + &workflow, + &first_state_store, + &issue_run, + &error, + ) + .expect("first terminal failure writeback should apply"); + orchestrator::handle_failure( + &tracker, + &config, + &workflow, + &second_state_store, + &issue_run, + &error, + ) + .expect("remote duplicate terminal failure writeback should no-op"); + + assert_eq!(tracker.state_updates.borrow().len(), 1); + assert_eq!(tracker.label_additions.borrow().len(), 1); + assert_eq!(tracker.label_removals.borrow().len(), 1); + assert_eq!( + tracker + .comments + .borrow() + .iter() + .filter(|comment| comment.contains("review_handoff_writeback_failed")) + .count(), + 1 + ); + assert_eq!( + second_state_store + .list_linear_execution_events(config.service_id(), &issue.id) + .expect("remote duplicate should be learned into local execution events") + .len(), + 1 + ); +} + +#[test] +fn duplicate_passive_retained_review_attention_event_does_not_reapply_tracker_writeback() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let active_label = tracker::automation_active_label(TEST_SERVICE_ID); + let issue = sample_issue("In Review", &[active_label.as_str()]); + let tracker = FakeTracker::new(vec![issue.clone()]); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let worktree_manager = + WorktreeManager::new(config.service_id(), config.repo_root(), config.worktree_root()); + let worktree = worktree_manager + .ensure_worktree(&issue.identifier, false) + .expect("retained worktree should exist"); + let run_identity = RetainedReviewRunIdentity { + run_id: String::from("pub-101-attempt-8-123"), + attempt_number: 8, + }; + + state_store + .upsert_worktree( + config.service_id(), + &issue.id, + &worktree.branch_name, + &worktree.path.display().to_string(), + ) + .expect("worktree mapping should record"); + + let worktree_mapping = state_store + .worktree_for_issue(&issue.id) + .expect("worktree mapping query should succeed") + .expect("worktree mapping should exist"); + let runtime = PassiveRetainedAttentionRuntime { + tracker: &tracker, + project: &config, + workflow: &workflow, + state_store: &state_store, + }; + + orchestrator::apply_passive_retained_manual_attention_with_run_identity( + runtime, + &issue, + &worktree_mapping, + &run_identity, + "missing_review_handoff_record", + ) + .expect("first passive retained attention writeback should apply"); + orchestrator::apply_passive_retained_manual_attention_with_run_identity( + runtime, + &issue, + &worktree_mapping, + &run_identity, + "missing_review_handoff_record", + ) + .expect("duplicate passive retained attention writeback should no-op"); + + assert_eq!(tracker.state_updates.borrow().len(), 1); + assert_eq!(tracker.label_additions.borrow().len(), 1); + assert_eq!(tracker.label_removals.borrow().len(), 1); + assert_eq!( + tracker + .comments + .borrow() + .iter() + .filter(|comment| comment.contains("missing_review_handoff_record")) + .count(), + 1 + ); + assert_eq!( + state_store + .list_linear_execution_events(config.service_id(), &issue.id) + .expect("linear execution events should list") + .len(), + 1 + ); +} + #[test] fn review_policy_exhausted_failures_skip_retry_and_require_attention_pre_pr() { let (_temp_dir, config, workflow) = temp_project_layout(); diff --git a/apps/decodex/src/orchestrator/tests/runtime/failure.rs b/apps/decodex/src/orchestrator/tests/runtime/failure.rs index d6c47519..2db99b7d 100644 --- a/apps/decodex/src/orchestrator/tests/runtime/failure.rs +++ b/apps/decodex/src/orchestrator/tests/runtime/failure.rs @@ -3,6 +3,7 @@ use std::collections::BTreeMap; use orchestrator::{ AgentGitCredentialEnvironment, AgentGitCredentialsUnavailable, RepoGateFailureKind, }; +use orchestrator::AppServerZeroEvidenceStartFailure; fn git_config_value( repo_root: &Path, @@ -494,6 +495,14 @@ fn app_server_terminal_failures_preserve_specific_error_classes() { "app_server_transport_disconnected", "inspect the local app-server stderr tail", ), + ( + Report::new(AppServerZeroEvidenceStartFailure::new( + String::from("PUB-101"), + String::from("pub-101-attempt-1-123"), + )), + "app_server_zero_evidence_start_failed", + "verify `decodex probe stdio://`", + ), ( Report::new(AppServerTurnFailure::new( "thread-1", @@ -549,6 +558,101 @@ fn app_server_preflight_terminal_action_surfaces_first_scan_error() { assert!(next_action.contains("first_error=name: exceeds maximum length of 64 characters")); } +#[test] +fn zero_evidence_app_server_start_failure_is_promoted_and_records_private_evidence() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let tracker = FakeTracker::new(vec![]); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue("In Progress", &[]); + let _env_guard = TestEnvVarGuard::set( + "DECODEX_TEST_ZERO_EVIDENCE_SECRET_TOKEN", + "synthetic-secret-token", + ); + let issue_run = IssueRunPlan { + issue: issue.clone(), + issue_state: issue.state.name.clone(), + initial_issue_state: String::from("Todo"), + worktree: WorktreeSpec { + branch_name: String::from("x/pubfi-pub-101"), + issue_identifier: issue.identifier.clone(), + path: config.worktree_root().join("PUB-101"), + reused_existing: false, + }, + retry_project_slug: issue + .project_slug + .clone() + .expect("sample issue should carry a project slug"), + dispatch_mode: IssueDispatchMode::Normal, + attempt_number: 1, + run_id: String::from("pub-101-attempt-1-123"), + retry_budget_base: 0, + }; + + fs::create_dir_all(&issue_run.worktree.path).expect("worktree path should exist"); + + state_store + .record_run_attempt(&issue_run.run_id, &issue.id, issue_run.attempt_number, "failed") + .expect("run attempt should record"); + + let error = orchestrator::promote_zero_evidence_app_server_start_failure( + &config, + &state_store, + &issue_run, + Report::msg("synthetic startup failure: synthetic-secret-token"), + ); + + assert!( + error.downcast_ref::().is_some(), + "generic no-evidence startup errors should become terminal app-server start failures" + ); + + let events = state_store + .list_private_execution_events( + config.service_id(), + &issue.id, + &issue_run.run_id, + issue_run.attempt_number, + ) + .expect("private evidence should list"); + + assert_eq!(events.len(), 1); + assert_eq!(events[0].event_type(), "app_server_zero_evidence_start_failure"); + assert_eq!( + events[0].payload()["error_class"], + "app_server_zero_evidence_start_failed" + ); + assert_eq!(events[0].payload()["protocol_event_count"], 0); + assert_eq!(events[0].payload()["thread_recorded"], false); + assert_eq!( + events[0].payload()["source_error_summary"], + "synthetic startup failure: " + ); + assert_eq!( + events[0].payload()["source_error_chain"][0], + "synthetic startup failure: " + ); + assert!( + !events[0].payload().to_string().contains("synthetic-secret-token"), + "private diagnostic payload must redact known secret env values" + ); + + orchestrator::handle_failure(&tracker, &config, &workflow, &state_store, &issue_run, &error) + .expect("terminal failure handling should succeed"); + + assert!(tracker.comments.borrow().iter().any(|comment| { + comment.contains("app_server_zero_evidence_start_failed") + && comment.contains("verify `decodex probe stdio://`") + })); + assert!( + !tracker + .comments + .borrow() + .iter() + .any(|comment| comment.contains("retryable_execution_failure")), + "zero-evidence startup failure must not burn retry budget as a generic retry" + ); +} + #[test] fn repo_gate_runtime_failures_require_manual_attention_without_retry_budget_wait() { let error = Report::new(orchestrator::RepoGateFailure::new( diff --git a/apps/decodex/src/orchestrator/types.rs b/apps/decodex/src/orchestrator/types.rs index 754ca50a..8cc3bc21 100644 --- a/apps/decodex/src/orchestrator/types.rs +++ b/apps/decodex/src/orchestrator/types.rs @@ -1377,6 +1377,7 @@ pub(crate) enum ActiveRunDisposition { Terminal, NonActive, Stalled { idle_for: Duration }, + StalledAlreadyNeedsAttention { idle_for: Duration }, } enum RetainedReviewLaneLoad { diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index 7519b736..d3f13191 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -1,5 +1,5 @@ #[cfg(target_os = "macos")] -use std::mem::MaybeUninit; +use std::mem::{self, MaybeUninit}; use std::sync::atomic::AtomicU64; use libc::FD_CLOEXEC; @@ -434,6 +434,46 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; Ok(()) } + fn upsert_lease_and_remember_run_project(&mut self, lease: &IssueLease) -> Result<()> { + let transaction = self.connection.transaction()?; + + transaction.execute( + "INSERT OR REPLACE INTO leases (issue_id, project_id, run_id, issue_state) + VALUES (?1, ?2, ?3, ?4)", + params![lease.issue_id(), lease.project_id(), lease.run_id(), lease.issue_state()], + )?; + + update_run_attempt_project(&transaction, lease.project_id(), lease.issue_id(), Some(lease.run_id()))?; + + transaction.commit()?; + + Ok(()) + } + + fn upsert_worktree_and_remember_run_project( + &mut self, + mapping: &WorktreeMappingRecord, + ) -> Result<()> { + let transaction = self.connection.transaction()?; + + transaction.execute( + "INSERT OR REPLACE INTO worktrees (issue_id, project_id, branch_name, worktree_path) + VALUES (?1, ?2, ?3, ?4)", + params![ + &mapping.issue_id, + &mapping.project_id, + &mapping.branch_name, + mapping.worktree_path.to_string_lossy().as_ref(), + ], + )?; + + update_run_attempt_project(&transaction, &mapping.project_id, &mapping.issue_id, None)?; + + transaction.commit()?; + + Ok(()) + } + fn append_protocol_event(&self, run_id: &str, event: &ProtocolEventRecord) -> Result { let changed = self.connection.execute( "INSERT OR IGNORE INTO protocol_events ( @@ -519,15 +559,61 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; Ok(()) } - fn delete_previous_issue_identity(&mut self, previous_issue_id: &str) -> Result<()> { + fn retarget_issue_identity( + &mut self, + previous_issue_id: &str, + canonical_issue_id: &str, + ) -> Result<()> { let transaction = self.connection.transaction()?; + transaction.execute( + "INSERT OR IGNORE INTO leases (issue_id, project_id, run_id, issue_state) + SELECT ?2, project_id, run_id, issue_state FROM leases WHERE issue_id = ?1", + params![previous_issue_id, canonical_issue_id], + )?; transaction.execute("DELETE FROM leases WHERE issue_id = ?1", params![previous_issue_id])?; + transaction.execute( + "INSERT OR IGNORE INTO worktrees (issue_id, project_id, branch_name, worktree_path) + SELECT ?2, project_id, branch_name, worktree_path FROM worktrees WHERE issue_id = ?1", + params![previous_issue_id, canonical_issue_id], + )?; transaction.execute("DELETE FROM worktrees WHERE issue_id = ?1", params![previous_issue_id])?; + transaction.execute( + "UPDATE run_attempts SET issue_id = ?2 WHERE issue_id = ?1", + params![previous_issue_id, canonical_issue_id], + )?; + transaction.execute( + "UPDATE private_execution_events SET issue_id = ?2 WHERE issue_id = ?1", + params![previous_issue_id, canonical_issue_id], + )?; + transaction.execute( + "INSERT OR IGNORE INTO review_handoffs ( + project_id, issue_id, branch_name, run_id, attempt_number, pr_url, + target_base_ref_name, pr_head_ref_name, pr_head_oid, updated_at, updated_at_unix + ) + SELECT project_id, ?2, branch_name, run_id, attempt_number, pr_url, + target_base_ref_name, pr_head_ref_name, pr_head_oid, updated_at, updated_at_unix + FROM review_handoffs WHERE issue_id = ?1", + params![previous_issue_id, canonical_issue_id], + )?; transaction.execute( "DELETE FROM review_handoffs WHERE issue_id = ?1", params![previous_issue_id], )?; + transaction.execute( + "INSERT OR IGNORE INTO review_orchestrations ( + project_id, issue_id, branch_name, run_id, attempt_number, pr_url, head_sha, + phase, request_comment_database_id, request_created_at_unix_epoch, + request_description_thumbs_up_count, request_retry_count, external_round_count, + auto_merge_enabled_at_unix_epoch, updated_at, updated_at_unix + ) + SELECT project_id, ?2, branch_name, run_id, attempt_number, pr_url, head_sha, + phase, request_comment_database_id, request_created_at_unix_epoch, + request_description_thumbs_up_count, request_retry_count, external_round_count, + auto_merge_enabled_at_unix_epoch, updated_at, updated_at_unix + FROM review_orchestrations WHERE issue_id = ?1", + params![previous_issue_id, canonical_issue_id], + )?; transaction.execute( "DELETE FROM review_orchestrations WHERE issue_id = ?1", params![previous_issue_id], @@ -1799,6 +1885,30 @@ fn persist_projects(transaction: &Transaction<'_>, state: &StateData) -> Result< Ok(()) } +fn update_run_attempt_project( + transaction: &Transaction<'_>, + project_id: &str, + issue_id: &str, + run_id: Option<&str>, +) -> Result<()> { + match run_id { + Some(run_id) => { + transaction.execute( + "UPDATE run_attempts SET project_id = ?1 WHERE issue_id = ?2 AND run_id = ?3", + params![project_id, issue_id, run_id], + )?; + }, + None => { + transaction.execute( + "UPDATE run_attempts SET project_id = ?1 WHERE issue_id = ?2", + params![project_id, issue_id], + )?; + }, + } + + Ok(()) +} + fn persist_leases(transaction: &Transaction<'_>, state: &StateData) -> Result<()> { for lease in state.leases.values() { transaction.execute( diff --git a/apps/decodex/src/state/store.rs b/apps/decodex/src/state/store.rs index 1f1cb7ba..ed1e3f6c 100644 --- a/apps/decodex/src/state/store.rs +++ b/apps/decodex/src/state/store.rs @@ -1,5 +1,3 @@ -use std::mem; - use crate::workflow::WorkflowConcurrencyLimit; /// Shared dispatch-slot capacity for one project. @@ -161,7 +159,7 @@ impl StateStore { return Ok(()); } - let mut state = self.lock()?; + let mut state = self.lock_without_refresh()?; if let Some(mut lease) = state.leases.remove(previous_issue_id) { lease.issue_id = canonical_issue_id.to_owned(); @@ -207,9 +205,7 @@ impl StateStore { record.issue_id = canonical_issue_id.to_owned(); } - self.persist_runtime_state_locked(&state)?; - - self.delete_previous_issue_identity_locked(previous_issue_id) + self.retarget_issue_identity_locked(previous_issue_id, canonical_issue_id) } /// Create or replace the active lease for one issue. @@ -220,20 +216,18 @@ impl StateStore { run_id: &str, issue_state: &str, ) -> Result<()> { - let mut state = self.lock()?; + let lease = IssueLease { + project_id: project_id.to_owned(), + issue_id: issue_id.to_owned(), + run_id: run_id.to_owned(), + issue_state: issue_state.to_owned(), + }; + let mut state = self.lock_without_refresh()?; - state.leases.insert( - issue_id.to_owned(), - IssueLease { - project_id: project_id.to_owned(), - issue_id: issue_id.to_owned(), - run_id: run_id.to_owned(), - issue_state: issue_state.to_owned(), - }, - ); + state.leases.insert(issue_id.to_owned(), lease.clone()); state.remember_run_project(project_id, issue_id, Some(run_id)); - self.persist_runtime_state_locked(&state) + self.upsert_lease_and_remember_run_project_locked(&lease) } /// Try to acquire one issue claim plus one shared dispatch slot for one issue. @@ -1151,20 +1145,18 @@ impl StateStore { branch_name: &str, worktree_path: &str, ) -> Result<()> { - let mut state = self.lock()?; + let mapping = WorktreeMappingRecord { + project_id: project_id.to_owned(), + issue_id: issue_id.to_owned(), + branch_name: branch_name.to_owned(), + worktree_path: PathBuf::from(worktree_path), + }; + let mut state = self.lock_without_refresh()?; - state.worktrees.insert( - issue_id.to_owned(), - WorktreeMappingRecord { - project_id: project_id.to_owned(), - issue_id: issue_id.to_owned(), - branch_name: branch_name.to_owned(), - worktree_path: PathBuf::from(worktree_path), - }, - ); + state.worktrees.insert(issue_id.to_owned(), mapping.clone()); state.remember_run_project(project_id, issue_id, None); - self.persist_runtime_state_locked(&state) + self.upsert_worktree_and_remember_run_project_locked(&mapping) } /// Create or replace the retained review handoff marker for one issue lane. @@ -1398,6 +1390,31 @@ impl StateStore { sqlite.upsert_run_attempt(attempt) } + fn upsert_lease_and_remember_run_project_locked(&self, lease: &IssueLease) -> Result<()> { + let Some(sqlite) = self.sqlite.as_ref() else { + return Ok(()); + }; + let mut sqlite = sqlite + .lock() + .map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?; + + sqlite.upsert_lease_and_remember_run_project(lease) + } + + fn upsert_worktree_and_remember_run_project_locked( + &self, + mapping: &WorktreeMappingRecord, + ) -> Result<()> { + let Some(sqlite) = self.sqlite.as_ref() else { + return Ok(()); + }; + let mut sqlite = sqlite + .lock() + .map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?; + + sqlite.upsert_worktree_and_remember_run_project(mapping) + } + fn append_protocol_event_locked( &self, run_id: &str, @@ -1463,7 +1480,11 @@ impl StateStore { sqlite.delete_lease(issue_id) } - fn delete_previous_issue_identity_locked(&self, previous_issue_id: &str) -> Result<()> { + fn retarget_issue_identity_locked( + &self, + previous_issue_id: &str, + canonical_issue_id: &str, + ) -> Result<()> { let Some(sqlite) = self.sqlite.as_ref() else { return Ok(()); }; @@ -1471,7 +1492,7 @@ impl StateStore { .lock() .map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?; - sqlite.delete_previous_issue_identity(previous_issue_id) + sqlite.retarget_issue_identity(previous_issue_id, canonical_issue_id) } fn delete_worktree_and_review_markers_locked(&self, issue_id: &str) -> Result<()> { @@ -1526,21 +1547,22 @@ fn retarget_review_handoff_issue( previous_issue_id: &str, canonical_issue_id: &str, ) { - let existing = mem::take(records); - - for (key, mut record) in existing { - let next_issue_id = if key.issue_id == previous_issue_id { - canonical_issue_id - } else { - key.issue_id.as_str() + let previous_keys = records + .keys() + .filter(|key| key.issue_id == previous_issue_id) + .cloned() + .collect::>(); + + for key in previous_keys { + let Some(mut record) = records.remove(&key) else { + continue; }; - record.issue_id = next_issue_id.to_owned(); + record.issue_id = canonical_issue_id.to_owned(); - records.insert( - ReviewMarkerKey::new(&key.project_id, next_issue_id, &key.branch_name), - record, - ); + records + .entry(ReviewMarkerKey::new(&key.project_id, canonical_issue_id, &key.branch_name)) + .or_insert(record); } } @@ -1553,26 +1575,27 @@ fn retarget_review_orchestration_issue( previous_issue_id: &str, canonical_issue_id: &str, ) { - let existing = mem::take(records); - - for (key, mut record) in existing { - let next_issue_id = if key.issue_id == previous_issue_id { - canonical_issue_id - } else { - key.issue_id.as_str() + let previous_keys = records + .keys() + .filter(|key| key.issue_id == previous_issue_id) + .cloned() + .collect::>(); + + for key in previous_keys { + let Some(mut record) = records.remove(&key) else { + continue; }; - record.issue_id = next_issue_id.to_owned(); + record.issue_id = canonical_issue_id.to_owned(); - records.insert( - ReviewOrchestrationKey::new( + records + .entry(ReviewOrchestrationKey::new( &key.project_id, - next_issue_id, + canonical_issue_id, &key.branch_name, &key.run_id, key.attempt_number, - ), - record, - ); + )) + .or_insert(record); } } diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index 89869786..a032500d 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -31,6 +31,35 @@ fn fd_has_close_on_exec(fd: i32) -> bool { flags & FD_CLOEXEC != 0 } +fn sample_pub_101_review_handoff() -> ReviewHandoffMarker { + ReviewHandoffMarker::new( + "run-1", + 1, + "x/decodex-pub-101", + "https://github.com/hack-ink/decodex/pull/101", + "main", + "x/decodex-pub-101", + "08a20f7dfb9526e7421a5f095b1c6adec84e52d6", + ) +} + +fn sample_pub_101_review_orchestration() -> ReviewOrchestrationMarker { + ReviewOrchestrationMarker::new( + "run-1", + 1, + "x/decodex-pub-101", + "https://github.com/hack-ink/decodex/pull/101", + "08a20f7dfb9526e7421a5f095b1c6adec84e52d6", + "request_pending", + None, + None, + None, + 0, + 0, + None, + ) +} + #[test] fn review_markers_roundtrip_preserve_required_fields() { let store = StateStore::open_in_memory().expect("state store should open"); @@ -1652,6 +1681,92 @@ fn persistent_clear_worktree_deletes_review_markers() { ); } +#[test] +fn canonicalize_issue_identity_retargets_persistent_rows_without_cache_refresh() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let state_path = temp_dir.path().join("runtime.sqlite3"); + let stale_store = StateStore::open(&state_path).expect("stale state store should open"); + let writer = StateStore::open(&state_path).expect("writer state store should open"); + let handoff = sample_pub_101_review_handoff(); + let orchestration = sample_pub_101_review_orchestration(); + + writer + .record_run_attempt("run-1", "PUB-101", 1, "running") + .expect("run attempt should persist"); + writer + .upsert_worktree("pubfi", "PUB-101", "x/decodex-pub-101", "/tmp/worktrees/pub-101") + .expect("worktree mapping should persist"); + writer + .upsert_lease("pubfi", "PUB-101", "run-1", IN_PROGRESS_STATE) + .expect("lease should persist"); + writer + .append_private_execution_event( + "pubfi", + "PUB-101", + "run-1", + 1, + "progress_checkpoint", + serde_json::json!({ "summary": "cached on visible tracker key" }), + ) + .expect("private evidence should persist"); + writer + .upsert_review_handoff_marker("pubfi", "PUB-101", &handoff) + .expect("handoff marker should persist"); + writer + .upsert_review_orchestration_marker("pubfi", "PUB-101", &orchestration) + .expect("orchestration marker should persist"); + stale_store + .canonicalize_issue_identity("PUB-101", "linear-id-101") + .expect("identity should canonicalize from SQLite rows"); + + let reopened = StateStore::open(&state_path).expect("state store should reopen"); + let run = reopened + .run_attempt("run-1") + .expect("run attempt should read") + .expect("run attempt should exist"); + + assert_eq!(run.issue_id(), "linear-id-101"); + assert!(reopened.lease_for_issue("PUB-101").expect("old lease lookup should read").is_none()); + assert!( + reopened.worktree_for_issue("PUB-101").expect("old worktree lookup should read").is_none() + ); + assert_eq!( + reopened + .lease_for_issue("linear-id-101") + .expect("canonical lease lookup should read") + .expect("canonical lease should exist") + .run_id(), + "run-1" + ); + assert_eq!( + reopened + .worktree_for_issue("linear-id-101") + .expect("canonical worktree lookup should read") + .expect("canonical worktree should exist") + .branch_name(), + "x/decodex-pub-101" + ); + assert_eq!( + reopened + .list_private_execution_events("pubfi", "linear-id-101", "run-1", 1) + .expect("canonical private evidence should read") + .len(), + 1 + ); + assert_eq!( + reopened + .review_handoff_marker("pubfi", "linear-id-101", "x/decodex-pub-101") + .expect("canonical handoff should read"), + Some(handoff.clone()) + ); + assert_eq!( + reopened + .review_orchestration_marker("pubfi", "linear-id-101", &handoff) + .expect("canonical orchestration should read"), + Some(orchestration) + ); +} + #[test] fn lists_issue_leases() { let store = StateStore::open_in_memory().expect("in-memory state store should open"); diff --git a/docs/reference/test-suite.md b/docs/reference/test-suite.md index 5cb1abff..108b99f3 100644 --- a/docs/reference/test-suite.md +++ b/docs/reference/test-suite.md @@ -65,7 +65,7 @@ large catch-all test file unless the behavior crosses several of these stages. | `apps/decodex/src/orchestrator/tests/recovery/closeout/dispatch.rs` | 5 | Direct closeout dispatch and PR validation | | `apps/decodex/src/orchestrator/tests/recovery/closeout/identity.rs` | 4 | Closeout identity reuse after retained runs | | `apps/decodex/src/orchestrator/tests/recovery/closeout/cleanup.rs` | 6 | Retained closeout cleanup and cleanup blockers | -| `apps/decodex/src/orchestrator/tests/recovery/terminal_failures.rs` | 8 | Terminal failure labeling and nonretryable attention | +| `apps/decodex/src/orchestrator/tests/recovery/terminal_failures.rs` | 11 | Terminal failure labeling, nonretryable attention, and local/remote duplicate writeback idempotency | | `apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs` | 27 | Runtime reentry, recovered worktrees, liveness, and live-run recovery | | `apps/decodex/src/orchestrator/tests/operator/status_support.rs` | 0 | Shared operator status fixtures | | `apps/decodex/src/orchestrator/tests/operator/status/control_plane.rs` | 5 | Registered project control-plane rows | diff --git a/docs/spec/linear-execution-ledger.md b/docs/spec/linear-execution-ledger.md index a2407576..d40ea959 100644 --- a/docs/spec/linear-execution-ledger.md +++ b/docs/spec/linear-execution-ledger.md @@ -288,6 +288,11 @@ be replayed as the active state machine. `service_id`, `issue_id`, and `idempotency_key`. - If duplicates disagree, consumers should prefer the earliest valid record and surface a data-quality warning instead of guessing which record is authoritative. +- Producers that perform side effects for a `needs_attention` or `terminal_failure` + record must treat the idempotency key as guarding the whole writeback. A duplicate + terminal event in the local runtime store or the remote Linear comment ledger must + not reapply the tracker state transition, automation-label mutations, or public + comment. - Event ordering is by `event_timestamp`, with Linear comment creation time as a fallback tiebreaker. Consumers must tolerate delayed comments and duplicate retries. diff --git a/docs/spec/runtime.md b/docs/spec/runtime.md index 5e7ac9c3..e1f7c449 100644 --- a/docs/spec/runtime.md +++ b/docs/spec/runtime.md @@ -348,6 +348,11 @@ Structured needs-attention and terminal-failure comments are Linear execution le records. Their required identity, error, next-action, blocker, evidence, terminal-path, and idempotency fields are defined by [`linear-execution-ledger.md`](./linear-execution-ledger.md). +The idempotency boundary covers the whole terminal writeback, not only the Linear +comment: once the same `needs_attention` or `terminal_failure` ledger event is already +recorded locally or present in the remote Linear comment ledger, reconciliation or +child-exit recovery must not reapply the failure state transition, automation-label +mutations, or duplicate comment for that logical event. ## Local operational state