Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 119 additions & 11 deletions apps/decodex/src/agent/app_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
env,
error::Error,
fmt::{self, Display, Formatter},
fs,
path::{Path, PathBuf},
time::{Duration, Instant},
};
Expand Down Expand Up @@ -49,7 +50,9 @@ use crate::{
prelude::eyre,
state::{
self, CodexAccountActivitySummary, CodexAccountMarker, EffectiveRuntimeMarker,
RUN_OPERATION_AGENT_RUN, RUN_OPERATION_APP_SERVER_PREFLIGHT, StateStore,
RUN_CONTROL_CHANNEL_DIR, RUN_CONTROL_CHANNEL_STATUS_COMPLETED,
RUN_CONTROL_CHANNEL_STATUS_FAILED, RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE,
RUN_OPERATION_AGENT_RUN, RUN_OPERATION_APP_SERVER_PREFLIGHT, RunControlChannel, StateStore,
},
};

Expand Down Expand Up @@ -1032,19 +1035,42 @@ pub(crate) fn execute_app_server_run(
write_activity_marker_best_effort(marker_path, &request.run_id, request.attempt_number);
}

let control_channel = publish_run_control_channel_for_request(request, state_store)?;
let result = execute_app_server_run_inner(request, state_store);

if result.is_err() {
state_store.record_run_attempt(
&request.run_id,
&request.issue_id,
request.attempt_number,
"failed",
)?;
match &result {
Ok(_result) =>
if control_channel.is_some() {
state_store.retire_run_control_channel_for_attempt(
&request.run_id,
request.attempt_number,
RUN_CONTROL_CHANNEL_STATUS_COMPLETED,
)?;
},
Err(_error) => {
state_store.record_run_attempt(
&request.run_id,
&request.issue_id,
request.attempt_number,
"failed",
)?;

if let Some(marker_path) = request.activity_marker_path.as_ref() {
write_activity_marker_best_effort(marker_path, &request.run_id, request.attempt_number);
}
if control_channel.is_some() {
state_store.retire_run_control_channel_for_attempt(
&request.run_id,
request.attempt_number,
RUN_CONTROL_CHANNEL_STATUS_FAILED,
)?;
}

if let Some(marker_path) = request.activity_marker_path.as_ref() {
write_activity_marker_best_effort(
marker_path,
&request.run_id,
request.attempt_number,
);
}
},
}

result
Expand Down Expand Up @@ -1927,6 +1953,88 @@ fn duration_seconds_i64(duration: Duration) -> i64 {
i64::try_from(duration.as_secs()).unwrap_or(i64::MAX)
}

fn publish_run_control_channel_for_request(
request: &AppServerRunRequest<'_>,
state_store: &StateStore,
) -> crate::prelude::Result<Option<RunControlChannel>> {
let Some(marker_path) = request.activity_marker_path.as_ref() else {
return Ok(None);
};
let channel_path =
run_control_channel_path(marker_path, &request.run_id, request.attempt_number);

write_run_control_channel_file(&channel_path, request)?;

let channel = state_store.publish_run_control_channel_for_active_attempt(
&request.run_id,
request.attempt_number,
&channel_path,
RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE,
)?;

if let Some(channel) = channel.as_ref() {
state_store.append_private_execution_event(
channel.project_id(),
channel.issue_id(),
channel.run_id(),
channel.attempt_number(),
"control_channel_published",
serde_json::json!({
"schema": "decodex.run_control_channel/v1",
"transport": channel.transport(),
"channel_path": channel.channel_path().display().to_string(),
"status": channel.status(),
"published_at": channel.published_at(),
}),
)?;
}

Ok(channel)
}

fn run_control_channel_path(marker_path: &Path, run_id: &str, attempt_number: i64) -> PathBuf {
marker_path
.join(RUN_CONTROL_CHANNEL_DIR)
.join(format!("{}-{attempt_number}.channel", sanitize_run_control_path_segment(run_id)))
}

fn sanitize_run_control_path_segment(value: &str) -> String {
let sanitized = value
.chars()
.map(|character| {
if character.is_ascii_alphanumeric() || character == '-' || character == '_' {
character
} else {
'_'
}
})
.collect::<String>();

if sanitized.is_empty() { String::from("run") } else { sanitized }
}

fn write_run_control_channel_file(
channel_path: &Path,
request: &AppServerRunRequest<'_>,
) -> crate::prelude::Result<()> {
if let Some(parent) = channel_path.parent() {
fs::create_dir_all(parent)?;
}

fs::write(
channel_path,
format!(
"schema=decodex.run_control_channel/v1\nrun_id={}\nissue_id={}\nattempt_number={}\ntransport={}\n",
request.run_id,
request.issue_id,
request.attempt_number,
state::RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE,
),
)?;

Ok(())
}

fn write_activity_marker_best_effort(marker_path: &Path, run_id: &str, attempt_number: i64) {
if let Err(error) = state::write_run_operation_marker(
marker_path,
Expand Down
62 changes: 53 additions & 9 deletions apps/decodex/src/orchestrator/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4215,14 +4215,7 @@ fn operator_run_status(
.filter(|reason| reason != "turn_completed");
}

let account = marker.as_ref().and_then(RunActivityMarker::account).cloned();
let mut accounts = marker
.as_ref()
.map(|marker| marker.accounts().to_vec())
.unwrap_or_default();

append_primary_account_if_missing(&mut accounts, account.as_ref());

let (account, accounts) = operator_run_accounts(marker.as_ref());
let branch_name = run.branch_name().map(str::to_owned);
let worktree_path = operator_run_relative_worktree_path(project, &run);
let issue_identifier = operator_run_issue_identifier_from_fields(
Expand All @@ -4231,6 +4224,7 @@ fn operator_run_status(
worktree_path.as_deref(),
);
let private_evidence = operator_run_private_evidence(project, &run, issue_identifier.as_deref());
let control_capability = operator_run_control_capability(&run, &app_server_state);
let execution_liveness =
operator_run_execution_liveness(&status, &timing, &app_server_state, &protocol_summary);

Expand Down Expand Up @@ -4270,6 +4264,7 @@ fn operator_run_status(
last_event_at: protocol_summary.last_event_at,
event_count: protocol_summary.event_count,
private_evidence,
control_capability,
process_id: timing.process_id,
process_alive: timing.process_alive,
process_liveness_reason: timing.process_liveness_reason,
Expand All @@ -4290,6 +4285,17 @@ fn operator_run_status(
})
}

fn operator_run_accounts(
marker: Option<&RunActivityMarker>,
) -> (Option<CodexAccountActivitySummary>, Vec<CodexAccountActivitySummary>) {
let account = marker.and_then(RunActivityMarker::account).cloned();
let mut accounts = marker.map(|marker| marker.accounts().to_vec()).unwrap_or_default();

append_primary_account_if_missing(&mut accounts, account.as_ref());

(account, accounts)
}

fn operator_run_relative_worktree_path(
project: &ServiceConfig,
run: &ProjectRunStatus,
Expand All @@ -4312,6 +4318,27 @@ fn operator_run_private_evidence(
)
}

fn operator_run_control_capability(
run: &ProjectRunStatus,
app_server_state: &OperatorRunAppServerState,
) -> Option<OperatorRunControlCapability> {
let channel = run.control_channel()?;

Some(OperatorRunControlCapability {
project_id: channel.project_id().to_owned(),
issue_id: channel.issue_id().to_owned(),
run_id: channel.run_id().to_owned(),
attempt_number: channel.attempt_number(),
thread_id: app_server_state.thread_id.clone(),
turn_id: app_server_state.turn_id.clone(),
transport: channel.transport().to_owned(),
channel_path: channel.channel_path().display().to_string(),
status: channel.status().to_owned(),
published_at: channel.published_at().to_owned(),
updated_at: channel.updated_at().to_owned(),
})
}

fn operator_project_display_name(project: &ServiceConfig) -> String {
github_repo_slug_from_origin(project.repo_root())
.or_else(|| repo_root_path_display_name(project.repo_root()))
Expand Down Expand Up @@ -5423,6 +5450,21 @@ fn render_protocol_activity_summary(summary: Option<&ProtocolActivitySummary>) -
format!("turn={turn}; waiting={wait}; rate_limit={rate_limit}; recent={recent}")
}

fn render_control_capability_summary(
capability: Option<&OperatorRunControlCapability>,
) -> String {
let Some(capability) = capability else {
return String::from("none");
};
let thread_id = capability.thread_id.as_deref().unwrap_or("none");
let turn_id = capability.turn_id.as_deref().unwrap_or("none");

format!(
"status={}; transport={}; channel={}; thread_id={thread_id}; turn_id={turn_id}",
capability.status, capability.transport, capability.channel_path
)
}

fn render_account_summary(summary: Option<&CodexAccountActivitySummary>) -> String {
let Some(summary) = summary else {
return String::from("none");
Expand Down Expand Up @@ -5712,9 +5754,10 @@ fn append_rendered_run(output: &mut String, run: &OperatorRunStatus) {
let account = render_account_summary(run.account.as_ref());
let accounts = render_accounts_summary(&run.accounts);
let private_evidence = render_private_evidence_reference(run);
let control_capability = render_control_capability_summary(run.control_capability.as_ref());

output.push_str(&format!(
"- run_id: {}\n project_id: {}\n issue_id: {}\n issue_identifier: {}\n title: {}\n attempt: {}\n status: {}\n attempt_status: {}\n phase: {}\n wait_reason: {}\n current_operation: {}\n active_lease: {}\n queue_lease_state: {}\n queue_lease: {}\n execution_liveness: {}\n freshness_at: {}\n freshness_source: {}\n timing: run_idle={} protocol_idle={} last_progress={} protocol_event={} events={}\n account: {}\n accounts: {}\n child_agent_activity: {}\n protocol_activity: {}\n context_pressure: {}\n private_evidence: {}\n thread_id: {}\n turn_id: {}\n thread_status: {}\n thread_active_flags: {}\n interactive_requested: {}\n continuation_pending: {}\n branch: {}\n worktree_path: {}\n updated_at: {}\n last_run_activity_at: {}\n last_protocol_activity_at: {}\n last_progress_at: {}\n idle_for_seconds: {}\n protocol_idle_for_seconds: {}\n suspected_stall: {}\n process_id: {}\n process_alive: {}\n process_liveness_reason: {}\n retry_kind: {}\n next_retry_at: {}\n effective_model: {}\n effective_model_provider: {}\n effective_cwd: {}\n effective_approval_policy: {}\n effective_approvals_reviewer: {}\n effective_sandbox_mode: {}\n protocol_event: {}\n event_count: {}\n",
"- run_id: {}\n project_id: {}\n issue_id: {}\n issue_identifier: {}\n title: {}\n attempt: {}\n status: {}\n attempt_status: {}\n phase: {}\n wait_reason: {}\n current_operation: {}\n active_lease: {}\n queue_lease_state: {}\n queue_lease: {}\n execution_liveness: {}\n freshness_at: {}\n freshness_source: {}\n timing: run_idle={} protocol_idle={} last_progress={} protocol_event={} events={}\n account: {}\n accounts: {}\n child_agent_activity: {}\n protocol_activity: {}\n context_pressure: {}\n private_evidence: {}\n control_capability: {}\n thread_id: {}\n turn_id: {}\n thread_status: {}\n thread_active_flags: {}\n interactive_requested: {}\n continuation_pending: {}\n branch: {}\n worktree_path: {}\n updated_at: {}\n last_run_activity_at: {}\n last_protocol_activity_at: {}\n last_progress_at: {}\n idle_for_seconds: {}\n protocol_idle_for_seconds: {}\n suspected_stall: {}\n process_id: {}\n process_alive: {}\n process_liveness_reason: {}\n retry_kind: {}\n next_retry_at: {}\n effective_model: {}\n effective_model_provider: {}\n effective_cwd: {}\n effective_approval_policy: {}\n effective_approvals_reviewer: {}\n effective_sandbox_mode: {}\n protocol_event: {}\n event_count: {}\n",
run.run_id,
run.project_id,
run.issue_id,
Expand Down Expand Up @@ -5743,6 +5786,7 @@ fn append_rendered_run(output: &mut String, run: &OperatorRunStatus) {
protocol_activity,
context_pressure,
private_evidence,
control_capability,
thread_id,
turn_id,
thread_status,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,10 @@ fn operator_status_snapshot_updates_owned_merged_worktree_hygiene_without_global

#[test]
fn live_operator_status_snapshot_hydrates_active_run_issue_display_metadata() {
let (_temp_dir, config, workflow) = temp_project_layout();
let (temp_dir, config, workflow) = temp_project_layout();
let state_store = StateStore::open_in_memory().expect("state store should open");
let run_id = "xy-392-attempt-1-1777551056";
let channel_path = temp_dir.path().join("control.channel");
let mut issue = sample_issue_with_sort_fields(
"issue-active",
"XY-392",
Expand All @@ -261,6 +262,14 @@ fn live_operator_status_snapshot_hydrates_active_run_issue_display_metadata() {
state_store
.upsert_lease(config.service_id(), &issue.id, run_id, "In Progress")
.expect("active lease should record");
state_store.update_run_thread(run_id, "thread-1").expect("thread should record");
state_store.update_run_turn(run_id, "turn-1").expect("turn should record");

std::fs::write(&channel_path, "ready\n").expect("control channel should write");

state_store
.publish_run_control_channel_for_active_attempt(run_id, 1, &channel_path, "local_file")
.expect("control channel should publish");

let snapshot = orchestrator::build_live_operator_status_snapshot(
&tracker,
Expand Down Expand Up @@ -301,6 +310,15 @@ fn live_operator_status_snapshot_hydrates_active_run_issue_display_metadata() {
snapshot_json["active_runs"][0]["private_evidence"]["read_command"],
format!("decodex evidence XY-392 --run-id {run_id} --attempt 1 --json")
);
assert_eq!(snapshot_json["active_runs"][0]["control_capability"]["status"], "active");
assert_eq!(
snapshot_json["active_runs"][0]["control_capability"]["thread_id"],
"thread-1"
);
assert_eq!(
snapshot_json["active_runs"][0]["control_capability"]["turn_id"],
"turn-1"
);
}

#[test]
Expand Down
3 changes: 3 additions & 0 deletions apps/decodex/src/orchestrator/tests/operator/status/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ fn operator_status_text_renders_human_readable_sections() {
assert!(rendered.contains(
"context_pressure: input=current_window 105.0k, peak_window 105.0k (same as current), cumulative_input 4.27M; output_tokens=12.0k; largest_output=175.8KiB by view_image; warnings=view_image repeated 3 large outputs; largest 180000 bytes"
));
assert!(rendered.contains(
"control_capability: status=active; transport=local_file; channel=.worktrees/PUB-101/.decodex-run-control/run-1-1.channel; thread_id=thread-1; turn_id=turn-1"
));
assert!(rendered.contains("turn_id: turn-1"));
assert!(rendered.contains("thread_status: active"));
assert!(rendered.contains("thread_active_flags: waitingOnApproval"));
Expand Down
18 changes: 18 additions & 0 deletions apps/decodex/src/orchestrator/tests/operator/status_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{panic, slice};
use orchestrator::{OperatorPostReviewLaneStatus, OperatorQueuedIssueStatus, OperatorWorktreeStatus};
use serde_json::Value;
use orchestrator::AgentPrivateEvidenceRef;
use orchestrator::OperatorRunControlCapability;

fn successful_linear_execution_history_comments(issue: &TrackerIssue) -> Vec<TrackerComment> {
vec![
Expand Down Expand Up @@ -202,6 +203,22 @@ fn operator_status_text_backup_codex_account() -> state::CodexAccountActivitySum
}
}

fn operator_status_text_control_capability() -> OperatorRunControlCapability {
OperatorRunControlCapability {
project_id: String::from("pubfi"),
issue_id: String::from("issue-1"),
run_id: String::from("run-1"),
attempt_number: 1,
thread_id: Some(String::from("thread-1")),
turn_id: Some(String::from("turn-1")),
transport: String::from("local_file"),
channel_path: String::from(".worktrees/PUB-101/.decodex-run-control/run-1-1.channel"),
status: String::from("active"),
published_at: String::from("2026-03-14 10:00:00"),
updated_at: String::from("2026-03-14 10:00:01"),
}
}

fn operator_status_text_active_run() -> orchestrator::OperatorRunStatus {
let account = operator_status_text_codex_account();
let backup_account = operator_status_text_backup_codex_account();
Expand Down Expand Up @@ -247,6 +264,7 @@ fn operator_status_text_active_run() -> orchestrator::OperatorRunStatus {
"decodex evidence PUB-101 --run-id run-1 --attempt 1 --json",
),
},
control_capability: Some(operator_status_text_control_capability()),
process_id: Some(1_234),
process_alive: Some(true),
process_liveness_reason: Some(String::from("process_alive")),
Expand Down
Loading