Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/decodex/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ mod tracker_tool_bridge;
pub(crate) use self::{
app_server::{
ACTIVE_RUN_IDLE_TIMEOUT, AppServerCapabilityPreflightFailure, AppServerDynamicToolFailure,
AppServerRunRequest, AppServerRunResult, AppServerTurnFailure, TurnContinuationGuard,
AppServerRunRequest, AppServerRunResult, AppServerThreadArchiveRequest,
AppServerTurnFailure, TurnContinuationGuard, archive_app_server_thread_after_success,
execute_app_server_run, probe_app_server,
},
codex_accounts::{CodexAccountPool, CodexAccountProvider},
Expand Down
85 changes: 82 additions & 3 deletions apps/decodex/src/agent/app_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ use self::protocol::{
ModelListParams, ModelListResponse, ModelProviderCapabilitiesReadResponse, ModelSummary,
PermissionGrantScope, PermissionsRequestApprovalResponse, PluginListParams, PluginListResponse,
ProbeDynamicToolHandler, RunOutcome, RuntimeConfigSummary, SkillsListParams,
SkillsListResponse, ThreadResumeRequest, ThreadSessionResponse, ThreadStartRequest,
ThreadStatusChangedNotification, ToolRequestUserInputResponse, TurnCompletedNotification,
TurnError, TurnStartRequest, UserInput,
SkillsListResponse, ThreadArchiveRequest, ThreadResumeRequest, ThreadSessionResponse,
ThreadStartRequest, ThreadStatusChangedNotification, ToolRequestUserInputResponse,
TurnCompletedNotification, TurnError, TurnStartRequest, UserInput,
};
use crate::{
agent::{
Expand Down Expand Up @@ -316,6 +316,16 @@ pub(crate) struct AppServerRunRequest<'a> {
pub(crate) codex_account_provider: Option<&'a dyn CodexAccountProvider>,
}

pub(crate) struct AppServerThreadArchiveRequest<'a> {
pub(crate) run_id: &'a str,
pub(crate) issue_id: &'a str,
pub(crate) attempt_number: i64,
pub(crate) listen: &'a str,
pub(crate) process_env: &'a AppServerProcessEnv,
pub(crate) thread_id: &'a str,
pub(crate) sequence_number: i64,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct CommandExecHealthCheck {
pub(crate) command: Vec<String>,
Expand Down Expand Up @@ -908,6 +918,17 @@ pub(crate) fn execute_app_server_run(
result
}

pub(crate) fn archive_app_server_thread_after_success(
request: &AppServerThreadArchiveRequest<'_>,
state_store: &StateStore,
) -> crate::prelude::Result<()> {
let result = archive_app_server_thread_after_success_inner(request);

record_thread_archive_result_best_effort(state_store, request, result.as_ref().err());

result
}

pub(crate) fn probe_app_server(listen: &str) -> crate::prelude::Result<AppServerRunResult> {
let state_store = StateStore::open_in_memory()?;
let probe_tool_handler = ProbeDynamicToolHandler;
Expand Down Expand Up @@ -945,6 +966,64 @@ pub(crate) fn probe_app_server(listen: &str) -> crate::prelude::Result<AppServer
Ok(result)
}

fn archive_app_server_thread_after_success_inner(
request: &AppServerThreadArchiveRequest<'_>,
) -> crate::prelude::Result<()> {
let expected_codex_home = request.process_env.resolve_codex_home_env()?;
let mut client = AppServerClient::spawn(request.listen, request.process_env)?;
let initialize_response = client.initialize(false)?;

validate_initialize_codex_home(&expected_codex_home, &initialize_response)?;

client.mark_initialized()?;
client.archive_thread(ThreadArchiveRequest { thread_id: request.thread_id.to_owned() })?;

Ok(())
}

fn record_thread_archive_result_best_effort(
state_store: &StateStore,
request: &AppServerThreadArchiveRequest<'_>,
error: Option<&Report>,
) {
let (event_type, payload) = match error {
Some(error) => (
"thread/archive/failed",
serde_json::json!({
"threadId": request.thread_id,
"issueId": request.issue_id,
"attemptNumber": request.attempt_number,
"error": error.to_string(),
}),
),
None => (
"thread/archive",
serde_json::json!({
"threadId": request.thread_id,
"issueId": request.issue_id,
"attemptNumber": request.attempt_number,
}),
),
};

if let Err(record_error) = state_store.append_event(
request.run_id,
request.sequence_number,
event_type,
&payload.to_string(),
) {
tracing::warn!(
?record_error,
run_id = request.run_id,
issue_id = request.issue_id,
attempt = request.attempt_number,
thread_id = request.thread_id,
event_type,
"Failed to record app-server thread archive event."
);
}
}

fn classify_child_activity_event(
event_type: &str,
payload: &str,
Expand Down
16 changes: 16 additions & 0 deletions apps/decodex/src/agent/app_server/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ impl AppServerClient {
self.connection.request_with_handler("thread/resume", &params, REQUEST_TIMEOUT, handler)
}

pub(super) fn archive_thread(
&mut self,
params: ThreadArchiveRequest,
) -> Result<ThreadArchiveResponse> {
self.connection.request("thread/archive", &params, REQUEST_TIMEOUT)
}

#[allow(dead_code)]
pub(super) fn start_turn(&mut self, params: TurnStartRequest) -> Result<TurnStartResponse> {
self.start_turn_with_handler(params, |_connection, _message, request| {
Expand Down Expand Up @@ -319,6 +326,15 @@ pub(super) struct ThreadResumeRequest {
pub(super) developer_instructions: Option<String>,
}

#[derive(Debug, Serialize)]
pub(super) struct ThreadArchiveRequest {
#[serde(rename = "threadId")]
pub(super) thread_id: String,
}

#[derive(Debug, Deserialize)]
pub(super) struct ThreadArchiveResponse {}

#[derive(Clone, Debug, Deserialize)]
pub(super) struct ThreadSessionResponse {
pub(super) thread: Thread,
Expand Down
107 changes: 102 additions & 5 deletions apps/decodex/src/agent/app_server/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{
cell::RefCell,
collections::BTreeMap,
env, fs,
os::unix::fs::PermissionsExt,
path::PathBuf,
time::{Duration, Instant},
};
Expand All @@ -13,11 +15,11 @@ use crate::{
agent::{
app_server::{
AppServerCapabilityPreflightFailure, AppServerCapabilityPreflightReport,
AppServerDynamicToolFailure, AppServerRunResult, AppServerTurnFailure,
CommandExecHealthCheck, CommandExecResponse, EffectiveThreadConfig, InitializeResponse,
ModelProviderCapabilitiesReadResponse, PluginListResponse, ProbeDynamicToolHandler,
RequestWaitPhase, RunRecorder, RuntimeConfigSummary, SkillsListResponse,
TurnContinuationGuard, UserInput,
AppServerDynamicToolFailure, AppServerRunResult, AppServerThreadArchiveRequest,
AppServerTurnFailure, CommandExecHealthCheck, CommandExecResponse,
EffectiveThreadConfig, InitializeResponse, ModelProviderCapabilitiesReadResponse,
PluginListResponse, ProbeDynamicToolHandler, RequestWaitPhase, RunRecorder,
RuntimeConfigSummary, SkillsListResponse, TurnContinuationGuard, UserInput,
},
json_rpc::{
AppServerHomePreflightFailure, AppServerOutputTimeout, AppServerProcessEnv,
Expand All @@ -31,6 +33,7 @@ use crate::{
},
prelude::{Result, eyre},
state::{self, StateStore},
test_support::TestEnvVarGuard,
};

struct RejectingCompletionHandler;
Expand Down Expand Up @@ -402,6 +405,100 @@ fn command_exec_health_check_uses_bounded_standalone_request() {
assert!(value.get("permissionProfile").is_none());
}

#[test]
fn archive_thread_after_success_calls_app_server_archive_and_records_event() {
let temp_dir = TempDir::new().expect("tempdir should create");
let fake_bin_dir = temp_dir.path().join("fake-bin");
let fake_codex_path = fake_bin_dir.join("codex");
let invocation_log_path = temp_dir.path().join("codex-invocations.jsonl");
let invocation_log_literal =
serde_json::to_string(&invocation_log_path).expect("log path should serialize");
let fake_codex_script = format!(
r#"#!/usr/bin/env python3
import json
import os
import sys

log_path = {invocation_log_literal}

with open(log_path, "a", encoding="utf-8") as log:
log.write(json.dumps({{"args": sys.argv[1:]}}) + "\n")

for line in sys.stdin:
message = json.loads(line)
method = message.get("method")
if method == "initialize":
print(json.dumps({{
"id": message["id"],
"result": {{
"userAgent": "fake-codex",
"codexHome": os.environ["CODEX_HOME"],
"platformFamily": "unix",
"platformOs": "macos"
}}
}}), flush=True)
elif method == "initialized":
continue
elif method == "thread/archive":
with open(log_path, "a", encoding="utf-8") as log:
log.write(json.dumps(message, sort_keys=True) + "\n")
print(json.dumps({{"id": message["id"], "result": {{}}}}), flush=True)
print(json.dumps({{
"method": "thread/archived",
"params": {{"threadId": message["params"]["threadId"]}}
}}), flush=True)
else:
print(json.dumps({{
"id": message.get("id"),
"error": {{"code": -32601, "message": "unexpected method " + str(method)}}
}}), flush=True)
"#
);

fs::create_dir_all(&fake_bin_dir).expect("fake bin directory should create");
fs::write(&fake_codex_path, fake_codex_script).expect("fake codex script should write");

let mut permissions =
fs::metadata(&fake_codex_path).expect("fake codex metadata should read").permissions();

#[cfg(unix)]
PermissionsExt::set_mode(&mut permissions, 0o755);
fs::set_permissions(&fake_codex_path, permissions)
.expect("fake codex script should be executable");

let path_env = env::var("PATH").unwrap_or_default();
let _path_guard =
TestEnvVarGuard::set("PATH", &format!("{}:{path_env}", fake_bin_dir.display()));
let state_store = StateStore::open_in_memory().expect("state store should open");

state_store
.record_run_attempt("run-1", "issue-1", 1, "succeeded")
.expect("run attempt should record");

super::archive_app_server_thread_after_success(
&AppServerThreadArchiveRequest {
run_id: "run-1",
issue_id: "issue-1",
attempt_number: 1,
listen: "stdio://",
process_env: &AppServerProcessEnv::default(),
thread_id: "thread-1",
sequence_number: 1,
},
&state_store,
)
.expect("thread archive should succeed");

let invocation_log =
fs::read_to_string(&invocation_log_path).expect("invocation log should exist");

assert!(invocation_log.contains(r#""app-server""#));
assert!(invocation_log.contains(r#""--listen""#));
assert!(invocation_log.contains(r#""method": "thread/archive""#));
assert!(invocation_log.contains(r#""threadId": "thread-1""#));
assert_eq!(state_store.event_count("run-1").expect("event count should load"), 1);
}

#[test]
fn command_exec_health_check_validates_exact_buffered_result() {
let health_check = CommandExecHealthCheck {
Expand Down
Loading