Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 71 additions & 35 deletions apps/decodex/src/agent/app_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub(crate) const ACTIVE_RUN_IDLE_TIMEOUT: Duration = Duration::from_secs(300);

// NOTE(review): the use sites of `PROBE_TIMEOUT` and `REQUEST_TIMEOUT` are not
// visible in this excerpt — presumably the overall probe budget and the default
// per-request budget; confirm against their callers.
const PROBE_TIMEOUT: Duration = Duration::from_secs(30);
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
// Timeout handed to `list_mcp_server_status` by the capability preflight; it is
// also the value reported as `timeout_seconds` when the MCP check is recorded
// as degraded.
const MCP_PREFLIGHT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
// NOTE(review): presumably the run and issue identifiers used for the protocol
// probe — confirm against the probe setup code.
const PROBE_RUN_ID: &str = "protocol-probe-run";
const PROBE_ISSUE_ID: &str = "protocol-probe";
// The exact text that `PROBE_USER_INPUT` asks the model to reply with.
const PROBE_EXPECTED_OUTPUT: &str = "PROBE_OK";
Expand All @@ -67,7 +68,7 @@ const PROBE_DEVELOPER_INSTRUCTIONS: &str = "You are a protocol probe. You must c
const PROBE_USER_INPUT: &str = "Call `echo_probe` with `{\\\"text\\\":\\\"PROBE_OK\\\"}`. After the tool succeeds, reply with the exact text PROBE_OK.";
const PREFLIGHT_EVENT_TYPE: &str = "app-server/preflight";
const PREFLIGHT_MODEL_PAGE_LIMIT: u32 = 200;
const PREFLIGHT_MCP_PAGE_LIMIT: u32 = 200;
const PREFLIGHT_MCP_PAGE_LIMIT: u32 = 50;
const PREFLIGHT_MCP_DETAIL: &str = "toolsAndAuthOnly";
const PREFLIGHT_CHECK_CONFIG: &str = "config";
const PREFLIGHT_CHECK_MODEL: &str = "model";
Expand Down Expand Up @@ -1892,9 +1893,16 @@ fn run_app_server_capability_preflight(

record_plugin_preflight(&mut report, &plugins);

let mcp_servers = list_all_mcp_servers_for_preflight(client, recorder, &report)?;
match list_all_mcp_servers_for_preflight(client) {
Ok(mcp_servers) => record_mcp_preflight(&mut report, &mcp_servers),
Err(error) if mcp_preflight_can_degrade(&error) => {
record_mcp_preflight_degraded(&mut report, &error);
},
Err(error) => {
return preflight_method_failure(recorder, &report, "mcpServerStatus/list", error);
},
}

record_mcp_preflight(&mut report, &mcp_servers);
record_app_server_preflight_report(recorder, &report)?;

if report.has_blockers() {
Expand All @@ -1904,6 +1912,33 @@ fn run_app_server_capability_preflight(
Ok(report)
}

/// Records a blocked preflight check for `method` and turns `error` into a
/// preflight failure.
///
/// A copy of `report` gains a blocked check whose details carry the method name
/// and the error text. That copy is written through `recorder` and then
/// embedded in the returned `AppServerCapabilityPreflightFailure`.
///
/// This function never returns `Ok`; the type parameter `T` only lets callers
/// use the result wherever a `Result<T>` is expected.
///
/// # Errors
///
/// Always returns an error: the method failure on the normal path, or the
/// recording error if writing the failed report does not succeed.
fn preflight_method_failure<T>(
    recorder: &mut RunRecorder<'_>,
    report: &AppServerCapabilityPreflightReport,
    method: &'static str,
    error: Report,
) -> crate::prelude::Result<T> {
    // Render the error once; the text is needed both in the check details and
    // in the failure value.
    let message = error.to_string();
    let details = BTreeMap::from([
        (String::from("method"), method.to_owned()),
        (String::from("error"), message.clone()),
    ]);
    let mut blocked = report.clone();

    blocked.push_blocked(
        check_name_for_method(method),
        format!("`{method}` failed before thread/start."),
        details,
    );
    record_app_server_preflight_report(recorder, &blocked)?;

    let failure = AppServerCapabilityPreflightFailure::method_failed(method, message, blocked);

    Err(Report::new(failure))
}

fn preflight_request<T, F>(
recorder: &mut RunRecorder<'_>,
report: &AppServerCapabilityPreflightReport,
Expand All @@ -1915,26 +1950,7 @@ where
{
match request() {
Ok(response) => Ok(response),
Err(error) => {
let mut failed_report = report.clone();
let mut details = BTreeMap::new();

details.insert(String::from("method"), method.to_owned());
details.insert(String::from("error"), error.to_string());
failed_report.push_blocked(
check_name_for_method(method),
format!("`{method}` failed before thread/start."),
details,
);

record_app_server_preflight_report(recorder, &failed_report)?;

Err(Report::new(AppServerCapabilityPreflightFailure::method_failed(
method,
error.to_string(),
failed_report,
)))
},
Err(error) => preflight_method_failure(recorder, report, method, error),
}
}

Expand Down Expand Up @@ -1968,21 +1984,19 @@ fn list_all_models_for_preflight(

fn list_all_mcp_servers_for_preflight(
client: &mut AppServerClient,
recorder: &mut RunRecorder<'_>,
report: &AppServerCapabilityPreflightReport,
) -> crate::prelude::Result<Vec<McpServerStatusSummary>> {
let mut cursor = None;
let mut servers = Vec::new();

loop {
let response: ListMcpServerStatusResponse =
preflight_request(recorder, report, "mcpServerStatus/list", || {
client.list_mcp_server_status(&ListMcpServerStatusParams {
cursor: cursor.clone(),
detail: Some(PREFLIGHT_MCP_DETAIL.to_owned()),
limit: Some(PREFLIGHT_MCP_PAGE_LIMIT),
})
})?;
let response: ListMcpServerStatusResponse = client.list_mcp_server_status(
&ListMcpServerStatusParams {
cursor: cursor.clone(),
detail: Some(PREFLIGHT_MCP_DETAIL.to_owned()),
limit: Some(PREFLIGHT_MCP_PAGE_LIMIT),
},
MCP_PREFLIGHT_REQUEST_TIMEOUT,
)?;

servers.extend(response.data);

Expand Down Expand Up @@ -2212,6 +2226,27 @@ fn record_mcp_preflight(
}
}

fn mcp_preflight_can_degrade(error: &Report) -> bool {
error.downcast_ref::<AppServerOutputTimeout>().is_some()
}

/// Appends an OK-status MCP check to `report` describing a timed-out
/// `mcpServerStatus/list` request.
///
/// The check details record the method name, a `degraded_reason` of
/// `timeout`, the rendered `error`, and the timeout budget in whole seconds.
fn record_mcp_preflight_degraded(report: &mut AppServerCapabilityPreflightReport, error: &Report) {
    let timeout_seconds = MCP_PREFLIGHT_REQUEST_TIMEOUT.as_secs().to_string();
    let details = BTreeMap::from([
        (String::from("method"), String::from("mcpServerStatus/list")),
        (String::from("degraded_reason"), String::from("timeout")),
        (String::from("error"), error.to_string()),
        (String::from("timeout_seconds"), timeout_seconds),
    ]);

    // Pushed as OK rather than blocked so that `has_blockers()` stays false for
    // this case.
    report.push_ok(
        PREFLIGHT_CHECK_MCP,
        "mcpServerStatus/list timed out during optional MCP inventory; continuing after core app-server capability checks passed.",
        details,
    );
}

fn record_app_server_preflight_report(
recorder: &mut RunRecorder<'_>,
report: &AppServerCapabilityPreflightReport,
Expand Down Expand Up @@ -3378,8 +3413,9 @@ fn handle_dynamic_tool_call(
"Dynamic tool `{}` was called under namespace `{namespace}`, but this run did not declare that tool namespace.",
payload.tool
),
None =>
format!("Dynamic tool `{}` is not declared for this run attempt.", payload.tool),
None => {
format!("Dynamic tool `{}` is not declared for this run attempt.", payload.tool)
},
};

return DynamicToolCallDispatch::protocol_failure(
Expand Down
3 changes: 2 additions & 1 deletion apps/decodex/src/agent/app_server/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,9 @@ impl AppServerClient {
pub(super) fn list_mcp_server_status(
&mut self,
params: &ListMcpServerStatusParams,
timeout: Duration,
) -> Result<ListMcpServerStatusResponse> {
self.connection.request("mcpServerStatus/list", params, REQUEST_TIMEOUT)
self.connection.request("mcpServerStatus/list", params, timeout)
}

pub(super) fn recv(&mut self, timeout: Option<Duration>) -> Result<WireMessage> {
Expand Down
26 changes: 24 additions & 2 deletions apps/decodex/src/agent/app_server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
time::{Duration, Instant},
};

use color_eyre::Report;
use serde_json::{self, Value};
use tempfile::TempDir;

Expand All @@ -19,8 +20,9 @@ use crate::{
TurnContinuationGuard, UserInput,
},
json_rpc::{
AppServerHomePreflightFailure, AppServerProcessEnv, JsonRpcMessage,
JsonRpcNotification, JsonRpcRequest, ResolvedAppServerCodexHomeEnv, WireMessage,
AppServerHomePreflightFailure, AppServerOutputTimeout, AppServerProcessEnv,
JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, ResolvedAppServerCodexHomeEnv,
WireMessage,
},
tracker_tool_bridge::{
DynamicToolCallResponse, DynamicToolContentItem, DynamicToolHandler, DynamicToolSpec,
Expand Down Expand Up @@ -587,6 +589,26 @@ fn capability_preflight_request_error_records_method_blocker() {
assert_eq!(state_store.event_count("run-1").expect("event count should load"), 1);
}

/// A timeout from the MCP inventory request must be classified as degradable
/// and recorded as a single OK-status `mcp` check, not as a blocker.
#[test]
fn mcp_preflight_timeout_degrades_to_recorded_ok_check() {
    let timeout = Report::new(AppServerOutputTimeout);

    assert!(super::mcp_preflight_can_degrade(&timeout));

    let mut report = AppServerCapabilityPreflightReport::new();

    super::record_mcp_preflight_degraded(&mut report, &timeout);

    assert!(!report.has_blockers());

    let checks = report.checks();

    assert_eq!(checks.len(), 1);

    // Exactly one check exists at this point, so inspect it through one binding.
    let check = &checks[0];

    assert_eq!(check.name, "mcp");
    assert_eq!(check.status, super::AppServerCapabilityPreflightStatus::Ok);
    assert_eq!(check.details.get("degraded_reason").map(String::as_str), Some("timeout"));
    assert!(check.summary.contains("continuing"));
}

#[test]
fn remaining_idle_budget_resets_from_latest_activity() {
let now = Instant::now();
Expand Down
36 changes: 34 additions & 2 deletions apps/decodex/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,22 @@ struct RunCommand {
/// Skip external side effects where the later implementation allows it.
#[arg(long)]
dry_run: bool,
/// Explain current queued candidates without preparing or dispatching a lane.
#[arg(long, requires = "dry_run")]
explain: bool,
}
impl RunCommand {
fn run(&self, config_path: Option<&Path>) -> crate::prelude::Result<()> {
if self.explain && self.issue.is_some() {
eyre::bail!(
"`decodex run --dry-run --explain` explains the project queue and does not accept a positional issue."
);
}

orchestrator::run_once(RunOnceRequest {
config_path,
dry_run: self.dry_run,
explain_queue: self.explain,
preferred_issue_id: self.issue.as_deref(),
preferred_issue_state: None,
preferred_initial_issue_state: None,
Expand Down Expand Up @@ -516,6 +526,7 @@ impl AttemptCommand {
orchestrator::run_once(RunOnceRequest {
config_path,
dry_run: request.dry_run,
explain_queue: false,
preferred_issue_id: Some(request.issue_id.as_str()),
preferred_issue_state: Some(request.issue_state.as_str()),
preferred_initial_issue_state: request.initial_issue_state.as_deref(),
Expand Down Expand Up @@ -685,14 +696,35 @@ mod tests {
fn parses_run_with_positional_issue_and_dry_run() {
let cli = Cli::parse_from(["decodex", "run", "issue-1", "--dry-run"]);

assert!(matches!(cli.command, Command::Run(RunCommand { issue: Some(_), dry_run: true })));
assert!(matches!(
cli.command,
Command::Run(RunCommand { issue: Some(_), dry_run: true, explain: false })
));
}

#[test]
fn parses_run_without_issue() {
let cli = Cli::parse_from(["decodex", "run"]);

assert!(matches!(cli.command, Command::Run(RunCommand { issue: None, dry_run: false })));
assert!(matches!(
cli.command,
Command::Run(RunCommand { issue: None, dry_run: false, explain: false })
));
}

/// `--explain` parses together with `--dry-run`, and is rejected by the
/// argument parser when `--dry-run` is missing.
#[test]
fn parses_run_dry_run_explain() {
    let accepted = Cli::parse_from(["decodex", "run", "--dry-run", "--explain"]);

    assert!(matches!(
        accepted.command,
        Command::Run(RunCommand { issue: None, dry_run: true, explain: true })
    ));

    // Without `--dry-run`, parsing must fail and the message must name the
    // missing flag.
    let rejection = Cli::try_parse_from(["decodex", "run", "--explain"])
        .expect_err("explain should require dry-run");

    assert!(rejection.to_string().contains("--dry-run"));
}

#[test]
Expand Down
Loading