diff --git a/apps/decodex/src/orchestrator/execution.rs b/apps/decodex/src/orchestrator/execution.rs index 2d05323c..87090fcc 100644 --- a/apps/decodex/src/orchestrator/execution.rs +++ b/apps/decodex/src/orchestrator/execution.rs @@ -65,6 +65,13 @@ where run_result: &'a AppServerRunResult, } +struct ThreadArchiveCandidate { + run_id: String, + attempt_number: i64, + thread_id: String, + sequence_number: i64, +} + fn execute_issue_run( tracker: &T, project: &ServiceConfig, @@ -564,7 +571,7 @@ where run.issue_run, run.tracker_tool_bridge, )?; - archive_completed_app_server_thread_best_effort( + archive_completed_issue_threads_best_effort( run.project, run.state_store, run.issue_run, @@ -576,22 +583,133 @@ where Ok(run_summary_from_issue_run(run.project.service_id(), run.issue_run)) } -fn archive_completed_app_server_thread_best_effort( +fn archive_completed_issue_threads_best_effort( project: &ServiceConfig, state_store: &StateStore, issue_run: &IssueRunPlan, process_env: &AppServerProcessEnv, transport: &str, run_result: &AppServerRunResult, +) { + let candidates = + match completed_issue_thread_archive_candidates(state_store, issue_run, run_result) { + Ok(candidates) => candidates, + Err(error) => { + tracing::warn!( + ?error, + project_id = project.service_id(), + issue_id = issue_run.issue.id, + issue = issue_run.issue.identifier, + run_id = issue_run.run_id, + attempt = issue_run.attempt_number, + thread_id = %run_result.thread_id, + "Failed to list completed issue threads for archive; archiving current thread only." 
+ ); + + vec![ThreadArchiveCandidate { + run_id: issue_run.run_id.clone(), + attempt_number: issue_run.attempt_number, + thread_id: run_result.thread_id.clone(), + sequence_number: run_result.event_count.saturating_add(1), + }] + }, + }; + + for candidate in candidates { + archive_completed_issue_thread_best_effort( + project, + state_store, + issue_run, + process_env, + transport, + &candidate, + ); + } +} + +fn completed_issue_thread_archive_candidates( + state_store: &StateStore, + issue_run: &IssueRunPlan, + run_result: &AppServerRunResult, +) -> Result> { + let mut seen_thread_ids = HashSet::new(); + let mut candidates = Vec::new(); + + push_thread_archive_candidate( + state_store, + &mut candidates, + &mut seen_thread_ids, + &issue_run.run_id, + issue_run.attempt_number, + &run_result.thread_id, + )?; + + for attempt in state_store.list_run_attempts_for_issue(&issue_run.issue.id)? { + if attempt.run_id() == issue_run.run_id + || !completed_issue_archive_attempt_status(attempt.status()) + { + continue; + } + + if let Some(thread_id) = attempt.thread_id() { + push_thread_archive_candidate( + state_store, + &mut candidates, + &mut seen_thread_ids, + attempt.run_id(), + attempt.attempt_number(), + thread_id, + )?; + } + } + + Ok(candidates) +} + +fn completed_issue_archive_attempt_status(status: &str) -> bool { + matches!(status, "succeeded" | "failed" | "interrupted" | TERMINAL_GUARDED_RUN_STATUS) +} + +fn push_thread_archive_candidate( + state_store: &StateStore, + candidates: &mut Vec, + seen_thread_ids: &mut HashSet, + run_id: &str, + attempt_number: i64, + thread_id: &str, +) -> Result<()> { + if !seen_thread_ids.insert(thread_id.to_owned()) + || state_store.run_has_protocol_event(run_id, "thread/archive")? 
+ { + return Ok(()); + } + + candidates.push(ThreadArchiveCandidate { + run_id: run_id.to_owned(), + attempt_number, + thread_id: thread_id.to_owned(), + sequence_number: state_store.event_count(run_id)?.saturating_add(1), + }); + + Ok(()) +} + +fn archive_completed_issue_thread_best_effort( + project: &ServiceConfig, + state_store: &StateStore, + issue_run: &IssueRunPlan, + process_env: &AppServerProcessEnv, + transport: &str, + candidate: &ThreadArchiveCandidate, ) { let archive_request = AppServerThreadArchiveRequest { - run_id: &issue_run.run_id, + run_id: &candidate.run_id, issue_id: &issue_run.issue.id, - attempt_number: issue_run.attempt_number, + attempt_number: candidate.attempt_number, listen: transport, process_env, - thread_id: &run_result.thread_id, - sequence_number: run_result.event_count.saturating_add(1), + thread_id: &candidate.thread_id, + sequence_number: candidate.sequence_number, }; match agent::archive_app_server_thread_after_success(&archive_request, state_store) { @@ -599,20 +717,20 @@ fn archive_completed_app_server_thread_best_effort( project_id = project.service_id(), issue_id = issue_run.issue.id, issue = issue_run.issue.identifier, - run_id = issue_run.run_id, - attempt = issue_run.attempt_number, - thread_id = %run_result.thread_id, - "Archived completed app-server thread." + run_id = candidate.run_id, + attempt = candidate.attempt_number, + thread_id = %candidate.thread_id, + "Archived completed issue app-server thread." ), Err(error) => tracing::warn!( ?error, project_id = project.service_id(), issue_id = issue_run.issue.id, issue = issue_run.issue.identifier, - run_id = issue_run.run_id, - attempt = issue_run.attempt_number, - thread_id = %run_result.thread_id, - "Failed to archive completed app-server thread; leaving completed run intact." + run_id = candidate.run_id, + attempt = candidate.attempt_number, + thread_id = %candidate.thread_id, + "Failed to archive completed issue app-server thread; leaving completed run intact." 
), } } diff --git a/apps/decodex/src/orchestrator/tests.rs b/apps/decodex/src/orchestrator/tests.rs index 20189322..bd5380f0 100644 --- a/apps/decodex/src/orchestrator/tests.rs +++ b/apps/decodex/src/orchestrator/tests.rs @@ -71,6 +71,8 @@ include!("tests/runtime/repo_gate.rs"); include!("tests/runtime/failure.rs"); +include!("tests/runtime/thread_archive.rs"); + include!("tests/recovery/reconciliation.rs"); include!("tests/recovery/terminal_support.rs"); diff --git a/apps/decodex/src/orchestrator/tests/runtime/thread_archive.rs b/apps/decodex/src/orchestrator/tests/runtime/thread_archive.rs new file mode 100644 index 00000000..77cd39e8 --- /dev/null +++ b/apps/decodex/src/orchestrator/tests/runtime/thread_archive.rs @@ -0,0 +1,75 @@ +use crate::agent::AppServerRunResult; + +#[test] +fn completed_issue_thread_archive_candidates_include_prior_terminal_attempts() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue("In Progress", &[]); + + state_store + .record_run_attempt("run-old", &issue.id, 1, "failed") + .expect("old attempt should record"); + state_store.update_run_thread("run-old", "thread-old").expect("old thread should attach"); + state_store + .append_event("run-old", 1, "turn/completed", "{}") + .expect("old event should record"); + state_store + .record_run_attempt("run-current", &issue.id, 2, "succeeded") + .expect("current attempt should record"); + state_store + .update_run_thread("run-current", "thread-current") + .expect("current thread should attach"); + state_store + .append_event("run-current", 1, "turn/completed", "{}") + .expect("current event should record"); + state_store + .record_run_attempt("run-active", &issue.id, 3, "running") + .expect("active attempt should record"); + state_store.update_run_thread("run-active", "thread-active").expect("active thread should attach"); + state_store + 
.record_run_attempt("run-archived", &issue.id, 4, TERMINAL_GUARDED_RUN_STATUS) + .expect("archived attempt should record"); + state_store + .update_run_thread("run-archived", "thread-archived") + .expect("archived thread should attach"); + state_store + .append_event("run-archived", 1, "thread/archive", "{}") + .expect("archive event should record"); + + let issue_run = IssueRunPlan { + issue, + issue_state: String::from("In Progress"), + initial_issue_state: String::from("Todo"), + worktree: WorktreeSpec { + branch_name: String::from("xy/test"), + issue_identifier: String::from("PUB-101"), + path: temp_dir.path().join("PUB-101"), + reused_existing: false, + }, + retry_project_slug: String::from("pubfi"), + dispatch_mode: IssueDispatchMode::Normal, + attempt_number: 2, + run_id: String::from("run-current"), + retry_budget_base: 0, + }; + let run_result = AppServerRunResult { + user_agent: String::from("codex-test"), + thread_id: String::from("thread-current"), + turn_id: String::from("turn-current"), + turn_count: 1, + event_count: 1, + final_output: String::new(), + continuation_pending: false, + }; + let candidates = + super::completed_issue_thread_archive_candidates(&state_store, &issue_run, &run_result) + .expect("archive candidates should load"); + let candidate_threads = candidates + .iter() + .map(|candidate| candidate.thread_id.as_str()) + .collect::>(); + + assert_eq!(candidate_threads, vec!["thread-current", "thread-old"]); + assert_eq!(candidates[0].sequence_number, 2); + assert_eq!(candidates[1].sequence_number, 2); +} diff --git a/apps/decodex/src/state/store.rs b/apps/decodex/src/state/store.rs index 94294161..1f1cb7ba 100644 --- a/apps/decodex/src/state/store.rs +++ b/apps/decodex/src/state/store.rs @@ -826,6 +826,34 @@ impl StateStore { Ok(attempt) } + /// List all locally recorded run attempts for one issue. 
+ pub fn list_run_attempts_for_issue(&self, issue_id: &str) -> Result> { + let state = self.lock()?; + let mut attempts = state + .run_attempts + .values() + .filter(|attempt| attempt.issue_id == issue_id) + .map(RunAttemptRecord::as_public) + .collect::>(); + + attempts.sort_by(|left, right| { + left.attempt_number() + .cmp(&right.attempt_number()) + .then_with(|| left.run_id().cmp(right.run_id())) + }); + + Ok(attempts) + } + + /// Return whether one run already has a matching protocol event. + pub fn run_has_protocol_event(&self, run_id: &str, event_type: &str) -> Result { + let state = self.lock()?; + + Ok(state.events.get(run_id).is_some_and(|events| { + events.iter().any(|event| event.event_type == event_type) + })) + } + /// List recent run attempts for one project, including lease and protocol summary fields. pub fn list_recent_runs( &self, diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index bd2ebeef..89869786 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -1026,6 +1026,38 @@ fn records_run_attempts_and_events() { ); } +#[test] +fn lists_issue_attempts_and_protocol_event_presence() { + let store = StateStore::open_in_memory().expect("in-memory state store should open"); + + store + .record_run_attempt("run-2", "PUB-101", 2, "succeeded") + .expect("second run attempt should record"); + store + .record_run_attempt("run-1", "PUB-101", 1, "failed") + .expect("first run attempt should record"); + store + .record_run_attempt("run-other", "PUB-102", 1, "succeeded") + .expect("other issue run attempt should record"); + store.update_run_thread("run-1", "thread-1").expect("first thread should attach"); + store.update_run_thread("run-2", "thread-2").expect("second thread should attach"); + store.append_event("run-1", 1, "thread/archive", "{}").expect("archive event should record"); + + let attempts = + store.list_run_attempts_for_issue("PUB-101").expect("issue attempts should load"); + + 
assert_eq!(attempts.len(), 2); + assert_eq!(attempts[0].run_id(), "run-1"); + assert_eq!(attempts[0].thread_id(), Some("thread-1")); + assert_eq!(attempts[1].run_id(), "run-2"); + assert!(store.run_has_protocol_event("run-1", "thread/archive").expect("event should load")); + assert!( + !store + .run_has_protocol_event("run-2", "thread/archive") + .expect("missing event should load") + ); +} + #[test] fn run_activity_marker_round_trips_marker_surfaces() { assert_run_activity_marker_round_trips_clearable_auxiliary_fields(); diff --git a/docs/spec/app-server.md b/docs/spec/app-server.md index 934420d0..cea3d063 100644 --- a/docs/spec/app-server.md +++ b/docs/spec/app-server.md @@ -65,6 +65,9 @@ codex app-server generate-json-schema --experimental --out target/decodex-app-se - `thread/start` - `thread/resume` when retrying a persisted same-thread continuation - `turn/start` + - `thread/archive` after successful completion writeback, for every locally + recorded terminal attempt thread on the issue that has not already recorded a + successful archive event - Required notifications for the MVP: - `thread/started` - `thread/status/changed` @@ -98,6 +101,9 @@ unless an existing lifecycle event summarizes them. 7. Consume notifications until that turn reaches a terminal outcome. 8. If the project-owned continuation policy allows another same-thread turn, send another `turn/start` on the same thread. 9. Persist the local run journal and classify the bounded run result. +10. After successful completion writeback, best-effort archive all locally recorded + terminal attempt threads for the issue so prior failed retry attempts do not keep + the issue visible in the Codex conversation list. The capability preflight is observational. 
It may inspect the effective app-server config, model inventory, provider capabilities, skill inventory, plugin inventory, diff --git a/docs/spec/runtime.md b/docs/spec/runtime.md index bf5ce569..5e7ac9c3 100644 --- a/docs/spec/runtime.md +++ b/docs/spec/runtime.md @@ -220,6 +220,7 @@ If the label is recorded without the required explanatory comment, `decodex` mus If the resolved terminal path is not explicitly finalized through `issue_terminal_finalize`, the app-server wrapper must fail the turn before `decodex` records the attempt as successful. The explanatory public summary for `manual_attention` must describe the exact observed blocker and should include the failed command plus raw error text only when those values are public-safe, instead of speculating about unverified capability limits. Execution-state checkpoints are durable progress overlays only. Their phase, focus, next action, blockers, evidence, or verification fields are never a substitute for the explicit terminal-finalization call. +After successful completion writeback, `decodex` must best-effort archive every locally recorded terminal Codex thread for the issue, including earlier failed retry attempts, so old attempts do not keep the issue visible in the Codex conversation list. ### Progress checkpoint writeback