Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 132 additions & 14 deletions apps/decodex/src/orchestrator/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ where
run_result: &'a AppServerRunResult,
}

/// One app-server thread queued for a best-effort `thread/archive` request.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ThreadArchiveCandidate {
    /// Run the thread is recorded against.
    run_id: String,
    /// Attempt number of that run.
    attempt_number: i64,
    /// Thread to archive.
    thread_id: String,
    /// Sequence number handed to the archive request (one past the run's event count).
    sequence_number: i64,
}

fn execute_issue_run<T>(
tracker: &T,
project: &ServiceConfig,
Expand Down Expand Up @@ -564,7 +571,7 @@ where
run.issue_run,
run.tracker_tool_bridge,
)?;
archive_completed_app_server_thread_best_effort(
archive_completed_issue_threads_best_effort(
run.project,
run.state_store,
run.issue_run,
Expand All @@ -576,43 +583,154 @@ where
Ok(run_summary_from_issue_run(run.project.service_id(), run.issue_run))
}

fn archive_completed_app_server_thread_best_effort(
/// Best-effort archive of every completed app-server thread recorded for the issue.
///
/// Candidates come from `completed_issue_thread_archive_candidates`. When that lookup
/// fails, the error is logged as a warning and only the thread of the current run
/// result is archived. Nothing is returned to the caller.
fn archive_completed_issue_threads_best_effort(
    project: &ServiceConfig,
    state_store: &StateStore,
    issue_run: &IssueRunPlan,
    process_env: &AppServerProcessEnv,
    transport: &str,
    run_result: &AppServerRunResult,
) {
    let candidates =
        completed_issue_thread_archive_candidates(state_store, issue_run, run_result)
            .unwrap_or_else(|error| {
                tracing::warn!(
                    ?error,
                    project_id = project.service_id(),
                    issue_id = issue_run.issue.id,
                    issue = issue_run.issue.identifier,
                    run_id = issue_run.run_id,
                    attempt = issue_run.attempt_number,
                    thread_id = %run_result.thread_id,
                    "Failed to list completed issue threads for archive; archiving current thread only."
                );

                // The listing failed, so fall back to the current run's thread, numbered
                // one past the run result's event count.
                vec![ThreadArchiveCandidate {
                    run_id: issue_run.run_id.clone(),
                    attempt_number: issue_run.attempt_number,
                    thread_id: run_result.thread_id.clone(),
                    sequence_number: run_result.event_count.saturating_add(1),
                }]
            });

    for candidate in &candidates {
        archive_completed_issue_thread_best_effort(
            project,
            state_store,
            issue_run,
            process_env,
            transport,
            candidate,
        );
    }
}

/// Build the list of threads to archive once an issue run has completed.
///
/// The current run's thread is considered first. Every other attempt recorded for the
/// issue is then considered if its status passes
/// `completed_issue_archive_attempt_status` and it has a thread id. Deduplication and
/// the already-archived check are delegated to `push_thread_archive_candidate`.
///
/// # Errors
///
/// Returns any error raised by the state-store lookups.
fn completed_issue_thread_archive_candidates(
    state_store: &StateStore,
    issue_run: &IssueRunPlan,
    run_result: &AppServerRunResult,
) -> Result<Vec<ThreadArchiveCandidate>> {
    let mut candidates = Vec::new();
    let mut seen_thread_ids = HashSet::new();

    // The current run goes first so it leads the returned list.
    push_thread_archive_candidate(
        state_store,
        &mut candidates,
        &mut seen_thread_ids,
        &issue_run.run_id,
        issue_run.attempt_number,
        &run_result.thread_id,
    )?;

    for attempt in state_store.list_run_attempts_for_issue(&issue_run.issue.id)? {
        // The current run was handled above.
        if attempt.run_id() == issue_run.run_id {
            continue;
        }

        if !completed_issue_archive_attempt_status(attempt.status()) {
            continue;
        }

        // Attempts without a thread id have nothing to archive.
        let Some(thread_id) = attempt.thread_id() else {
            continue;
        };

        push_thread_archive_candidate(
            state_store,
            &mut candidates,
            &mut seen_thread_ids,
            attempt.run_id(),
            attempt.attempt_number(),
            thread_id,
        )?;
    }

    Ok(candidates)
}

fn completed_issue_archive_attempt_status(status: &str) -> bool {
matches!(status, "succeeded" | "failed" | "interrupted" | TERMINAL_GUARDED_RUN_STATUS)
}

/// Queue `thread_id` for archiving unless it is already handled.
///
/// Nothing is queued when the thread id was seen earlier in this pass, or when the
/// state store already holds a `thread/archive` protocol event for `run_id`. The
/// thread id is recorded in `seen_thread_ids` in both cases.
///
/// # Errors
///
/// Returns any error raised by the state-store lookups.
fn push_thread_archive_candidate(
    state_store: &StateStore,
    candidates: &mut Vec<ThreadArchiveCandidate>,
    seen_thread_ids: &mut HashSet<String>,
    run_id: &str,
    attempt_number: i64,
    thread_id: &str,
) -> Result<()> {
    let first_sighting = seen_thread_ids.insert(thread_id.to_owned());

    if !first_sighting {
        return Ok(());
    }

    if state_store.run_has_protocol_event(run_id, "thread/archive")? {
        return Ok(());
    }

    // One past the state store's event count for the run.
    let sequence_number = state_store.event_count(run_id)?.saturating_add(1);

    candidates.push(ThreadArchiveCandidate {
        run_id: run_id.to_owned(),
        attempt_number,
        thread_id: thread_id.to_owned(),
        sequence_number,
    });

    Ok(())
}

/// Sends one archive request for `candidate` and logs the outcome.
///
/// The request is handed to `agent::archive_app_server_thread_after_success`. Success is
/// logged at info level and failure at warn level. The function returns nothing, so an
/// archive failure is not propagated to the caller.
///
/// NOTE(review): this listing carries pairs of near-duplicate lines (two `run_id:`,
/// `attempt_number:`, `thread_id:` and `sequence_number:` fields, and two message strings
/// per log call), and `run_result` is not a parameter of this function. It looks like the
/// before/after lines of a diff were merged together; confirm that only the `candidate.*`
/// variants belong in the final source.
fn archive_completed_issue_thread_best_effort(
project: &ServiceConfig,
state_store: &StateStore,
issue_run: &IssueRunPlan,
process_env: &AppServerProcessEnv,
transport: &str,
candidate: &ThreadArchiveCandidate,
) {
// The issue id always comes from `issue_run`; the run, attempt, thread and sequence
// values have both an `issue_run`/`run_result` line and a `candidate` line below.
let archive_request = AppServerThreadArchiveRequest {
run_id: &issue_run.run_id,
run_id: &candidate.run_id,
issue_id: &issue_run.issue.id,
attempt_number: issue_run.attempt_number,
attempt_number: candidate.attempt_number,
listen: transport,
process_env,
thread_id: &run_result.thread_id,
sequence_number: run_result.event_count.saturating_add(1),
thread_id: &candidate.thread_id,
sequence_number: candidate.sequence_number,
};

// Both arms only log; neither returns a value or an error.
match agent::archive_app_server_thread_after_success(&archive_request, state_store) {
Ok(()) => tracing::info!(
project_id = project.service_id(),
issue_id = issue_run.issue.id,
issue = issue_run.issue.identifier,
run_id = issue_run.run_id,
attempt = issue_run.attempt_number,
thread_id = %run_result.thread_id,
"Archived completed app-server thread."
run_id = candidate.run_id,
attempt = candidate.attempt_number,
thread_id = %candidate.thread_id,
"Archived completed issue app-server thread."
),
Err(error) => tracing::warn!(
?error,
project_id = project.service_id(),
issue_id = issue_run.issue.id,
issue = issue_run.issue.identifier,
run_id = issue_run.run_id,
attempt = issue_run.attempt_number,
thread_id = %run_result.thread_id,
"Failed to archive completed app-server thread; leaving completed run intact."
run_id = candidate.run_id,
attempt = candidate.attempt_number,
thread_id = %candidate.thread_id,
"Failed to archive completed issue app-server thread; leaving completed run intact."
),
}
}
Expand Down
2 changes: 2 additions & 0 deletions apps/decodex/src/orchestrator/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ include!("tests/runtime/repo_gate.rs");

include!("tests/runtime/failure.rs");

include!("tests/runtime/thread_archive.rs");

include!("tests/recovery/reconciliation.rs");

include!("tests/recovery/terminal_support.rs");
Expand Down
75 changes: 75 additions & 0 deletions apps/decodex/src/orchestrator/tests/runtime/thread_archive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use crate::agent::AppServerRunResult;

// Checks which threads `completed_issue_thread_archive_candidates` returns for an issue
// that has four recorded attempts: an old failed one, the current one, one still
// running, and one that already has an archive event.
#[test]
fn completed_issue_thread_archive_candidates_include_prior_terminal_attempts() {
let temp_dir = TempDir::new().expect("tempdir should create");
let state_store = StateStore::open_in_memory().expect("state store should open");
let issue = sample_issue("In Progress", &[]);

// Attempt 1: status "failed" with one non-archive event; expected as a candidate.
state_store
.record_run_attempt("run-old", &issue.id, 1, "failed")
.expect("old attempt should record");
state_store.update_run_thread("run-old", "thread-old").expect("old thread should attach");
state_store
.append_event("run-old", 1, "turn/completed", "{}")
.expect("old event should record");
// Attempt 2: the run being completed; matches `issue_run` and `run_result` below.
state_store
.record_run_attempt("run-current", &issue.id, 2, "succeeded")
.expect("current attempt should record");
state_store
.update_run_thread("run-current", "thread-current")
.expect("current thread should attach");
state_store
.append_event("run-current", 1, "turn/completed", "{}")
.expect("current event should record");
// Attempt 3: status "running"; the assertion below expects it to be left out.
state_store
.record_run_attempt("run-active", &issue.id, 3, "running")
.expect("active attempt should record");
state_store.update_run_thread("run-active", "thread-active").expect("active thread should attach");
// Attempt 4: already has a "thread/archive" event; the assertion below expects it to
// be left out.
state_store
.record_run_attempt("run-archived", &issue.id, 4, TERMINAL_GUARDED_RUN_STATUS)
.expect("archived attempt should record");
state_store
.update_run_thread("run-archived", "thread-archived")
.expect("archived thread should attach");
state_store
.append_event("run-archived", 1, "thread/archive", "{}")
.expect("archive event should record");

// Plan and result describing the current run (attempt 2, "run-current").
let issue_run = IssueRunPlan {
issue,
issue_state: String::from("In Progress"),
initial_issue_state: String::from("Todo"),
worktree: WorktreeSpec {
branch_name: String::from("xy/test"),
issue_identifier: String::from("PUB-101"),
path: temp_dir.path().join("PUB-101"),
reused_existing: false,
},
retry_project_slug: String::from("pubfi"),
dispatch_mode: IssueDispatchMode::Normal,
attempt_number: 2,
run_id: String::from("run-current"),
retry_budget_base: 0,
};
let run_result = AppServerRunResult {
user_agent: String::from("codex-test"),
thread_id: String::from("thread-current"),
turn_id: String::from("turn-current"),
turn_count: 1,
event_count: 1,
final_output: String::new(),
continuation_pending: false,
};
let candidates =
super::completed_issue_thread_archive_candidates(&state_store, &issue_run, &run_result)
.expect("archive candidates should load");
let candidate_threads = candidates
.iter()
.map(|candidate| candidate.thread_id.as_str())
.collect::<Vec<_>>();

// The current thread comes first, then the older one. Each of those runs had one event
// appended above, and the expected sequence number for both is 2.
assert_eq!(candidate_threads, vec!["thread-current", "thread-old"]);
assert_eq!(candidates[0].sequence_number, 2);
assert_eq!(candidates[1].sequence_number, 2);
}
28 changes: 28 additions & 0 deletions apps/decodex/src/state/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,34 @@ impl StateStore {
Ok(attempt)
}

/// Collect every locally recorded run attempt that belongs to `issue_id`.
///
/// The result is ordered by ascending attempt number; attempts that share a number are
/// ordered by run id.
pub fn list_run_attempts_for_issue(&self, issue_id: &str) -> Result<Vec<RunAttempt>> {
    let state = self.lock()?;
    let mut attempts: Vec<_> = state
        .run_attempts
        .values()
        .filter_map(|record| (record.issue_id == issue_id).then(|| record.as_public()))
        .collect();

    // Tuples compare element by element: attempt number first, run id as tie-breaker.
    attempts.sort_by(|left, right| {
        (left.attempt_number(), left.run_id()).cmp(&(right.attempt_number(), right.run_id()))
    });

    Ok(attempts)
}

/// Report whether any event stored for `run_id` has the type `event_type`.
///
/// A run with no stored events yields `false`.
pub fn run_has_protocol_event(&self, run_id: &str, event_type: &str) -> Result<bool> {
    let state = self.lock()?;
    let found = match state.events.get(run_id) {
        Some(events) => events.iter().any(|event| event.event_type == event_type),
        None => false,
    };

    Ok(found)
}

/// List recent run attempts for one project, including lease and protocol summary fields.
pub fn list_recent_runs(
&self,
Expand Down
32 changes: 32 additions & 0 deletions apps/decodex/src/state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,38 @@ fn records_run_attempts_and_events() {
);
}

#[test]
fn lists_issue_attempts_and_protocol_event_presence() {
    let state_store = StateStore::open_in_memory().expect("in-memory state store should open");

    // Two attempts for PUB-101 (run-2 is recorded before run-1) plus one attempt for a
    // different issue.
    state_store
        .record_run_attempt("run-2", "PUB-101", 2, "succeeded")
        .expect("second run attempt should record");
    state_store
        .record_run_attempt("run-1", "PUB-101", 1, "failed")
        .expect("first run attempt should record");
    state_store
        .record_run_attempt("run-other", "PUB-102", 1, "succeeded")
        .expect("other issue run attempt should record");
    state_store.update_run_thread("run-1", "thread-1").expect("first thread should attach");
    state_store.update_run_thread("run-2", "thread-2").expect("second thread should attach");

    // Only run-1 receives an archive event.
    state_store
        .append_event("run-1", 1, "thread/archive", "{}")
        .expect("archive event should record");

    let issue_attempts = state_store
        .list_run_attempts_for_issue("PUB-101")
        .expect("issue attempts should load");

    assert_eq!(issue_attempts.len(), 2);

    let (first, second) = (&issue_attempts[0], &issue_attempts[1]);

    assert_eq!(first.run_id(), "run-1");
    assert_eq!(first.thread_id(), Some("thread-1"));
    assert_eq!(second.run_id(), "run-2");

    let run_1_archived = state_store
        .run_has_protocol_event("run-1", "thread/archive")
        .expect("event should load");
    let run_2_archived = state_store
        .run_has_protocol_event("run-2", "thread/archive")
        .expect("missing event should load");

    assert!(run_1_archived);
    assert!(!run_2_archived);
}

#[test]
fn run_activity_marker_round_trips_marker_surfaces() {
assert_run_activity_marker_round_trips_clearable_auxiliary_fields();
Expand Down
6 changes: 6 additions & 0 deletions docs/spec/app-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ codex app-server generate-json-schema --experimental --out target/decodex-app-se
- `thread/start`
- `thread/resume` when retrying a persisted same-thread continuation
- `turn/start`
- `thread/archive` after successful completion writeback, for every locally
recorded terminal attempt thread on the issue that has not already recorded a
successful archive event
- Required notifications for the MVP:
- `thread/started`
- `thread/status/changed`
Expand Down Expand Up @@ -98,6 +101,9 @@ unless an existing lifecycle event summarizes them.
7. Consume notifications until that turn reaches a terminal outcome.
8. If the project-owned continuation policy allows another same-thread turn, send another `turn/start` on the same thread.
9. Persist the local run journal and classify the bounded run result.
10. After successful completion writeback, best-effort archive all locally recorded
terminal attempt threads for the issue so prior failed retry attempts do not keep
the issue visible in the Codex conversation list.

The capability preflight is observational. It may inspect the effective app-server
config, model inventory, provider capabilities, skill inventory, plugin inventory,
Expand Down
1 change: 1 addition & 0 deletions docs/spec/runtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ If the label is recorded without the required explanatory comment, `decodex` mus
If the resolved terminal path is not explicitly finalized through `issue_terminal_finalize`, the app-server wrapper must fail the turn before `decodex` records the attempt as successful.
The explanatory public summary for `manual_attention` must describe the exact observed blocker and should include the failed command plus raw error text only when those values are public-safe, instead of speculating about unverified capability limits.
Execution-state checkpoints are durable progress overlays only. Their phase, focus, next action, blockers, evidence, or verification fields are never a substitute for the explicit terminal-finalization call.
After successful completion writeback, Decodex must best-effort archive every locally recorded terminal Codex thread for the issue, including earlier failed retry attempts, so old attempts do not keep the issue visible in the Codex conversation list.

### Progress checkpoint writeback

Expand Down