diff --git a/apps/decodex/src/agent.rs b/apps/decodex/src/agent.rs index d10a473c..1193561e 100644 --- a/apps/decodex/src/agent.rs +++ b/apps/decodex/src/agent.rs @@ -8,7 +8,8 @@ mod tracker_tool_bridge; pub(crate) use self::{ app_server::{ ACTIVE_RUN_IDLE_TIMEOUT, AppServerCapabilityPreflightFailure, AppServerDynamicToolFailure, - AppServerRunRequest, AppServerRunResult, AppServerTurnFailure, TurnContinuationGuard, + AppServerRunRequest, AppServerRunResult, AppServerThreadArchiveRequest, + AppServerTurnFailure, TurnContinuationGuard, archive_app_server_thread_after_success, execute_app_server_run, probe_app_server, }, codex_accounts::{CodexAccountPool, CodexAccountProvider}, diff --git a/apps/decodex/src/agent/app_server.rs b/apps/decodex/src/agent/app_server.rs index f2eabed6..acd11967 100644 --- a/apps/decodex/src/agent/app_server.rs +++ b/apps/decodex/src/agent/app_server.rs @@ -28,9 +28,9 @@ use self::protocol::{ ModelListParams, ModelListResponse, ModelProviderCapabilitiesReadResponse, ModelSummary, PermissionGrantScope, PermissionsRequestApprovalResponse, PluginListParams, PluginListResponse, ProbeDynamicToolHandler, RunOutcome, RuntimeConfigSummary, SkillsListParams, - SkillsListResponse, ThreadResumeRequest, ThreadSessionResponse, ThreadStartRequest, - ThreadStatusChangedNotification, ToolRequestUserInputResponse, TurnCompletedNotification, - TurnError, TurnStartRequest, UserInput, + SkillsListResponse, ThreadArchiveRequest, ThreadResumeRequest, ThreadSessionResponse, + ThreadStartRequest, ThreadStatusChangedNotification, ToolRequestUserInputResponse, + TurnCompletedNotification, TurnError, TurnStartRequest, UserInput, }; use crate::{ agent::{ @@ -316,6 +316,16 @@ pub(crate) struct AppServerRunRequest<'a> { pub(crate) codex_account_provider: Option<&'a dyn CodexAccountProvider>, } +pub(crate) struct AppServerThreadArchiveRequest<'a> { + pub(crate) run_id: &'a str, + pub(crate) issue_id: &'a str, + pub(crate) attempt_number: i64, + pub(crate) listen: &'a str, + pub(crate) process_env: &'a AppServerProcessEnv, + pub(crate) thread_id: &'a str, + pub(crate) sequence_number: i64, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct CommandExecHealthCheck { pub(crate) command: Vec, @@ -908,6 +918,17 @@ pub(crate) fn execute_app_server_run( result } +pub(crate) fn archive_app_server_thread_after_success( + request: &AppServerThreadArchiveRequest<'_>, + state_store: &StateStore, +) -> crate::prelude::Result<()> { + let result = archive_app_server_thread_after_success_inner(request); + + record_thread_archive_result_best_effort(state_store, request, result.as_ref().err()); + + result +} + pub(crate) fn probe_app_server(listen: &str) -> crate::prelude::Result { let state_store = StateStore::open_in_memory()?; let probe_tool_handler = ProbeDynamicToolHandler; @@ -945,6 +966,64 @@ pub(crate) fn probe_app_server(listen: &str) -> crate::prelude::Result, +) -> crate::prelude::Result<()> { + let expected_codex_home = request.process_env.resolve_codex_home_env()?; + let mut client = AppServerClient::spawn(request.listen, request.process_env)?; + let initialize_response = client.initialize(false)?; + + validate_initialize_codex_home(&expected_codex_home, &initialize_response)?; + + client.mark_initialized()?; + client.archive_thread(ThreadArchiveRequest { thread_id: request.thread_id.to_owned() })?; + + Ok(()) +} + +fn record_thread_archive_result_best_effort( + state_store: &StateStore, + request: &AppServerThreadArchiveRequest<'_>, + error: Option<&Report>, +) { + let (event_type, payload) = match error { + Some(error) => ( + "thread/archive/failed", + serde_json::json!({ + "threadId": request.thread_id, + "issueId": request.issue_id, + "attemptNumber": request.attempt_number, + "error": error.to_string(), + }), + ), + None => ( + "thread/archive", + serde_json::json!({ + "threadId": request.thread_id, + "issueId": request.issue_id, + "attemptNumber": request.attempt_number, + }), + ), + }; + + if let Err(record_error) = state_store.append_event( + request.run_id, + request.sequence_number, + event_type, + &payload.to_string(), + ) { + tracing::warn!( + ?record_error, + run_id = request.run_id, + issue_id = request.issue_id, + attempt = request.attempt_number, + thread_id = request.thread_id, + event_type, + "Failed to record app-server thread archive event." + ); + } +} + fn classify_child_activity_event( event_type: &str, payload: &str, diff --git a/apps/decodex/src/agent/app_server/protocol.rs b/apps/decodex/src/agent/app_server/protocol.rs index 248cba84..c4f95f00 100644 --- a/apps/decodex/src/agent/app_server/protocol.rs +++ b/apps/decodex/src/agent/app_server/protocol.rs @@ -130,6 +130,13 @@ impl AppServerClient { self.connection.request_with_handler("thread/resume", ¶ms, REQUEST_TIMEOUT, handler) } + pub(super) fn archive_thread( + &mut self, + params: ThreadArchiveRequest, + ) -> Result { + self.connection.request("thread/archive", ¶ms, REQUEST_TIMEOUT) + } + #[allow(dead_code)] pub(super) fn start_turn(&mut self, params: TurnStartRequest) -> Result { self.start_turn_with_handler(params, |_connection, _message, request| { @@ -319,6 +326,15 @@ pub(super) struct ThreadResumeRequest { pub(super) developer_instructions: Option, } +#[derive(Debug, Serialize)] +pub(super) struct ThreadArchiveRequest { + #[serde(rename = "threadId")] + pub(super) thread_id: String, +} + +#[derive(Debug, Deserialize)] +pub(super) struct ThreadArchiveResponse {} + #[derive(Clone, Debug, Deserialize)] pub(super) struct ThreadSessionResponse { pub(super) thread: Thread, diff --git a/apps/decodex/src/agent/app_server/tests.rs b/apps/decodex/src/agent/app_server/tests.rs index 4d87b9e8..ac43ec35 100644 --- a/apps/decodex/src/agent/app_server/tests.rs +++ b/apps/decodex/src/agent/app_server/tests.rs @@ -1,6 +1,8 @@ use std::{ cell::RefCell, collections::BTreeMap, + env, fs, + os::unix::fs::PermissionsExt, path::PathBuf, time::{Duration, Instant}, }; @@ -13,11 +15,11 @@ use crate::{ agent::{ app_server::{ AppServerCapabilityPreflightFailure, AppServerCapabilityPreflightReport, - AppServerDynamicToolFailure, AppServerRunResult, AppServerTurnFailure, - CommandExecHealthCheck, CommandExecResponse, EffectiveThreadConfig, InitializeResponse, - ModelProviderCapabilitiesReadResponse, PluginListResponse, ProbeDynamicToolHandler, - RequestWaitPhase, RunRecorder, RuntimeConfigSummary, SkillsListResponse, - TurnContinuationGuard, UserInput, + AppServerDynamicToolFailure, AppServerRunResult, AppServerThreadArchiveRequest, + AppServerTurnFailure, CommandExecHealthCheck, CommandExecResponse, + EffectiveThreadConfig, InitializeResponse, ModelProviderCapabilitiesReadResponse, + PluginListResponse, ProbeDynamicToolHandler, RequestWaitPhase, RunRecorder, + RuntimeConfigSummary, SkillsListResponse, TurnContinuationGuard, UserInput, }, json_rpc::{ AppServerHomePreflightFailure, AppServerOutputTimeout, AppServerProcessEnv, @@ -31,6 +33,7 @@ use crate::{ }, prelude::{Result, eyre}, state::{self, StateStore}, + test_support::TestEnvVarGuard, }; struct RejectingCompletionHandler; @@ -402,6 +405,100 @@ fn command_exec_health_check_uses_bounded_standalone_request() { assert!(value.get("permissionProfile").is_none()); } +#[test] +fn archive_thread_after_success_calls_app_server_archive_and_records_event() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let fake_bin_dir = temp_dir.path().join("fake-bin"); + let fake_codex_path = fake_bin_dir.join("codex"); + let invocation_log_path = temp_dir.path().join("codex-invocations.jsonl"); + let invocation_log_literal = + serde_json::to_string(&invocation_log_path).expect("log path should serialize"); + let fake_codex_script = format!( + r#"#!/usr/bin/env python3 +import json +import os +import sys + +log_path = {invocation_log_literal} + +with open(log_path, "a", encoding="utf-8") as log: + log.write(json.dumps({{"args": sys.argv[1:]}}) + "\n") + +for line in sys.stdin: + message = json.loads(line) + method = message.get("method") + if method == "initialize": + print(json.dumps({{ + "id": message["id"], + "result": {{ + "userAgent": "fake-codex", + "codexHome": os.environ["CODEX_HOME"], + "platformFamily": "unix", + "platformOs": "macos" + }} + }}), flush=True) + elif method == "initialized": + continue + elif method == "thread/archive": + with open(log_path, "a", encoding="utf-8") as log: + log.write(json.dumps(message, sort_keys=True) + "\n") + print(json.dumps({{"id": message["id"], "result": {{}}}}), flush=True) + print(json.dumps({{ + "method": "thread/archived", + "params": {{"threadId": message["params"]["threadId"]}} + }}), flush=True) + else: + print(json.dumps({{ + "id": message.get("id"), + "error": {{"code": -32601, "message": "unexpected method " + str(method)}} + }}), flush=True) +"# + ); + + fs::create_dir_all(&fake_bin_dir).expect("fake bin directory should create"); + fs::write(&fake_codex_path, fake_codex_script).expect("fake codex script should write"); + + let mut permissions = + fs::metadata(&fake_codex_path).expect("fake codex metadata should read").permissions(); + + #[cfg(unix)] + PermissionsExt::set_mode(&mut permissions, 0o755); + fs::set_permissions(&fake_codex_path, permissions) + .expect("fake codex script should be executable"); + + let path_env = env::var("PATH").unwrap_or_default(); + let _path_guard = + TestEnvVarGuard::set("PATH", &format!("{}:{path_env}", fake_bin_dir.display())); + let state_store = StateStore::open_in_memory().expect("state store should open"); + + state_store + .record_run_attempt("run-1", "issue-1", 1, "succeeded") + .expect("run attempt should record"); + + super::archive_app_server_thread_after_success( + &AppServerThreadArchiveRequest { + run_id: "run-1", + issue_id: "issue-1", + attempt_number: 1, + listen: "stdio://", + process_env: &AppServerProcessEnv::default(), + thread_id: "thread-1", + sequence_number: 1, + }, + &state_store, + ) + .expect("thread archive should succeed"); + + let invocation_log = + fs::read_to_string(&invocation_log_path).expect("invocation log should exist"); + + assert!(invocation_log.contains(r#""app-server""#)); + assert!(invocation_log.contains(r#""--listen""#)); + assert!(invocation_log.contains(r#""method": "thread/archive""#)); + assert!(invocation_log.contains(r#""threadId": "thread-1""#)); + assert_eq!(state_store.event_count("run-1").expect("event count should load"), 1); +} + #[test] fn command_exec_health_check_validates_exact_buffered_result() { let health_check = CommandExecHealthCheck { diff --git a/apps/decodex/src/orchestrator/execution.rs b/apps/decodex/src/orchestrator/execution.rs index 44a20990..3561e7bf 100644 --- a/apps/decodex/src/orchestrator/execution.rs +++ b/apps/decodex/src/orchestrator/execution.rs @@ -1,6 +1,7 @@ use git_credentials::GitSigningConfig; use agent::CodexAccountPool; use agent::CodexAccountProvider; +use agent::AppServerThreadArchiveRequest; struct AgentGitCredentialEnvironment { process_env: AppServerProcessEnv, @@ -39,6 +40,21 @@ struct TerminalFailureWritebackRuntime<'a> { state_store: Option<&'a StateStore>, } +struct CompletedAppServerRun<'a, T> +where + T: IssueTracker, +{ + tracker: &'a T, + project: &'a ServiceConfig, + workflow: &'a WorkflowDocument, + state_store: &'a StateStore, + issue_run: &'a IssueRunPlan, + tracker_tool_bridge: &'a TrackerToolBridge<'a>, + process_env: &'a AppServerProcessEnv, + transport: &'a str, + run_result: &'a AppServerRunResult, +} + fn execute_issue_run( tracker: &T, project: &ServiceConfig, @@ -427,7 +443,7 @@ where run_id: issue_run.run_id.clone(), issue_id: issue_run.issue.id.clone(), attempt_number: issue_run.attempt_number, - listen: transport, + listen: transport.clone(), cwd: issue_run.worktree.path.display().to_string(), developer_instructions: build_run_developer_instructions( tracker, @@ -481,16 +497,82 @@ where return Ok(continuation_boundary_summary(project, workflow, issue_run, &run_result)); } - apply_run_completion_disposition( + finalize_completed_app_server_run(CompletedAppServerRun { tracker, project, workflow, state_store, issue_run, - &tracker_tool_bridge, + tracker_tool_bridge: &tracker_tool_bridge, + process_env: agent_git_credentials.process_env(), + transport: &transport, + run_result: &run_result, + }) +} + +fn finalize_completed_app_server_run(run: CompletedAppServerRun<'_, T>) -> Result +where + T: IssueTracker, +{ + apply_run_completion_disposition( + run.tracker, + run.project, + run.workflow, + run.state_store, + run.issue_run, + run.tracker_tool_bridge, )?; + archive_completed_app_server_thread_best_effort( + run.project, + run.state_store, + run.issue_run, + run.process_env, + run.transport, + run.run_result, + ); + + Ok(run_summary_from_issue_run(run.project.service_id(), run.issue_run)) +} + +fn archive_completed_app_server_thread_best_effort( + project: &ServiceConfig, + state_store: &StateStore, + issue_run: &IssueRunPlan, + process_env: &AppServerProcessEnv, + transport: &str, + run_result: &AppServerRunResult, +) { + let archive_request = AppServerThreadArchiveRequest { + run_id: &issue_run.run_id, + issue_id: &issue_run.issue.id, + attempt_number: issue_run.attempt_number, + listen: transport, + process_env, + thread_id: &run_result.thread_id, + sequence_number: run_result.event_count.saturating_add(1), + }; - Ok(run_summary_from_issue_run(project.service_id(), issue_run)) + match agent::archive_app_server_thread_after_success(&archive_request, state_store) { + Ok(()) => tracing::info!( + project_id = project.service_id(), + issue_id = issue_run.issue.id, + issue = issue_run.issue.identifier, + run_id = issue_run.run_id, + attempt = issue_run.attempt_number, + thread_id = %run_result.thread_id, + "Archived completed app-server thread." + ), + Err(error) => tracing::warn!( + ?error, + project_id = project.service_id(), + issue_id = issue_run.issue.id, + issue = issue_run.issue.identifier, + run_id = issue_run.run_id, + attempt = issue_run.attempt_number, + thread_id = %run_result.thread_id, + "Failed to archive completed app-server thread; leaving completed run intact." + ), + } } fn maybe_execute_deterministic_closeout(