Skip to content
3 changes: 2 additions & 1 deletion apps/decodex/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ mod decodex_tool_bridge;
mod json_rpc;
mod tracker_tool_bridge;

#[cfg(test)] pub(crate) use self::app_server::MODEL_EXECUTION_IDLE_TIMEOUT;
#[cfg(test)] pub(crate) use self::tracker_tool_bridge::DynamicToolHandler;
pub(crate) use self::{
app_server::{
ACTIVE_RUN_IDLE_TIMEOUT, AppServerCapabilityPreflightFailure, AppServerDynamicToolFailure,
AppServerRunRequest, AppServerRunResult, AppServerThreadArchiveRequest,
AppServerTurnFailure, TurnContinuationGuard, archive_app_server_thread_after_success,
execute_app_server_run, probe_app_server,
execute_app_server_run, probe_app_server, protocol_activity_idle_timeout,
},
codex_accounts::{CodexAccountPool, CodexAccountProvider},
decodex_tool_bridge::{DecodexRunContext, DecodexToolBridge},
Expand Down
24 changes: 23 additions & 1 deletion apps/decodex/src/agent/app_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use crate::{
};

pub(crate) const ACTIVE_RUN_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
pub(crate) const MODEL_EXECUTION_IDLE_TIMEOUT: Duration = Duration::from_secs(30 * 60);

const PROBE_TIMEOUT: Duration = Duration::from_secs(30);
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
Expand All @@ -80,6 +81,7 @@ const PREFLIGHT_CHECK_MCP: &str = "mcp";
const PREFLIGHT_PLUGIN_MARKETPLACE_KIND: &str = "local";
const JSONRPC_METHOD_NOT_FOUND: i64 = -32_601;
const CHILD_BUCKET_MODEL: &str = "Model";
const WAITING_REASON_MODEL_EXECUTION: &str = "model_execution";
const CHILD_BUCKET_PROTOCOL: &str = "Protocol";
const CHILD_BUCKET_TOOL: &str = "Tool";
const CHILD_BUCKET_SHELL: &str = "Shell";
Expand Down Expand Up @@ -969,6 +971,17 @@ impl RequestWaitPhase {
}
}

pub(crate) fn protocol_activity_idle_timeout(
protocol_activity: Option<&state::ProtocolActivitySummary>,
base_timeout: Duration,
) -> Duration {
if protocol_activity.is_some_and(running_model_execution_protocol_activity) {
return base_timeout.max(MODEL_EXECUTION_IDLE_TIMEOUT);
}

base_timeout
}

pub(crate) fn execute_app_server_run(
request: &AppServerRunRequest<'_>,
state_store: &StateStore,
Expand Down Expand Up @@ -1050,6 +1063,13 @@ pub(crate) fn probe_app_server(listen: &str) -> crate::prelude::Result<AppServer
Ok(result)
}

fn running_model_execution_protocol_activity(
protocol_activity: &state::ProtocolActivitySummary,
) -> bool {
protocol_activity.turn_status.as_deref() == Some("running")
&& protocol_activity.waiting_reason.as_deref() == Some(WAITING_REASON_MODEL_EXECUTION)
}

fn preflight_check_blocker_summary(check: &AppServerCapabilityPreflightCheck) -> String {
let first_error_path = check.details.get("first_error_path");
let first_error = check.details.get("first_error");
Expand Down Expand Up @@ -3135,10 +3155,12 @@ fn wait_for_turn_completion(
let mut latest_turn_failure: Option<AppServerTurnFailure> = None;

loop {
let idle_timeout =
protocol_activity_idle_timeout(Some(&recorder.protocol_activity.summary), timeout);
let wire_message = next_turn_wire_message(
client,
last_activity_at,
timeout,
idle_timeout,
target_thread_id,
target_turn_id,
latest_turn_failure.as_ref(),
Expand Down
36 changes: 35 additions & 1 deletion apps/decodex/src/agent/app_server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
},
},
prelude::{Result, eyre},
state::{self, StateStore},
state::{self, ProtocolActivitySummary, StateStore},
test_support::TestEnvVarGuard,
};

Expand Down Expand Up @@ -1006,6 +1006,40 @@ fn remaining_idle_budget_expires_after_idle_timeout() {
assert!(super::remaining_idle_budget(last_activity_at, now, timeout).is_none());
}

#[test]
fn protocol_activity_idle_timeout_extends_running_model_execution() {
let protocol_activity = ProtocolActivitySummary {
turn_status: Some(String::from("running")),
waiting_reason: Some(String::from("model_execution")),
..ProtocolActivitySummary::default()
};

assert_eq!(
super::protocol_activity_idle_timeout(
Some(&protocol_activity),
super::ACTIVE_RUN_IDLE_TIMEOUT
),
super::MODEL_EXECUTION_IDLE_TIMEOUT
);
}

#[test]
fn protocol_activity_idle_timeout_keeps_base_timeout_for_other_waits() {
let protocol_activity = ProtocolActivitySummary {
turn_status: Some(String::from("running")),
waiting_reason: Some(String::from("tool_execution")),
..ProtocolActivitySummary::default()
};

assert_eq!(
super::protocol_activity_idle_timeout(
Some(&protocol_activity),
super::ACTIVE_RUN_IDLE_TIMEOUT
),
super::ACTIVE_RUN_IDLE_TIMEOUT
);
}

#[test]
fn run_recorder_keeps_events_when_marker_write_fails() {
let temp_dir = TempDir::new().expect("tempdir should create");
Expand Down
59 changes: 58 additions & 1 deletion apps/decodex/src/agent/json_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use serde_json::{self, Value};
use crate::git_credentials::{GitCredentialEnvironment, GitSigningConfig};

const APP_SERVER_STDERR_TAIL_LINES: usize = 20;
const CODEX_APP_SERVER_BINARY: &str = "codex";
const CODEX_HOME_ENV_VAR: &str = "CODEX_HOME";
const CODEX_SQLITE_HOME_ENV_VAR: &str = "CODEX_SQLITE_HOME";
const CODEX_HOME_DIR_NAME: &str = ".codex";
Expand Down Expand Up @@ -234,7 +235,7 @@ impl JsonRpcConnection {
listen: &str,
process_env: &AppServerProcessEnv,
) -> crate::prelude::Result<Self> {
let mut command = Command::new("codex");
let mut command = Command::new(app_server_command_program());
let _codex_home_env = configure_app_server_command(&mut command, listen, process_env)?;
let mut child = command.spawn()?;
let stdin =
Expand Down Expand Up @@ -691,11 +692,48 @@ fn configure_app_server_command(
process_env.apply_to(command)
}

fn app_server_command_program() -> PathBuf {
app_server_command_program_from_env(env::var_os("PATH"), env::var_os("HOME"))
}

fn app_server_command_program_from_env(
path_env: Option<OsString>,
home: Option<OsString>,
) -> PathBuf {
if let Some(path_env) = path_env {
for path_entry in env::split_paths(&path_env) {
let candidate = path_entry.join(CODEX_APP_SERVER_BINARY);

if candidate.is_file() {
return candidate;
}
}
}
if let Some(home) = home {
let home = PathBuf::from(home);

for relative_candidate in
[[".local", "bin", CODEX_APP_SERVER_BINARY], [".cargo", "bin", CODEX_APP_SERVER_BINARY]]
{
let candidate = relative_candidate
.iter()
.fold(home.clone(), |path, component| path.join(*component));

if candidate.is_file() {
return candidate;
}
}
}

PathBuf::from(CODEX_APP_SERVER_BINARY)
}

#[cfg(test)]
mod tests {
use std::{
collections::{HashMap, VecDeque},
ffi::OsString,
fs,
path::PathBuf,
process::{Command, Stdio},
sync::{Arc, Mutex, mpsc},
Expand Down Expand Up @@ -810,6 +848,25 @@ mod tests {
assert_eq!(envs.get("GIT_CONFIG_VALUE_9").map(String::as_str), Some(""));
}

#[test]
fn app_server_program_falls_back_to_home_local_codex_when_path_is_sparse() {
let temp_dir = tempfile::tempdir().expect("tempdir should create");
let local_bin = temp_dir.path().join(".local/bin");

fs::create_dir_all(&local_bin).expect("local bin should create");

let codex_path = local_bin.join("codex");

fs::write(&codex_path, "#!/bin/sh\n").expect("codex fixture should write");

let resolved = super::app_server_command_program_from_env(
Some(OsString::from("/usr/bin:/bin")),
Some(temp_dir.path().as_os_str().to_owned()),
);

assert_eq!(resolved, codex_path);
}

#[test]
fn app_server_command_does_not_rewrite_git_urls_without_credentials() {
let mut command = Command::new("codex");
Expand Down
Loading