From 958098234f0ef4ed937217832ad6540a2fcf2893 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 2 Jun 2026 23:21:31 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Implement active lane steer control","authority":"XY-704"} --- README.md | 1 + apps/decodex/src/agent.rs | 9 +- apps/decodex/src/agent/app_server.rs | 939 +++++++++++++++--- apps/decodex/src/agent/app_server/protocol.rs | 31 + apps/decodex/src/agent/app_server/tests.rs | 196 +++- apps/decodex/src/agent/json_rpc.rs | 13 +- apps/decodex/src/cli.rs | 163 ++- apps/decodex/src/orchestrator.rs | 5 +- apps/decodex/src/orchestrator/lane_control.rs | 279 ++++++ .../decodex/src/orchestrator/operator_http.rs | 94 ++ apps/decodex/src/orchestrator/types.rs | 35 + apps/decodex/src/state/models.rs | 58 ++ apps/decodex/src/state/store.rs | 82 ++ apps/decodex/src/state/tests.rs | 23 +- docs/reference/operator-control-plane.md | 11 +- docs/spec/app-server.md | 19 +- docs/spec/lane-control.md | 46 +- plugins/decodex/skills/automation/SKILL.md | 4 +- plugins/decodex/skills/manual-cli/SKILL.md | 9 +- 19 files changed, 1842 insertions(+), 175 deletions(-) create mode 100644 apps/decodex/src/orchestrator/lane_control.rs diff --git a/README.md b/README.md index 44023ead..7e992655 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,7 @@ cargo run -p decodex --bin decodex -- project list cargo run -p decodex --bin decodex -- status cargo run -p decodex --bin decodex -- diagnose --json cargo run -p decodex --bin decodex -- maintenance prune --dry-run +cargo run -p decodex --bin decodex -- lane steer --run-id --expected-turn-id --message cargo run -p decodex --bin decodex -- radar refresh-upstream-queue cargo run -p decodex --bin decodex -- radar refresh-release-delta cargo run -p decodex --bin decodex -- radar validate diff --git a/apps/decodex/src/agent.rs b/apps/decodex/src/agent.rs index 2dd32c9e..fdd19b5b 100644 --- a/apps/decodex/src/agent.rs +++ b/apps/decodex/src/agent.rs @@ -10,9 +10,12 @@ mod tracker_tool_bridge; pub(crate) use self::{ app_server::{ ACTIVE_RUN_IDLE_TIMEOUT, AppServerCapabilityPreflightFailure, AppServerDynamicToolFailure, - AppServerRunRequest, AppServerRunResult, AppServerThreadArchiveRequest, - AppServerTurnFailure, TurnContinuationGuard, archive_app_server_thread_after_success, - execute_app_server_run, probe_app_server, protocol_activity_idle_timeout, + AppServerRunRequest, AppServerRunResult, AppServerSteerChannelRequest, + AppServerSteerChannelRequestInput, AppServerSteerQueueReport, + AppServerThreadArchiveRequest, AppServerTurnFailure, DEFAULT_STEER_RESULT_WAIT_TIMEOUT, + TurnContinuationGuard, archive_app_server_thread_after_success, + enqueue_app_server_steer_request, execute_app_server_run, probe_app_server, + protocol_activity_idle_timeout, }, codex_accounts::{CodexAccountPool, CodexAccountProvider}, decodex_tool_bridge::{DecodexRunContext, DecodexToolBridge}, diff --git a/apps/decodex/src/agent/app_server.rs b/apps/decodex/src/agent/app_server.rs index 0e23b740..a74df267 100644 --- a/apps/decodex/src/agent/app_server.rs +++ b/apps/decodex/src/agent/app_server.rs @@ -6,17 +6,18 @@ pub(crate) use turn_failure::AppServerTurnFailure; use std::{ collections::{BTreeMap, HashMap}, env, - error::Error, fmt::{self, Display, Formatter}, fs, + io::ErrorKind, path::{Path, PathBuf}, + process, thread, time::{Duration, Instant}, }; use color_eyre::Report; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use serde_json::{self, Value}; -use time::OffsetDateTime; +use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use self::protocol::{ AgentMessageDeltaNotification, AppServerClient, ChatgptAuthTokensRefreshParams, @@ -31,7 +32,7 @@ use self::protocol::{ ProbeDynamicToolHandler, RunOutcome, RuntimeConfigSummary, SkillsListParams, SkillsListResponse, ThreadArchiveRequest, ThreadResumeRequest, ThreadSessionResponse, ThreadStartRequest, ThreadStatusChangedNotification, ToolRequestUserInputResponse, - TurnCompletedNotification, TurnError, TurnStartRequest, UserInput, + TurnCompletedNotification, TurnError, TurnStartRequest, TurnSteerRequest, UserInput, }; use crate::{ agent::{ @@ -50,17 +51,22 @@ use crate::{ prelude::eyre, state::{ self, CodexAccountActivitySummary, CodexAccountMarker, EffectiveRuntimeMarker, - RUN_CONTROL_CHANNEL_DIR, RUN_CONTROL_CHANNEL_STATUS_COMPLETED, - RUN_CONTROL_CHANNEL_STATUS_FAILED, RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE, - RUN_OPERATION_AGENT_RUN, RUN_OPERATION_APP_SERVER_PREFLIGHT, RunControlChannel, StateStore, + RUN_CONTROL_ACTION_COMPLETED, RUN_CONTROL_ACTION_FAILED, RUN_CONTROL_CHANNEL_DIR, + RUN_CONTROL_CHANNEL_STATUS_COMPLETED, RUN_CONTROL_CHANNEL_STATUS_FAILED, + RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE, RUN_OPERATION_AGENT_RUN, + RUN_OPERATION_APP_SERVER_PREFLIGHT, RunControlActionOutcomeRequest, RunControlChannel, + StateStore, }, }; pub(crate) const ACTIVE_RUN_IDLE_TIMEOUT: Duration = Duration::from_secs(300); pub(crate) const MODEL_EXECUTION_IDLE_TIMEOUT: Duration = Duration::from_secs(30 * 60); +pub(crate) const DEFAULT_STEER_RESULT_WAIT_TIMEOUT: Duration = Duration::from_secs(10); const PROBE_TIMEOUT: Duration = Duration::from_secs(30); const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); +const CONTROL_CHANNEL_POLL_INTERVAL: Duration = Duration::from_millis(500); +const STEER_RESULT_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100); const MCP_PREFLIGHT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); const PLUGIN_PREFLIGHT_MAX_ATTEMPTS: u32 = 2; const PROBE_RUN_ID: &str = "protocol-probe-run"; @@ -72,6 +78,20 @@ const PROBE_COMMAND_EXEC_OUTPUT_BYTES_CAP: u64 = 1_024; const PROBE_DEVELOPER_INSTRUCTIONS: &str = "You are a protocol probe. You must call the dynamic tool `echo_probe` exactly once with the JSON argument `{\"text\":\"PROBE_OK\"}`. Do not use shell. Do not inspect files. After the tool response is returned, reply with the exact text PROBE_OK and nothing else."; const PROBE_USER_INPUT: &str = "Call `echo_probe` with `{\\\"text\\\":\\\"PROBE_OK\\\"}`. After the tool succeeds, reply with the exact text PROBE_OK."; const PREFLIGHT_EVENT_TYPE: &str = "app-server/preflight"; +const STEER_CHANNEL_REQUEST_SCHEMA: &str = "decodex.run_control_steer_request/v1"; +const STEER_CHANNEL_RESULT_SCHEMA: &str = "decodex.run_control_steer_result/v1"; +const STEER_CONTROL_ACTION: &str = "steer"; +const STEER_REASON_DELIVERED: &str = "turn_steer_delivered"; +const STEER_REASON_TURN_MISMATCH: &str = "turn_mismatch"; +const STEER_REASON_THREAD_MISMATCH: &str = "thread_mismatch"; +const STEER_REASON_CHANNEL_IDENTITY_MISMATCH: &str = "control_channel_identity_mismatch"; +const STEER_REASON_APP_SERVER_UNSUPPORTED: &str = "app_server_turn_steer_unsupported"; +const STEER_REASON_ACTIVE_TURN_NOT_STEERABLE: &str = "active_turn_not_steerable"; +const STEER_REASON_APP_SERVER_FAILED: &str = "app_server_turn_steer_failed"; +const STEER_FAILURE_STALE_EXPECTED_TURN: &str = "stale_expected_turn_id"; +const STEER_FAILURE_ACTIVE_TURN_NOT_STEERABLE: &str = "active_turn_not_steerable"; +const STEER_FAILURE_UNSUPPORTED: &str = "app_server_turn_steer_unsupported"; +const STEER_FAILURE_APP_SERVER_FAILED: &str = "app_server_turn_steer_failed"; const PREFLIGHT_MODEL_PAGE_LIMIT: u32 = 200; const PREFLIGHT_MCP_PAGE_LIMIT: u32 = 50; const PREFLIGHT_MCP_DETAIL: &str = "toolsAndAuthOnly"; @@ -358,7 +378,7 @@ impl Display for AppServerCapabilityPreflightFailure { } } -impl Error for AppServerCapabilityPreflightFailure {} +impl std::error::Error for AppServerCapabilityPreflightFailure {} #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct AppServerDynamicToolFailure { @@ -415,7 +435,7 @@ impl Display for AppServerDynamicToolFailure { } } -impl Error for AppServerDynamicToolFailure {} +impl std::error::Error for AppServerDynamicToolFailure {} #[derive(Clone)] pub(crate) struct AppServerRunRequest<'a> { @@ -449,6 +469,78 @@ pub(crate) struct AppServerThreadArchiveRequest<'a> { pub(crate) sequence_number: i64, } +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +pub(crate) struct AppServerSteerChannelRequest { + pub(crate) schema: String, + pub(crate) request_id: String, + pub(crate) audit_record_id: i64, + pub(crate) project_id: String, + pub(crate) issue_id: String, + pub(crate) run_id: String, + pub(crate) attempt_number: i64, + pub(crate) thread_id: String, + pub(crate) expected_turn_id: String, + pub(crate) source: String, + pub(crate) message: String, + pub(crate) message_byte_count: usize, + pub(crate) message_line_count: usize, + pub(crate) created_at: String, +} +impl AppServerSteerChannelRequest { + pub(crate) fn new(input: AppServerSteerChannelRequestInput) -> Self { + Self { + schema: String::from(STEER_CHANNEL_REQUEST_SCHEMA), + request_id: input.request_id, + audit_record_id: input.audit_record_id, + project_id: input.project_id, + issue_id: input.issue_id, + run_id: input.run_id, + attempt_number: input.attempt_number, + thread_id: input.thread_id, + expected_turn_id: input.expected_turn_id, + source: input.source, + message_byte_count: input.message.len(), + message_line_count: steer_message_line_count(&input.message), + message: input.message, + created_at: current_timestamp(), + } + } +} + +pub(crate) struct AppServerSteerChannelRequestInput { + pub(crate) request_id: String, + pub(crate) audit_record_id: i64, + pub(crate) project_id: String, + pub(crate) issue_id: String, + pub(crate) run_id: String, + pub(crate) attempt_number: i64, + pub(crate) thread_id: String, + pub(crate) expected_turn_id: String, + pub(crate) source: String, + pub(crate) message: String, +} +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +pub(crate) struct AppServerSteerChannelResult { + pub(crate) schema: String, + pub(crate) request_id: String, + pub(crate) outcome: String, + pub(crate) reason: String, + pub(crate) failure_class: Option, + pub(crate) expected_turn_id: String, + pub(crate) current_turn_id: Option, + pub(crate) response_turn_id: Option, + pub(crate) message_byte_count: usize, + pub(crate) message_line_count: usize, + pub(crate) audit_record_id: i64, + pub(crate) recorded_at: String, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub(crate) struct AppServerSteerQueueReport { + pub(crate) request_path: PathBuf, + pub(crate) result: Option, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct CommandExecHealthCheck { pub(crate) command: Vec, @@ -968,6 +1060,62 @@ impl DynamicToolFailureDiagnostic { } } +#[derive(Clone, Debug)] +struct SteerChannelDirs { + pending: PathBuf, + processed: PathBuf, + failed: PathBuf, +} + +#[derive(Clone, Debug)] +struct SteerDelivery { + outcome: String, + reason: String, + failure_class: Option, + current_turn_id: Option, + response_turn_id: Option, +} +impl SteerDelivery { + fn completed(response_turn_id: String, current_turn_id: Option<&str>) -> Self { + Self { + outcome: RUN_CONTROL_ACTION_COMPLETED.to_owned(), + reason: STEER_REASON_DELIVERED.to_owned(), + failure_class: None, + current_turn_id: current_turn_id.map(str::to_owned), + response_turn_id: Some(response_turn_id), + } + } + + fn failed(reason: &str, failure_class: Option<&str>, current_turn_id: Option<&str>) -> Self { + Self { + outcome: RUN_CONTROL_ACTION_FAILED.to_owned(), + reason: reason.to_owned(), + failure_class: failure_class.map(str::to_owned), + current_turn_id: current_turn_id.map(str::to_owned), + response_turn_id: None, + } + } +} + +#[derive(Clone, Copy)] +struct TurnWaitContext<'a> { + state_store: &'a StateStore, + control_channel: Option<&'a RunControlChannel>, + target_thread_id: &'a str, + dynamic_tool_handler: Option<&'a dyn DynamicToolHandler>, + codex_account_provider: Option<&'a dyn CodexAccountProvider>, +} + +struct SteerDeliveryRecord<'a> { + request: &'a AppServerSteerChannelRequest, + outcome: &'a str, + reason: &'a str, + failure_class: Option<&'a str>, + response_turn_id: Option<&'a str>, + current_thread_id: Option<&'a str>, + current_turn_id: Option<&'a str>, +} + #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum AppServerDynamicToolFailureKind { Protocol, @@ -1036,7 +1184,7 @@ pub(crate) fn execute_app_server_run( } let control_channel = publish_run_control_channel_for_request(request, state_store)?; - let result = execute_app_server_run_inner(request, state_store); + let result = execute_app_server_run_inner(request, state_store, control_channel.as_ref()); match &result { Ok(_result) => @@ -1124,6 +1272,29 @@ pub(crate) fn probe_app_server(listen: &str) -> crate::prelude::Result crate::prelude::Result { + let dirs = steer_channel_dirs(channel.channel_path()); + + fs::create_dir_all(&dirs.pending)?; + fs::create_dir_all(&dirs.processed)?; + fs::create_dir_all(&dirs.failed)?; + + let file_name = steer_request_file_name(&request.request_id); + let pending_path = dirs.pending.join(&file_name); + let temp_path = dirs.pending.join(format!("{file_name}.tmp-{}", process::id())); + + fs::write(&temp_path, serde_json::to_vec_pretty(request)?)?; + fs::rename(&temp_path, &pending_path)?; + + let result = wait_for_steer_channel_result(&dirs, &file_name, wait_timeout)?; + + Ok(AppServerSteerQueueReport { request_path: pending_path, result }) +} + fn running_model_execution_protocol_activity( protocol_activity: &state::ProtocolActivitySummary, ) -> bool { @@ -1360,6 +1531,9 @@ fn protocol_activity_detail(event_type: &str, payload_value: Option<&Value>) -> string_at_paths(value, &[&["params", "status", "type"], &["status", "type"]]) }); } + if event_type == "turn/steer" { + return protocol_steer_detail(payload_value); + } if event_type.starts_with("turn/") { return protocol_turn_status_from_value(event_type, payload_value) .or_else(|| Some(String::from("running"))); @@ -1426,6 +1600,20 @@ fn protocol_activity_detail(event_type: &str, payload_value: Option<&Value>) -> None } +fn protocol_steer_detail(payload_value: Option<&Value>) -> Option { + let value = payload_value?; + let outcome = + string_at_paths(value, &[&["outcome"]]).unwrap_or_else(|| String::from("unknown")); + let failure = string_at_paths(value, &[&["failure_class"]]); + let response_turn = string_at_paths(value, &[&["response_turn_id"]]); + + Some(match (failure, response_turn) { + (Some(failure), _) => format!("{outcome}/{failure}"), + (None, Some(turn_id)) => format!("{outcome}/response_turn={turn_id}"), + (None, None) => outcome, + }) +} + fn warning_or_deprecation_detail(payload_value: Option<&Value>) -> Option { payload_value.and_then(|value| { string_at_paths( @@ -1953,6 +2141,66 @@ fn duration_seconds_i64(duration: Duration) -> i64 { i64::try_from(duration.as_secs()).unwrap_or(i64::MAX) } +fn current_timestamp() -> String { + OffsetDateTime::now_utc().format(&Rfc3339).unwrap_or_else(|_| String::from("unknown")) +} + +fn steer_message_line_count(message: &str) -> usize { + message.lines().count().max(usize::from(!message.is_empty())) +} + +fn wait_for_steer_channel_result( + dirs: &SteerChannelDirs, + file_name: &str, + wait_timeout: Duration, +) -> crate::prelude::Result> { + let started_at = Instant::now(); + let file_name = std::ffi::OsStr::new(file_name); + + while started_at.elapsed() < wait_timeout { + if let Some(path) = existing_steer_channel_result_path(dirs, file_name) { + return Ok(Some(serde_json::from_slice(&fs::read(path)?)?)); + } + + thread::sleep(STEER_RESULT_WAIT_POLL_INTERVAL); + } + + Ok(None) +} + +fn steer_channel_result_exists(dirs: &SteerChannelDirs, file_name: &std::ffi::OsStr) -> bool { + existing_steer_channel_result_path(dirs, file_name).is_some() +} + +fn existing_steer_channel_result_path( + dirs: &SteerChannelDirs, + file_name: &std::ffi::OsStr, +) -> Option { + for dir in [&dirs.processed, &dirs.failed] { + let path = dir.join(Path::new(file_name)); + + if path.exists() { + return Some(path); + } + } + + None +} + +fn steer_channel_dirs(channel_path: &Path) -> SteerChannelDirs { + let root = channel_path.with_extension("requests"); + + SteerChannelDirs { + pending: root.join("pending"), + processed: root.join("processed"), + failed: root.join("failed"), + } +} + +fn steer_request_file_name(request_id: &str) -> String { + format!("{}.json", sanitize_run_control_path_segment(request_id)) +} + fn publish_run_control_channel_for_request( request: &AppServerRunRequest<'_>, state_store: &StateStore, @@ -2217,6 +2465,7 @@ fn write_thread_marker_best_effort( fn execute_app_server_run_inner( request: &AppServerRunRequest<'_>, state_store: &StateStore, + control_channel: Option<&RunControlChannel>, ) -> crate::prelude::Result { let mut recorder = RunRecorder::new( state_store, @@ -2275,8 +2524,14 @@ fn execute_app_server_run_inner( )?; recorder.mark_activity()?; - let turn_result = - execute_turn_loop(&mut client, &mut recorder, request, state_store, &thread_id)?; + let turn_result = execute_turn_loop( + &mut client, + &mut recorder, + request, + state_store, + control_channel, + &thread_id, + )?; state_store.record_run_attempt( &request.run_id, @@ -3141,6 +3396,7 @@ fn execute_turn_loop( recorder: &mut RunRecorder<'_>, request: &AppServerRunRequest<'_>, state_store: &StateStore, + control_channel: Option<&RunControlChannel>, thread_id: &str, ) -> crate::prelude::Result { let mut next_input = request.user_input.clone(); @@ -3163,21 +3419,31 @@ fn execute_turn_loop( flush_pending_messages(client, recorder, Some(thread_id))?; - let final_output = wait_for_turn_completion( + let outcome = wait_for_turn_completion( client, recorder, - thread_id, + TurnWaitContext { + state_store, + control_channel, + target_thread_id: thread_id, + dynamic_tool_handler: request.dynamic_tool_handler, + codex_account_provider: request.codex_account_provider, + }, &turn_id, request.timeout, - request.dynamic_tool_handler, - request.codex_account_provider, - )? - .final_output; + )?; + let final_turn_id = outcome.turn_id; + let final_output = outcome.final_output; if let Some(continuation_pending) = resolve_turn_completion(request, turn_count, &final_output)? { - return Ok(TurnLoopResult { turn_id, turn_count, final_output, continuation_pending }); + return Ok(TurnLoopResult { + turn_id: final_turn_id, + turn_count, + final_output, + continuation_pending, + }); } next_input = @@ -3348,6 +3614,18 @@ fn build_turn_start_request(thread_id: &str, user_input: &str) -> TurnStartReque } } +fn build_turn_steer_request( + thread_id: &str, + expected_turn_id: &str, + message: &str, +) -> TurnSteerRequest { + TurnSteerRequest { + thread_id: thread_id.to_owned(), + expected_turn_id: expected_turn_id.to_owned(), + input: vec![UserInput::Text { text: message.to_owned() }], + } +} + fn login_account_params(account: &CodexAccountLogin) -> LoginAccountParams { LoginAccountParams::ChatgptAuthTokens { access_token: account.access_token().to_owned(), @@ -3395,29 +3673,42 @@ fn flush_pending_messages( fn wait_for_turn_completion( client: &mut AppServerClient, recorder: &mut RunRecorder<'_>, - target_thread_id: &str, - target_turn_id: &str, + context: TurnWaitContext<'_>, + initial_turn_id: &str, timeout: Duration, - dynamic_tool_handler: Option<&dyn DynamicToolHandler>, - codex_account_provider: Option<&dyn CodexAccountProvider>, ) -> crate::prelude::Result { let mut last_activity_at = Instant::now(); + let mut target_turn_id = initial_turn_id.to_owned(); let mut final_output = String::new(); let mut latest_turn_failure: Option = None; loop { + if let Some(response_turn_id) = + handle_pending_steer_channel_requests(client, recorder, context, &target_turn_id)? + && response_turn_id != target_turn_id + { + context.state_store.update_run_turn(recorder.run_id, &response_turn_id)?; + recorder.set_turn_id(&response_turn_id)?; + + target_turn_id = response_turn_id; + last_activity_at = Instant::now(); + } + let idle_timeout = protocol_activity_idle_timeout(Some(&recorder.protocol_activity.summary), timeout); let wire_message = next_turn_wire_message( client, last_activity_at, idle_timeout, - target_thread_id, - target_turn_id, + context.target_thread_id, + &target_turn_id, latest_turn_failure.as_ref(), )?; + let Some(wire_message) = wire_message else { + continue; + }; - if !targets_thread(&wire_message, Some(target_thread_id)) { + if !targets_thread(&wire_message, Some(context.target_thread_id)) { tracing::debug!(raw = %wire_message.raw, "Ignoring app-server message for another thread."); continue; @@ -3430,90 +3721,31 @@ fn wait_for_turn_completion( apply_protocol_message_side_effects(recorder, &wire_message)?; match &wire_message.message { - JsonRpcMessage::Notification(notification) => match notification.method.as_str() { - "thread/status/changed" => { - let payload: ThreadStatusChangedNotification = - serde_json::from_value(notification.params.clone())?; - - if payload.status.kind == "systemError" && latest_turn_failure.is_none() { - latest_turn_failure = - Some(AppServerTurnFailure::from_system_error(&payload.thread_id)); - } - }, - "error" => { - if let Some((failure, will_retry)) = failure_from_error_notification( - notification, - target_thread_id, - target_turn_id, - )? { - if failure.requires_operator_attention() && will_retry != Some(true) { - return Err(Report::new(failure)); - } - - latest_turn_failure = Some(failure); - } - }, - "item/agentMessage/delta" => { - let payload: AgentMessageDeltaNotification = - serde_json::from_value(notification.params.clone())?; - - final_output.push_str(&payload.delta); - }, - "item/completed" => { - let payload: ItemCompletedNotification = - serde_json::from_value(notification.params.clone())?; - - if payload.item.kind == "agentMessage" - && let Some(text) = payload.item.text - { - final_output = text; - } - }, - "turn/completed" => { - let payload: TurnCompletedNotification = - serde_json::from_value(notification.params.clone())?; - - if payload.turn.id != target_turn_id { - continue; - } - if payload.turn.status == "completed" { - return Ok(RunOutcome { final_output }); - } - - if let Some(error) = payload.turn.error.as_ref() { - return Err(Report::new(turn_failure_from_turn_error( - target_thread_id, - Some(&payload.turn.id), - &payload.turn.status, - error, - ))); - } - if let Some(failure) = latest_turn_failure { - return Err(Report::new(failure)); - } - - eyre::bail!( - "Turn `{}` ended with status `{}` without an explicit error payload.", - payload.turn.id, - payload.turn.status - ); - }, - _ => {}, + JsonRpcMessage::Notification(notification) => { + if let Some(outcome) = handle_turn_notification( + notification, + context.target_thread_id, + &target_turn_id, + &mut final_output, + &mut latest_turn_failure, + )? { + return Ok(outcome); + } }, JsonRpcMessage::Request(request) => handle_turn_execution_request( client, recorder, request, - target_thread_id, - target_turn_id, - dynamic_tool_handler, - codex_account_provider, + context.target_thread_id, + &target_turn_id, + context.dynamic_tool_handler, + context.codex_account_provider, )?, JsonRpcMessage::Response(_) => ignore_orphan_turn_json_rpc_response(), JsonRpcMessage::Error(error) => { latest_turn_failure = Some(turn_failure_from_json_rpc_error_response( - target_thread_id, - target_turn_id, + context.target_thread_id, + &target_turn_id, error, )); }, @@ -3521,6 +3753,100 @@ fn wait_for_turn_completion( } } +fn handle_turn_notification( + notification: &JsonRpcNotification, + target_thread_id: &str, + target_turn_id: &str, + final_output: &mut String, + latest_turn_failure: &mut Option, +) -> crate::prelude::Result> { + match notification.method.as_str() { + "thread/status/changed" => { + let payload: ThreadStatusChangedNotification = + serde_json::from_value(notification.params.clone())?; + + if payload.status.kind == "systemError" && latest_turn_failure.is_none() { + *latest_turn_failure = + Some(AppServerTurnFailure::from_system_error(&payload.thread_id)); + } + }, + "error" => { + if let Some((failure, will_retry)) = + failure_from_error_notification(notification, target_thread_id, target_turn_id)? + { + if failure.requires_operator_attention() && will_retry != Some(true) { + return Err(Report::new(failure)); + } + + *latest_turn_failure = Some(failure); + } + }, + "item/agentMessage/delta" => { + if !notification_targets_turn(notification, target_turn_id) { + return Ok(None); + } + + let payload: AgentMessageDeltaNotification = + serde_json::from_value(notification.params.clone())?; + + final_output.push_str(&payload.delta); + }, + "item/completed" => { + if !notification_targets_turn(notification, target_turn_id) { + return Ok(None); + } + + let payload: ItemCompletedNotification = + serde_json::from_value(notification.params.clone())?; + + if payload.item.kind == "agentMessage" + && let Some(text) = payload.item.text + { + *final_output = text; + } + }, + "turn/completed" => { + let payload: TurnCompletedNotification = + serde_json::from_value(notification.params.clone())?; + + if payload.turn.id != target_turn_id { + return Ok(None); + } + if payload.turn.status == "completed" { + return Ok(Some(RunOutcome { + final_output: final_output.clone(), + turn_id: target_turn_id.to_owned(), + })); + } + + if let Some(error) = payload.turn.error.as_ref() { + return Err(Report::new(turn_failure_from_turn_error( + target_thread_id, + Some(&payload.turn.id), + &payload.turn.status, + error, + ))); + } + if let Some(failure) = latest_turn_failure.take() { + return Err(Report::new(failure)); + } + + eyre::bail!( + "Turn `{}` ended with status `{}` without an explicit error payload.", + payload.turn.id, + payload.turn.status + ); + }, + _ => {}, + } + + Ok(None) +} + +fn notification_targets_turn(notification: &JsonRpcNotification, target_turn_id: &str) -> bool { + turn_id_from_value(¬ification.params).is_none_or(|turn_id| turn_id == target_turn_id) +} + fn next_turn_wire_message( client: &mut AppServerClient, last_activity_at: Instant, @@ -3528,13 +3854,383 @@ fn next_turn_wire_message( target_thread_id: &str, target_turn_id: &str, latest_turn_failure: Option<&AppServerTurnFailure>, -) -> crate::prelude::Result { +) -> crate::prelude::Result> { let now = Instant::now(); let wait_timeout = remaining_idle_budget(last_activity_at, now, timeout).ok_or_else(|| { turn_wait_timeout_error(target_thread_id, target_turn_id, latest_turn_failure.cloned()) })?; + let poll_timeout = wait_timeout.min(CONTROL_CHANNEL_POLL_INTERVAL); + + recv_turn_wire_message(client, poll_timeout, wait_timeout, latest_turn_failure) +} + +fn handle_pending_steer_channel_requests( + client: &mut AppServerClient, + recorder: &mut RunRecorder<'_>, + context: TurnWaitContext<'_>, + target_turn_id: &str, +) -> crate::prelude::Result> { + let Some(channel) = context.control_channel else { + return Ok(None); + }; + let dirs = steer_channel_dirs(channel.channel_path()); + let entries = match fs::read_dir(&dirs.pending) { + Ok(entries) => entries, + Err(error) if error.kind() == ErrorKind::NotFound => return Ok(None), + Err(error) => return Err(error.into()), + }; + let mut paths = Vec::new(); + + for entry in entries { + let path = entry?.path(); + + if path.extension().is_some_and(|extension| extension == "json") { + paths.push(path); + } + } + + paths.sort(); + + for path in paths { + if let Some(delivered_turn_id) = handle_steer_channel_request_file( + client, + recorder, + context, + target_turn_id, + &dirs, + &path, + )? { + return Ok(Some(delivered_turn_id)); + } + } + + Ok(None) +} + +fn handle_steer_channel_request_file( + client: &mut AppServerClient, + recorder: &mut RunRecorder<'_>, + context: TurnWaitContext<'_>, + target_turn_id: &str, + dirs: &SteerChannelDirs, + path: &Path, +) -> crate::prelude::Result> { + let file_name = + path.file_name().ok_or_else(|| eyre::eyre!("Steer request path had no filename."))?; + + if steer_channel_result_exists(dirs, file_name) { + fs::remove_file(path)?; + + return Ok(None); + } + + let request: AppServerSteerChannelRequest = match serde_json::from_slice(&fs::read(path)?) { + Ok(request) => request, + Err(error) => { + move_malformed_steer_request(path, dirs, &error)?; + + return Ok(None); + }, + }; + let delivery = + deliver_steer_channel_request(client, recorder, context, target_turn_id, &request)?; + let response_turn_id = delivery.response_turn_id.clone(); + + write_steer_channel_result(path, dirs, &request, delivery)?; + + Ok(response_turn_id) +} + +fn deliver_steer_channel_request( + client: &mut AppServerClient, + recorder: &mut RunRecorder<'_>, + context: TurnWaitContext<'_>, + target_turn_id: &str, + request: &AppServerSteerChannelRequest, +) -> crate::prelude::Result { + let Some(channel) = context.control_channel else { + return Ok(SteerDelivery::failed( + STEER_REASON_CHANNEL_IDENTITY_MISMATCH, + Some("run_control_action_failed"), + Some(target_turn_id), + )); + }; + let failure = + validate_steer_channel_request(channel, context.target_thread_id, target_turn_id, request); + + if let Some((reason, failure_class)) = failure { + record_steer_delivery( + recorder, + context.state_store, + channel, + SteerDeliveryRecord { + request, + outcome: RUN_CONTROL_ACTION_FAILED, + reason, + failure_class: Some(failure_class), + response_turn_id: None, + current_thread_id: Some(context.target_thread_id), + current_turn_id: Some(target_turn_id), + }, + )?; + + return Ok(SteerDelivery::failed(reason, Some(failure_class), Some(target_turn_id))); + } + + match send_turn_steer_request( + client, + recorder, + request, + context.dynamic_tool_handler, + context.codex_account_provider, + ) { + Ok(response_turn_id) => { + record_steer_delivery( + recorder, + context.state_store, + channel, + SteerDeliveryRecord { + request, + outcome: RUN_CONTROL_ACTION_COMPLETED, + reason: STEER_REASON_DELIVERED, + failure_class: None, + response_turn_id: Some(&response_turn_id), + current_thread_id: Some(context.target_thread_id), + current_turn_id: Some(target_turn_id), + }, + )?; + + Ok(SteerDelivery::completed(response_turn_id, Some(target_turn_id))) + }, + Err(error) => { + let (reason, failure_class) = classify_steer_delivery_error(&error); - recv_turn_wire_message(client, wait_timeout, latest_turn_failure) + record_steer_delivery( + recorder, + context.state_store, + channel, + SteerDeliveryRecord { + request, + outcome: RUN_CONTROL_ACTION_FAILED, + reason, + failure_class: Some(failure_class), + response_turn_id: None, + current_thread_id: Some(context.target_thread_id), + current_turn_id: Some(target_turn_id), + }, + )?; + + Ok(SteerDelivery::failed(reason, Some(failure_class), Some(target_turn_id))) + }, + } +} + +fn validate_steer_channel_request( + channel: &RunControlChannel, + target_thread_id: &str, + target_turn_id: &str, + request: &AppServerSteerChannelRequest, +) -> Option<(&'static str, &'static str)> { + if request.schema != STEER_CHANNEL_REQUEST_SCHEMA + || request.project_id != channel.project_id() + || request.issue_id != channel.issue_id() + || request.run_id != channel.run_id() + || request.attempt_number != channel.attempt_number() + { + return Some((STEER_REASON_CHANNEL_IDENTITY_MISMATCH, "run_control_action_failed")); + } + if request.thread_id != target_thread_id { + return Some((STEER_REASON_THREAD_MISMATCH, "run_control_action_failed")); + } + if request.expected_turn_id != target_turn_id { + return Some((STEER_REASON_TURN_MISMATCH, STEER_FAILURE_STALE_EXPECTED_TURN)); + } + + None +} + +fn send_turn_steer_request( + client: &mut AppServerClient, + recorder: &mut RunRecorder<'_>, + request: &AppServerSteerChannelRequest, + dynamic_tool_handler: Option<&dyn DynamicToolHandler>, + codex_account_provider: Option<&dyn CodexAccountProvider>, +) -> crate::prelude::Result { + let response = client.steer_turn_with_handler( + build_turn_steer_request(&request.thread_id, &request.expected_turn_id, &request.message), + |connection, wire_message, server_request| { + handle_server_request_while_waiting( + connection, + recorder, + wire_message, + server_request, + RequestDispatchContext::new( + RequestWaitPhase::TurnExecution, + dynamic_tool_handler, + codex_account_provider, + Some(&request.thread_id), + None, + ), + ) + }, + )?; + + Ok(response.turn_id) +} + +fn record_steer_delivery( + recorder: &mut RunRecorder<'_>, + state_store: &StateStore, + channel: &RunControlChannel, + delivery: SteerDeliveryRecord<'_>, +) -> crate::prelude::Result<()> { + let metadata = steer_delivery_metadata( + delivery.request, + delivery.outcome, + delivery.reason, + delivery.failure_class, + delivery.response_turn_id, + ); + + state_store.record_run_control_action_delivery_outcome(RunControlActionOutcomeRequest { + project_id: channel.project_id(), + issue_id: channel.issue_id(), + run_id: channel.run_id(), + attempt_number: channel.attempt_number(), + thread_id: Some(&delivery.request.thread_id), + turn_id: Some(&delivery.request.expected_turn_id), + current_thread_id: delivery.current_thread_id, + current_turn_id: delivery.current_turn_id, + source: &delivery.request.source, + action: STEER_CONTROL_ACTION, + outcome: delivery.outcome, + reason: delivery.reason, + parent_record_id: Some(delivery.request.audit_record_id), + timeout_ms: None, + metadata: Some(&metadata), + channel: Some(channel), + })?; + recorder.record("turn/steer", &metadata.to_string())?; + + Ok(()) +} + +fn steer_delivery_metadata( + request: &AppServerSteerChannelRequest, + outcome: &str, + reason: &str, + failure_class: Option<&str>, + response_turn_id: Option<&str>, +) -> Value { + serde_json::json!({ + "request_id": request.request_id, + "audit_record_id": request.audit_record_id, + "outcome": outcome, + "reason": reason, + "failure_class": failure_class, + "expected_turn_id": request.expected_turn_id, + "response_turn_id": response_turn_id, + "message_byte_count": request.message_byte_count, + "message_line_count": request.message_line_count, + }) +} + +fn write_steer_channel_result( + request_path: &Path, + dirs: &SteerChannelDirs, + request: &AppServerSteerChannelRequest, + delivery: SteerDelivery, +) -> crate::prelude::Result<()> { + let file_name = request_path + .file_name() + .ok_or_else(|| eyre::eyre!("Steer request path had no filename."))?; + let target_dir = if delivery.outcome == RUN_CONTROL_ACTION_COMPLETED { + &dirs.processed + } else { + &dirs.failed + }; + let result_path = target_dir.join(file_name); + let result = AppServerSteerChannelResult { + schema: String::from(STEER_CHANNEL_RESULT_SCHEMA), + request_id: request.request_id.clone(), + outcome: delivery.outcome, + reason: delivery.reason, + failure_class: delivery.failure_class, + expected_turn_id: request.expected_turn_id.clone(), + current_turn_id: delivery.current_turn_id, + response_turn_id: delivery.response_turn_id, + message_byte_count: request.message_byte_count, + message_line_count: request.message_line_count, + audit_record_id: request.audit_record_id, + recorded_at: current_timestamp(), + }; + + write_json_file_atomically(&result_path, &result)?; + + fs::remove_file(request_path)?; + + Ok(()) +} + +fn move_malformed_steer_request( + request_path: &Path, + dirs: &SteerChannelDirs, + error: &serde_json::Error, +) -> crate::prelude::Result<()> { + let Some(file_name) = request_path.file_name() else { + return Ok(()); + }; + let result_path = dirs.failed.join(file_name); + let result = serde_json::json!({ + "schema": STEER_CHANNEL_RESULT_SCHEMA, + "request_id": file_name.to_string_lossy(), + "outcome": RUN_CONTROL_ACTION_FAILED, + "reason": "malformed_steer_request", + "failure_class": "run_control_action_failed", + "error": error.to_string(), + "recorded_at": current_timestamp(), + }); + + write_json_file_atomically(&result_path, &result)?; + + fs::remove_file(request_path)?; + + Ok(()) +} + +fn write_json_file_atomically(path: &Path, value: &T) -> crate::prelude::Result<()> +where + T: Serialize, +{ + let parent = + path.parent().ok_or_else(|| eyre::eyre!("JSON output path had no parent directory."))?; + let file_name = + path.file_name().ok_or_else(|| eyre::eyre!("JSON output path had no filename."))?; + let temp_path = parent.join(format!( + "{}.tmp-{}-{}", + file_name.to_string_lossy(), + process::id(), + OffsetDateTime::now_utc().unix_timestamp_nanos() + )); + + fs::create_dir_all(parent)?; + fs::write(&temp_path, serde_json::to_vec_pretty(value)?)?; + fs::rename(&temp_path, path)?; + + Ok(()) +} + +fn classify_steer_delivery_error(error: &Report) -> (&'static str, &'static str) { + let error = error.to_string().to_lowercase(); + + if error.contains("activeturnnotsteerable") { + return (STEER_REASON_ACTIVE_TURN_NOT_STEERABLE, STEER_FAILURE_ACTIVE_TURN_NOT_STEERABLE); + } + if error.contains("method not found") || error.contains("-32601") || error.contains("-32_601") { + return (STEER_REASON_APP_SERVER_UNSUPPORTED, STEER_FAILURE_UNSUPPORTED); + } + + (STEER_REASON_APP_SERVER_FAILED, STEER_FAILURE_APP_SERVER_FAILED) } fn handle_turn_execution_request( @@ -3584,17 +4280,22 @@ fn turn_wait_timeout_error( fn recv_turn_wire_message( client: &mut AppServerClient, - wait_timeout: Duration, + poll_timeout: Duration, + remaining_timeout: Duration, latest_turn_failure: Option<&AppServerTurnFailure>, -) -> crate::prelude::Result { - match client.recv(Some(wait_timeout)) { - Ok(wire_message) => Ok(wire_message), +) -> crate::prelude::Result> { + match client.recv(Some(poll_timeout)) { + Ok(wire_message) => Ok(Some(wire_message)), Err(error) => { - if error.downcast_ref::().is_some() - && let Some(failure) = latest_turn_failure - { - return Err(Report::new(failure.clone()) - .wrap_err("Timed out while waiting for additional app-server output.")); + if error.downcast_ref::().is_some() { + if remaining_timeout > poll_timeout { + return Ok(None); + } + + if let Some(failure) = latest_turn_failure { + return Err(Report::new(failure.clone()) + .wrap_err("Timed out while waiting for additional app-server output.")); + } } Err(error) @@ -3873,9 +4574,6 @@ fn dispatch_dynamic_tool_call( request: &JsonRpcRequest, context: RequestDispatchContext<'_>, ) -> crate::prelude::Result<()> { - let target_turn_id = context.target_turn_id.ok_or_else(|| { - eyre::eyre!("app_server_protocol_failure: turn execution request missing turn context") - })?; let target_thread_id = context.target_thread_id.ok_or_else(|| { eyre::eyre!("app_server_protocol_failure: turn execution request missing thread context") })?; @@ -3883,7 +4581,7 @@ fn dispatch_dynamic_tool_call( context.dynamic_tool_handler, request, target_thread_id, - target_turn_id, + context.target_turn_id, ); respond_to_dynamic_tool_call_dispatch(connection, recorder, request, dispatch) @@ -4106,7 +4804,7 @@ fn handle_dynamic_tool_call( dynamic_tool_handler: Option<&dyn DynamicToolHandler>, request: &JsonRpcRequest, target_thread_id: &str, - target_turn_id: &str, + target_turn_id: Option<&str>, ) -> DynamicToolCallDispatch { let payload = match validated_dynamic_tool_call_payload(request, target_thread_id, target_turn_id) { @@ -4171,7 +4869,7 @@ fn handle_dynamic_tool_call( fn validated_dynamic_tool_call_payload( request: &JsonRpcRequest, target_thread_id: &str, - target_turn_id: &str, + target_turn_id: Option<&str>, ) -> std::result::Result> { let payload = serde_json::from_value::(request.params.clone()).map_err( |error| { @@ -4222,7 +4920,10 @@ fn validated_dynamic_tool_call_payload( ), ))); } - if payload.turn_id != target_turn_id { + + if let Some(target_turn_id) = target_turn_id + && payload.turn_id != target_turn_id + { return Err(Box::new(DynamicToolCallDispatch::protocol_failure( Some(payload.tool), payload.namespace, diff --git a/apps/decodex/src/agent/app_server/protocol.rs b/apps/decodex/src/agent/app_server/protocol.rs index 6b0d79af..3a0a1cbb 100644 --- a/apps/decodex/src/agent/app_server/protocol.rs +++ b/apps/decodex/src/agent/app_server/protocol.rs @@ -181,6 +181,21 @@ impl AppServerClient { self.connection.request_with_handler("turn/start", ¶ms, REQUEST_TIMEOUT, handler) } + pub(super) fn steer_turn_with_handler( + &mut self, + params: TurnSteerRequest, + handler: H, + ) -> crate::prelude::Result + where + H: FnMut( + &mut JsonRpcConnection, + &WireMessage, + &JsonRpcRequest, + ) -> crate::prelude::Result<()>, + { + self.connection.request_with_handler("turn/steer", ¶ms, REQUEST_TIMEOUT, handler) + } + pub(super) fn command_exec( &mut self, params: &CommandExecParams, @@ -267,6 +282,7 @@ impl AppServerClient { #[derive(Default)] pub(super) struct RunOutcome { pub(super) final_output: String, + pub(super) turn_id: String, } #[derive(Debug, Serialize)] @@ -426,6 +442,21 @@ pub(super) struct TurnStartResponse { pub(super) turn: TurnStatusPayload, } +#[derive(Debug, Serialize)] +pub(super) struct TurnSteerRequest { + #[serde(rename = "threadId")] + pub(super) thread_id: String, + #[serde(rename = "expectedTurnId")] + pub(super) expected_turn_id: String, + pub(super) input: Vec, +} + +#[derive(Debug, Deserialize)] +pub(super) struct TurnSteerResponse { + #[serde(rename = "turnId")] + pub(super) turn_id: String, +} + #[derive(Debug, Eq, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub(super) struct CommandExecParams { diff --git a/apps/decodex/src/agent/app_server/tests.rs b/apps/decodex/src/agent/app_server/tests.rs index 138d17c7..68a3abf7 100644 --- a/apps/decodex/src/agent/app_server/tests.rs +++ b/apps/decodex/src/agent/app_server/tests.rs @@ -15,12 +15,14 @@ use crate::{ agent::{ app_server::{ AppServerCapabilityPreflightFailure, AppServerCapabilityPreflightReport, - AppServerDynamicToolFailure, AppServerRunResult, AppServerThreadArchiveRequest, - AppServerTurnFailure, CommandExecHealthCheck, CommandExecResponse, - EffectiveThreadConfig, InitializeResponse, ModelProviderCapabilitiesReadResponse, - PluginListResponse, ProbeDynamicToolHandler, REQUEST_TIMEOUT, RequestWaitPhase, - RunRecorder, RuntimeConfigSummary, SUPPORTED_CODEX_CLI_VERSION_DISPLAY_ORDER, - SkillsListResponse, TurnContinuationGuard, UserInput, + AppServerDynamicToolFailure, AppServerRunResult, AppServerSteerChannelResult, + AppServerThreadArchiveRequest, AppServerTurnFailure, CommandExecHealthCheck, + CommandExecResponse, EffectiveThreadConfig, InitializeResponse, + ModelProviderCapabilitiesReadResponse, PluginListResponse, ProbeDynamicToolHandler, + REQUEST_TIMEOUT, RequestWaitPhase, RunRecorder, RuntimeConfigSummary, + STEER_CHANNEL_RESULT_SCHEMA, STEER_REASON_DELIVERED, + SUPPORTED_CODEX_CLI_VERSION_DISPLAY_ORDER, SkillsListResponse, SteerChannelDirs, + TurnContinuationGuard, UserInput, }, json_rpc::{ AppServerHomePreflightFailure, AppServerOutputTimeout, AppServerProcessEnv, @@ -33,7 +35,7 @@ use crate::{ }, }, prelude::{Result, eyre}, - state::{self, ProtocolActivitySummary, StateStore}, + state::{self, ProtocolActivitySummary, RUN_CONTROL_ACTION_COMPLETED, StateStore}, test_support::TestEnvVarGuard, }; @@ -365,6 +367,18 @@ fn turn_start_request_uses_default_runtime_settings() { )); } +#[test] +fn turn_steer_request_uses_expected_turn_precondition_and_text() { + let request = super::build_turn_steer_request("thread-1", "turn-1", "change direction"); + + assert_eq!(request.thread_id, "thread-1"); + assert_eq!(request.expected_turn_id, "turn-1"); + assert!(matches!( + request.input.as_slice(), + [UserInput::Text{ text }] if text == "change direction" + )); +} + #[test] fn thread_start_and_resume_requests_inherit_runtime_config() { fn assert_runtime_config(value: &Value) { @@ -1327,7 +1341,11 @@ fn structured_error_notification_becomes_turn_failure() { fn json_rpc_error_response_becomes_recoverable_turn_failure() { let error = JsonRpcError { id: serde_json::json!(7), - error: JsonRpcErrorPayload { code: -32_000, message: String::from("late response") }, + error: JsonRpcErrorPayload { + code: -32_000, + message: String::from("late response"), + data: None, + }, }; let failure = super::turn_failure_from_json_rpc_error_response("thread-1", "turn-1", &error); let failure_message = failure.to_string(); @@ -1338,6 +1356,73 @@ fn json_rpc_error_response_becomes_recoverable_turn_failure() { assert!(failure_message.contains("late response")); } +#[test] +fn steer_delivery_error_classifies_active_turn_not_steerable_distinctly() { + let error = eyre::eyre!( + "`turn/steer` failed with -32000: turn is not steerable data: {{\"type\":\"activeTurnNotSteerable\"}}" + ); + let (reason, failure_class) = super::classify_steer_delivery_error(&error); + + assert_eq!(reason, super::STEER_REASON_ACTIVE_TURN_NOT_STEERABLE); + assert_eq!(failure_class, super::STEER_FAILURE_ACTIVE_TURN_NOT_STEERABLE); +} + +#[test] +fn steer_delivery_error_classifies_missing_method_as_unsupported() { + for message in [ + "`turn/steer` failed with -32601: method not found", + "`turn/steer` failed with -32601: Method not found", + ] { + let error = eyre::eyre!("{message}"); + let (reason, failure_class) = super::classify_steer_delivery_error(&error); + + assert_eq!(reason, super::STEER_REASON_APP_SERVER_UNSUPPORTED); + assert_eq!(failure_class, super::STEER_FAILURE_UNSUPPORTED); + } +} + +#[test] +fn steer_channel_result_wait_ignores_temp_file_until_atomic_result_exists() -> Result<()> { + let temp_dir = TempDir::new()?; + let dirs = SteerChannelDirs { + pending: temp_dir.path().join("pending"), + processed: temp_dir.path().join("processed"), + failed: temp_dir.path().join("failed"), + }; + let file_name = "steer-request.json"; + + fs::create_dir_all(&dirs.processed)?; + fs::write(dirs.processed.join(format!("{file_name}.tmp")), b"{")?; + + assert!( + super::wait_for_steer_channel_result(&dirs, file_name, Duration::from_millis(1))?.is_none() + ); + + let result = AppServerSteerChannelResult { + schema: String::from(STEER_CHANNEL_RESULT_SCHEMA), + request_id: String::from("request-1"), + outcome: String::from(RUN_CONTROL_ACTION_COMPLETED), + reason: String::from(STEER_REASON_DELIVERED), + failure_class: None, + expected_turn_id: String::from("turn-1"), + current_turn_id: Some(String::from("turn-1")), + response_turn_id: Some(String::from("turn-2")), + message_byte_count: 12, + message_line_count: 1, + audit_record_id: 7, + recorded_at: String::from("2026-06-02T00:00:00Z"), + }; + + super::write_json_file_atomically(&dirs.processed.join(file_name), &result)?; + + assert_eq!( + super::wait_for_steer_channel_result(&dirs, file_name, Duration::from_millis(100))?, + Some(result) + ); + + Ok(()) +} + #[test] fn thread_resume_fallback_only_allows_missing_thread_errors() { assert!(super::thread_resume_error_allows_fallback(&eyre::eyre!("thread not found"))); @@ -1392,7 +1477,7 @@ fn dynamic_tool_call_enforces_declared_namespace() { params, }; let dispatch = - super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", "turn-1"); + super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", Some("turn-1")); assert_eq!(dispatch.response.success, expected_success, "{case_name}"); assert_eq!( @@ -1435,7 +1520,8 @@ fn dynamic_tool_call_rejects_invalid_response_shape() { "turnId": "turn-1" }), }; - let dispatch = super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", "turn-1"); + let dispatch = + super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", Some("turn-1")); assert!(!dispatch.response.success); assert!(matches!( @@ -1463,7 +1549,8 @@ fn dynamic_tool_call_records_tool_failures_without_terminal_protocol_failure() { "turnId": "turn-1" }), }; - let dispatch = super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", "turn-1"); + let dispatch = + super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", Some("turn-1")); assert!(!dispatch.response.success); assert!(dispatch.terminal_failure.is_none()); @@ -1475,6 +1562,93 @@ fn dynamic_tool_call_records_tool_failures_without_terminal_protocol_failure() { assert_eq!(diagnostic.message, "tool rejected the request"); } +#[test] +fn dynamic_tool_call_can_validate_thread_without_fixed_turn_during_steer_rpc() { + let handler = NamespacedDynamicToolHandler { seen_namespace: RefCell::new(None) }; + let request = JsonRpcRequest { + id: serde_json::json!(1), + method: String::from("item/tool/call"), + params: serde_json::json!({ + "arguments": {}, + "callId": "call-1", + "namespace": "tracker", + "threadId": "thread-1", + "tool": "tracker_tool", + "turnId": "turn-after-steer" + }), + }; + let dispatch = super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", None); + + assert!(dispatch.response.success); + assert!(dispatch.terminal_failure.is_none()); + assert_eq!(*handler.seen_namespace.borrow(), Some(String::from("tracker"))); +} + +#[test] +fn turn_notification_ignores_agent_output_for_non_target_turn() { + let old_completed = JsonRpcNotification { + method: String::from("item/completed"), + params: serde_json::json!({ + "threadId": "thread-1", + "turnId": "turn-old", + "item": {"type": "agentMessage", "text": "OLD"} + }), + }; + let old_delta = JsonRpcNotification { + method: String::from("item/agentMessage/delta"), + params: serde_json::json!({ + "threadId": "thread-1", + "turnId": "turn-old", + "delta": " OLD_DELTA" + }), + }; + let target_completed = JsonRpcNotification { + method: String::from("item/completed"), + params: serde_json::json!({ + "threadId": "thread-1", + "turnId": "turn-new", + "item": {"type": "agentMessage", "text": "NEW"} + }), + }; + let mut final_output = String::from("CURRENT"); + let mut latest_turn_failure: Option = None; + + assert!( + super::handle_turn_notification( + &old_completed, + "thread-1", + "turn-new", + &mut final_output, + &mut latest_turn_failure + ) + .expect("old completed notification should parse") + .is_none() + ); + assert!( + super::handle_turn_notification( + &old_delta, + "thread-1", + "turn-new", + &mut final_output, + &mut latest_turn_failure + ) + .expect("old delta notification should parse") + .is_none() + ); + assert_eq!(final_output, "CURRENT"); + + super::handle_turn_notification( + &target_completed, + "thread-1", + "turn-new", + &mut final_output, + &mut latest_turn_failure, + ) + .expect("target completed notification should parse"); + + assert_eq!(final_output, "NEW"); +} + #[test] fn dynamic_tool_call_unavailable_outside_turn_execution_is_protocol_diagnostic() { let dispatch = super::dynamic_tool_call_unavailable_for_phase(RequestWaitPhase::TurnStart); diff --git a/apps/decodex/src/agent/json_rpc.rs b/apps/decodex/src/agent/json_rpc.rs index 5a869b96..a654d64d 100644 --- a/apps/decodex/src/agent/json_rpc.rs +++ b/apps/decodex/src/agent/json_rpc.rs @@ -372,10 +372,17 @@ impl JsonRpcConnection { return Ok(serde_json::from_value(response.result.clone())?); }, JsonRpcMessage::Error(error) if error.id == expected_id => { + let data = error + .error + .data + .as_ref() + .map_or_else(String::new, |data| format!(" data: {data}")); + return Err(eyre::eyre!( - "`{method}` failed with {}: {}", + "`{method}` failed with {}: {}{}", error.error.code, - error.error.message + error.error.message, + data )); }, JsonRpcMessage::Request(request) => handle_request(self, &wire_message, request)?, @@ -599,6 +606,8 @@ pub(crate) struct JsonRpcError { pub(crate) struct JsonRpcErrorPayload { pub(crate) code: i64, pub(crate) message: String, + + pub(crate) data: Option, } #[derive(Clone, Debug)] diff --git a/apps/decodex/src/cli.rs b/apps/decodex/src/cli.rs index dee43a09..997f2f0e 100644 --- a/apps/decodex/src/cli.rs +++ b/apps/decodex/src/cli.rs @@ -2,6 +2,7 @@ use std::{ fs, io::{self, Read as _}, path::{Path, PathBuf}, + time::Duration, }; use clap::{ @@ -15,12 +16,13 @@ use serde::{Deserialize, Serialize}; use crate::{ accounts::{self, AccountImportRequest, AccountLoginRequest, AccountUseRequest}, - agent, + agent::{self, DEFAULT_STEER_RESULT_WAIT_TIMEOUT}, archive_hygiene::{self, ArchiveHygieneRequest}, maintenance::{self, MaintenanceMode, MaintenancePruneRequest, MaintenanceScope}, manual::{self, ManualCommitRequest, ManualLandRequest}, orchestrator::{ - self, DiagnoseRequest, EvidenceRequest, IssueDispatchMode, RunOnceRequest, ServeRequest, + self, DiagnoseRequest, EvidenceRequest, IssueDispatchMode, LaneSteerReport, + LaneSteerRequest, RunOnceRequest, ServeRequest, }, prelude::{Result, eyre}, radar::{ @@ -60,6 +62,7 @@ impl Cli { Command::Commit(args) => args.run(), Command::Land(args) => args.run(), Command::Run(args) => args.run(), + Command::Lane(args) => args.run(), Command::Serve(args) => args.run(), Command::Project(args) => args.run(), Command::Status(args) => args.run(), @@ -317,6 +320,72 @@ impl RunCommand { } } +#[derive(Debug, Args)] +struct LaneCommand { + #[command(subcommand)] + command: LaneSubcommand, +} +impl LaneCommand { + fn run(&self) -> Result<()> { + match &self.command { + LaneSubcommand::Steer(args) => args.run(), + } + } +} + +#[derive(Debug, Args)] +struct LaneSteerCommand { + #[command(flatten)] + project_config: ProjectConfigArgs, + /// Issue identifier or local issue id for the active lane. + #[arg(value_name = "ISSUE")] + issue: String, + /// Run id that must own the active turn. + #[arg(long, value_name = "RUN_ID")] + run_id: String, + /// Current active app-server turn id precondition. + #[arg(long, value_name = "TURN_ID")] + expected_turn_id: String, + /// Operator-supplied steer text to send to the active turn. + #[arg(long, value_name = "TEXT")] + message: String, + /// Emit structured JSON instead of human-readable text. + #[arg(long)] + json: bool, + /// How long to wait for the active attempt to report delivery. + #[arg(long, value_name = "MILLISECONDS", default_value_t = default_lane_steer_wait_timeout_ms())] + wait_timeout_ms: u64, +} +impl LaneSteerCommand { + fn run(&self) -> Result<()> { + let report = orchestrator::steer_lane(LaneSteerRequest { + config_path: self.project_config.as_path(), + project_id: None, + issue: &self.issue, + run_id: &self.run_id, + expected_turn_id: &self.expected_turn_id, + message: &self.message, + source: "cli", + wait_timeout: Duration::from_millis(self.wait_timeout_ms), + })?; + + if self.json { + println!("{}", serde_json::to_string_pretty(&report)?); + } else { + print!("{}", render_lane_steer_report(&report)); + } + if lane_steer_report_is_failure(&report) { + eyre::bail!( + "lane steer {}: {}", + report.outcome, + report.failure_class.as_deref().unwrap_or(&report.reason) + ); + } + + Ok(()) + } +} + #[derive(Debug, Args)] struct ServeCommand { #[command(flatten)] @@ -1188,6 +1257,8 @@ enum Command { Land(LandCommand), /// Run one orchestration pass. Run(RunCommand), + /// Inspect or influence active Decodex lanes through audited controls. + Lane(LaneCommand), /// Run the local multi-project Decodex control plane. Serve(ServeCommand), /// Manage the local Decodex project registry. @@ -1247,6 +1318,12 @@ enum ProjectSubcommand { Remove(ProjectToggleCommand), } +#[derive(Debug, Subcommand)] +enum LaneSubcommand { + /// Send operator-supplied text to an active steerable turn. + Steer(LaneSteerCommand), +} + #[derive(Debug, Subcommand)] enum RecoverSubcommand { /// Recover retained review lanes whose handoff marker is missing. @@ -1309,6 +1386,30 @@ enum RadarBundleSubcommand { Validate(RadarBundleValidateCommand), } +fn default_lane_steer_wait_timeout_ms() -> u64 { + u64::try_from(DEFAULT_STEER_RESULT_WAIT_TIMEOUT.as_millis()).unwrap_or(10_000) +} + +fn lane_steer_report_is_failure(report: &LaneSteerReport) -> bool { + matches!(report.outcome.as_str(), "rejected" | "failed" | "timed_out" | "fallback") +} + +fn render_lane_steer_report(report: &LaneSteerReport) -> String { + format!( + "lane steer {}: issue={} run_id={} attempt={} expected_turn_id={} current_turn_id={} response_turn_id={} failure_class={} audit_record_id={} delivery_status={}\n", + report.outcome, + report.issue_identifier.as_deref().unwrap_or(&report.issue_id), + report.run_id, + report.attempt_number, + report.expected_turn_id, + report.current_turn_id.as_deref().unwrap_or("none"), + report.response_turn_id.as_deref().unwrap_or("none"), + report.failure_class.as_deref().unwrap_or("none"), + report.audit_record_id, + report.delivery_status + ) +} + fn read_attempt_request(request: &str) -> Result { let raw = if request == "-" { let mut raw = String::new(); @@ -1341,16 +1442,16 @@ mod tests { use crate::cli::{ AccountCommand, AccountSubcommand, AccountUseCommand, AttemptCommand, Cli, Command, - CommitCommand, DiagnoseCommand, EvidenceCommand, LandCommand, ProbeCommand, ProjectCommand, - ProjectConfigArgs, ProjectSubcommand, RadarBackfillReleaseRangeCommand, - RadarBundleBuildCommand, RadarBundleCommand, RadarBundleSubcommand, - RadarBundleValidateCommand, RadarCommand, RadarLedgerCommand, - RadarLedgerIngestExistingCommand, RadarLedgerSubcommand, RadarLedgerSummaryCommand, - RadarRefreshReleaseDeltaCommand, RadarRefreshUpstreamQueueCommand, - RadarRenderSignalCommand, RadarSubcommand, RadarValidateCommand, RecoverCommand, - RecoverSubcommand, ReviewHandoffDiagnoseCommand, ReviewHandoffRebindCommand, - ReviewHandoffRecoveryCommand, ReviewHandoffRecoverySubcommand, RunCommand, ServeCommand, - StatusCommand, + CommitCommand, DiagnoseCommand, EvidenceCommand, LandCommand, LaneCommand, + LaneSteerCommand, LaneSubcommand, ProbeCommand, ProjectCommand, ProjectConfigArgs, + ProjectSubcommand, RadarBackfillReleaseRangeCommand, RadarBundleBuildCommand, + RadarBundleCommand, RadarBundleSubcommand, RadarBundleValidateCommand, RadarCommand, + RadarLedgerCommand, RadarLedgerIngestExistingCommand, RadarLedgerSubcommand, + RadarLedgerSummaryCommand, RadarRefreshReleaseDeltaCommand, + RadarRefreshUpstreamQueueCommand, RadarRenderSignalCommand, RadarSubcommand, + RadarValidateCommand, RecoverCommand, RecoverSubcommand, ReviewHandoffDiagnoseCommand, + ReviewHandoffRebindCommand, ReviewHandoffRecoveryCommand, ReviewHandoffRecoverySubcommand, + RunCommand, ServeCommand, StatusCommand, }; #[test] @@ -1485,6 +1586,44 @@ mod tests { assert!(error.to_string().contains("[ISSUE]")); } + #[test] + fn parses_lane_steer_with_expected_turn_precondition() { + let cli = Cli::parse_from([ + "decodex", + "lane", + "steer", + "--config", + "./project.toml", + "XY-704", + "--run-id", + "run-1", + "--expected-turn-id", + "turn-1", + "--message", + "adjust the current implementation", + "--json", + ]); + + assert!(matches!( + cli.command, + Command::Lane(LaneCommand { + command: LaneSubcommand::Steer(LaneSteerCommand { + project_config: ProjectConfigArgs { config: Some(config) }, + issue, + run_id, + expected_turn_id, + message, + json: true, + .. + }) + }) if config == Path::new("./project.toml") + && issue == "XY-704" + && run_id == "run-1" + && expected_turn_id == "turn-1" + && message == "adjust the current implementation" + )); + } + #[test] fn parses_serve_with_listen_address_and_project_config() { let cli = Cli::parse_from([ diff --git a/apps/decodex/src/orchestrator.rs b/apps/decodex/src/orchestrator.rs index ea992660..1c3ceae1 100644 --- a/apps/decodex/src/orchestrator.rs +++ b/apps/decodex/src/orchestrator.rs @@ -35,6 +35,8 @@ include!("orchestrator/entrypoints.rs"); include!("orchestrator/operator_http.rs"); +include!("orchestrator/lane_control.rs"); + include!("orchestrator/pull_request_review.rs"); include!("orchestrator/daemon.rs"); @@ -83,7 +85,8 @@ const OPERATOR_LIVE_ENDPOINT_PATH: &str = "/livez"; const OPERATOR_ACCOUNTS_ENDPOINT_PATH: &str = "/api/accounts"; const OPERATOR_APP_SNAPSHOT_ENDPOINT_PATH: &str = "/api/operator-snapshot"; const OPERATOR_LINEAR_SCAN_ENDPOINT_PATH: &str = "/api/linear-scan"; -const OPERATOR_STATE_MAX_REQUEST_BYTES: usize = 8_192; +const OPERATOR_LANE_STEER_ENDPOINT_PATH: &str = "/api/lane-steer"; +const OPERATOR_STATE_MAX_REQUEST_BYTES: usize = 256 * 1_024; const OPERATOR_DASHBOARD_WS_CLIENT_MESSAGE_MAX_BYTES: usize = 64 * 1_024; const OPERATOR_STATE_HEADER_TERMINATOR: &[u8] = b"\r\n\r\n"; const OPERATOR_DASHBOARD_WS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20); diff --git a/apps/decodex/src/orchestrator/lane_control.rs b/apps/decodex/src/orchestrator/lane_control.rs new file mode 100644 index 00000000..ce4c5d79 --- /dev/null +++ b/apps/decodex/src/orchestrator/lane_control.rs @@ -0,0 +1,279 @@ +use agent::{ + AppServerSteerChannelRequest, AppServerSteerChannelRequestInput, AppServerSteerQueueReport, +}; +use state::{RUN_CONTROL_ACTION_ACCEPTED, RunControlActionReceipt, RunControlActionRequest}; + +struct LaneSteerTarget { + run: ProjectRunStatus, + issue_identifier: Option, +} + +struct LaneSteerQueueReportInput<'a> { + target: &'a LaneSteerTarget, + request_id: &'a str, + audit_record_id: i64, + project_id: &'a str, + expected_turn_id: &'a str, + message_byte_count: usize, + message_line_count: usize, + queue: AppServerSteerQueueReport, +} + +pub(crate) fn steer_lane(request: LaneSteerRequest<'_>) -> Result { + validate_lane_steer_request(&request)?; + + let state_store = runtime::open_runtime_store()?; + let config_path = resolve_lane_steer_config_path(&state_store, request.config_path, request.project_id)?; + let config = ServiceConfig::from_path(&config_path)?; + + if let Some(project_id) = request.project_id + && project_id != config.service_id() + { + eyre::bail!( + "Lane steer project `{project_id}` did not match config service id `{}`.", + config.service_id() + ); + } + + runtime::register_project_config(&state_store, &config_path, true)?; + + let target = resolve_lane_steer_target(&state_store, &config, request.issue, request.run_id)?; + let request_id = new_lane_steer_request_id(); + let message_byte_count = request.message.len(); + let message_line_count = lane_steer_message_line_count(request.message); + let metadata = serde_json::json!({ + "request_id": request_id, + "expected_turn_id": request.expected_turn_id, + "message_byte_count": message_byte_count, + "message_line_count": message_line_count, + }); + let timeout_ms = i64::try_from(request.wait_timeout.as_millis()).unwrap_or(i64::MAX); + let receipt = state_store.resolve_run_control_action(RunControlActionRequest { + project_id: config.service_id(), + issue_id: target.run.issue_id(), + run_id: target.run.run_id(), + attempt_number: target.run.attempt_number(), + thread_id: target.run.thread_id(), + turn_id: Some(request.expected_turn_id), + source: request.source, + action: "steer", + timeout_ms: Some(timeout_ms), + metadata: Some(&metadata), + })?; + + if receipt.outcome() != RUN_CONTROL_ACTION_ACCEPTED { + return Ok(lane_steer_report_from_rejected_receipt( + &target, + &request_id, + &receipt, + request.expected_turn_id, + message_byte_count, + message_line_count, + )); + } + + let Some(channel) = receipt.channel() else { + eyre::bail!("Lane steer was accepted without an active control channel."); + }; + let Some(thread_id) = target.run.thread_id() else { + eyre::bail!("Lane steer was accepted before the active app-server thread id was known."); + }; + let steer_request = AppServerSteerChannelRequest::new(AppServerSteerChannelRequestInput { + request_id: request_id.clone(), + audit_record_id: receipt.audit_record_id(), + project_id: receipt.project_id().to_owned(), + issue_id: receipt.issue_id().to_owned(), + run_id: receipt.run_id().to_owned(), + attempt_number: receipt.attempt_number(), + thread_id: thread_id.to_owned(), + expected_turn_id: request.expected_turn_id.to_owned(), + source: request.source.to_owned(), + message: request.message.to_owned(), + }); + let queue = agent::enqueue_app_server_steer_request(channel, &steer_request, request.wait_timeout)?; + + Ok(lane_steer_report_from_queue_result(LaneSteerQueueReportInput { + target: &target, + request_id: &request_id, + audit_record_id: receipt.audit_record_id(), + project_id: receipt.project_id(), + expected_turn_id: request.expected_turn_id, + message_byte_count, + message_line_count, + queue, + })) +} + +fn validate_lane_steer_request(request: &LaneSteerRequest<'_>) -> Result<()> { + if request.issue.trim().is_empty() { + eyre::bail!("Lane steer issue must not be empty."); + } + if request.run_id.trim().is_empty() { + eyre::bail!("Lane steer run id must not be empty."); + } + if request.expected_turn_id.trim().is_empty() { + eyre::bail!("Lane steer expected turn id must not be empty."); + } + if request.message.trim().is_empty() { + eyre::bail!("Lane steer message must not be empty."); + } + if request.source.trim().is_empty() { + eyre::bail!("Lane steer source must not be empty."); + } + + Ok(()) +} + +fn resolve_lane_steer_config_path( + state_store: &StateStore, + config_path: Option<&Path>, + project_id: Option<&str>, +) -> Result { + if let Some(project_id) = project_id { + if let Some(config_path) = config_path { + return ServiceConfig::resolve_project_config_path(config_path); + } + + return state_store + .list_projects()? + .into_iter() + .find(|project| project.service_id() == project_id) + .map(|project| project.config_path().to_path_buf()) + .ok_or_else(|| { + eyre::eyre!( + "Decodex project `{project_id}` is not registered. Pass --config or run `decodex project add`." + ) + }); + } + + resolve_config_path(config_path, state_store)?.ok_or_else(|| { + eyre::eyre!( + "No Decodex project config found. Pass this command's --config or register one with `decodex project add `." + ) + }) +} + +fn resolve_lane_steer_target( + state_store: &StateStore, + project: &ServiceConfig, + issue: &str, + run_id: &str, +) -> Result { + let (_active_runs, recent_runs) = state_store.list_project_runs(project.service_id(), usize::MAX)?; + let matches = recent_runs + .into_iter() + .filter(|run| run.run_id() == run_id) + .filter(|run| private_evidence_run_matches_issue(project, run, issue)) + .map(|run| { + let branch_name = run.branch_name().map(str::to_owned); + let worktree_path = run + .worktree_path() + .map(|path| relative_worktree_path_for_path(project, path)); + let issue_identifier = operator_run_issue_identifier_from_fields( + run.run_id(), + branch_name.as_deref(), + worktree_path.as_deref(), + ); + + LaneSteerTarget { run, issue_identifier } + }) + .collect::>(); + + match matches.len() { + 0 => eyre::bail!( + "No local run matched issue `{issue}` and run id `{run_id}` in project `{}`.", + project.service_id() + ), + 1 => { + let mut matches = matches; + + Ok(matches.remove(0)) + }, + _ => eyre::bail!( + "Lane steer matched multiple local runs for issue `{issue}` and run id `{run_id}`." + ), + } +} + +fn lane_steer_report_from_rejected_receipt( + target: &LaneSteerTarget, + request_id: &str, + receipt: &RunControlActionReceipt, + expected_turn_id: &str, + message_byte_count: usize, + message_line_count: usize, +) -> LaneSteerReport { + LaneSteerReport { + project_id: receipt.project_id().to_owned(), + issue_id: receipt.issue_id().to_owned(), + issue_identifier: target.issue_identifier.clone(), + run_id: receipt.run_id().to_owned(), + attempt_number: receipt.attempt_number(), + thread_id: receipt.current_thread_id().map(str::to_owned), + expected_turn_id: expected_turn_id.to_owned(), + current_turn_id: receipt.current_turn_id().map(str::to_owned), + response_turn_id: None, + audit_record_id: receipt.audit_record_id(), + request_id: request_id.to_owned(), + request_path: None, + outcome: receipt.outcome().to_owned(), + reason: receipt.reason().to_owned(), + failure_class: lane_steer_failure_class_for_reason(receipt.reason()).map(str::to_owned), + delivery_status: String::from("rejected"), + message_byte_count, + message_line_count, + } +} + +fn lane_steer_report_from_queue_result(input: LaneSteerQueueReportInput<'_>) -> LaneSteerReport { + let result = input.queue.result; + + LaneSteerReport { + project_id: input.project_id.to_owned(), + issue_id: input.target.run.issue_id().to_owned(), + issue_identifier: input.target.issue_identifier.clone(), + run_id: input.target.run.run_id().to_owned(), + attempt_number: input.target.run.attempt_number(), + thread_id: input.target.run.thread_id().map(str::to_owned), + expected_turn_id: input.expected_turn_id.to_owned(), + current_turn_id: result + .as_ref() + .and_then(|result| result.current_turn_id.clone()) + .or_else(|| input.target.run.turn_id().map(str::to_owned)), + response_turn_id: result.as_ref().and_then(|result| result.response_turn_id.clone()), + audit_record_id: input.audit_record_id, + request_id: input.request_id.to_owned(), + request_path: Some(input.queue.request_path.display().to_string()), + outcome: result + .as_ref() + .map_or_else(|| RUN_CONTROL_ACTION_ACCEPTED.to_owned(), |result| result.outcome.clone()), + reason: result + .as_ref() + .map_or_else(|| String::from("queued_wait_timeout"), |result| result.reason.clone()), + failure_class: result.as_ref().and_then(|result| result.failure_class.clone()), + delivery_status: result + .as_ref() + .map_or_else(|| String::from("queued"), |_| String::from("resolved")), + message_byte_count: input.message_byte_count, + message_line_count: input.message_line_count, + } +} + +fn lane_steer_failure_class_for_reason(reason: &str) -> Option<&'static str> { + match reason { + "turn_mismatch" => Some("stale_expected_turn_id"), + "active_turn_not_steerable" => Some("active_turn_not_steerable"), + "app_server_turn_steer_unsupported" => Some("app_server_turn_steer_unsupported"), + _ => Some("run_control_action_failed"), + } +} + +fn new_lane_steer_request_id() -> String { + let now = OffsetDateTime::now_utc().unix_timestamp_nanos(); + + format!("steer-{now}-{}", process::id()) +} + +fn lane_steer_message_line_count(message: &str) -> usize { + message.lines().count().max(usize::from(!message.is_empty())) +} diff --git a/apps/decodex/src/orchestrator/operator_http.rs b/apps/decodex/src/orchestrator/operator_http.rs index 267eb2e5..bef146c2 100644 --- a/apps/decodex/src/orchestrator/operator_http.rs +++ b/apps/decodex/src/orchestrator/operator_http.rs @@ -1,6 +1,7 @@ use base64::engine::general_purpose::STANDARD; use base64::Engine as _; use sha1::{Digest as _, Sha1}; +use agent::DEFAULT_STEER_RESULT_WAIT_TIMEOUT; use crate::accounts; use crate::accounts::AccountUseRequest; @@ -27,6 +28,7 @@ enum OperatorRequestRoute { Live, AppSnapshot, LinearScan, + LaneSteer, AccountList { force_refresh: bool }, AccountSelect, AccountClear, @@ -132,6 +134,22 @@ struct OperatorLinearScanHttpRequest { project_id: Option, } +#[derive(Deserialize)] +struct OperatorLaneSteerHttpRequest { + #[serde(alias = "projectId")] + project_id: Option, + issue: Option, + #[serde(alias = "issueId")] + issue_id: Option, + #[serde(alias = "runId")] + run_id: String, + #[serde(alias = "expectedTurnId")] + expected_turn_id: String, + message: String, + #[serde(alias = "waitTimeoutMs")] + wait_timeout_ms: Option, +} + struct DashboardControlAck<'a> { request_id: Option<&'a str>, action: &'a str, @@ -228,6 +246,13 @@ fn handle_operator_state_endpoint_connection( return Ok(()); } + if route == OperatorRequestRoute::LaneSteer { + let response = build_operator_lane_steer_http_response(&request); + + stream.write_all(&response)?; + + return Ok(()); + } if route == OperatorRequestRoute::AppSnapshot { let response = build_operator_app_snapshot_http_response(snapshot); @@ -1226,6 +1251,9 @@ fn build_operator_state_http_response_with_control_requests( if route == OperatorRequestRoute::LinearScan { return Ok(build_operator_linear_scan_http_response(control_requests, request)); } + if route == OperatorRequestRoute::LaneSteer { + return Ok(build_operator_lane_steer_http_response(request)); + } Ok(build_operator_state_http_response_for_route(route)) } @@ -1417,6 +1445,67 @@ fn operator_linear_scan_request_project_id(request: &[u8]) -> Result Vec { + match operator_lane_steer_http_response_body(request) { + Ok(report) => { + let status = if lane_steer_report_is_rejected_or_failed(&report) { + "409 Conflict" + } else if report.delivery_status == "queued" { + "202 Accepted" + } else { + "200 OK" + }; + let body = serde_json::to_vec(&report) + .unwrap_or_else(|_| br#"{"error":"lane steer response failed"}"#.to_vec()); + + http_response_bytes(status, "application/json", &body) + }, + Err(error) => { + let body = serde_json::to_vec(&json!({ "error": error.to_string() })) + .unwrap_or_else(|_| br#"{"error":"lane steer request failed"}"#.to_vec()); + + http_response_bytes("400 Bad Request", "application/json", &body) + }, + } +} + +fn operator_lane_steer_http_response_body(request: &[u8]) -> Result { + let body = operator_http_request_body(request)?; + let request: OperatorLaneSteerHttpRequest = serde_json::from_slice(body) + .map_err(|error| eyre::eyre!("Lane steer request body was not valid JSON: {error}"))?; + let issue = request + .issue + .as_deref() + .or(request.issue_id.as_deref()) + .map(str::trim) + .filter(|issue| !issue.is_empty()) + .ok_or_else(|| eyre::eyre!("Lane steer request requires issue or issueId."))?; + let project_id = request.project_id.as_deref().map(str::trim).filter(|value| !value.is_empty()); + let wait_timeout = request + .wait_timeout_ms + .map(Duration::from_millis) + .unwrap_or(DEFAULT_STEER_RESULT_WAIT_TIMEOUT); + let report = steer_lane(LaneSteerRequest { + config_path: None, + project_id, + issue, + run_id: &request.run_id, + expected_turn_id: &request.expected_turn_id, + message: &request.message, + source: "api", + wait_timeout, + })?; + + Ok(report) +} + +fn lane_steer_report_is_rejected_or_failed(report: &LaneSteerReport) -> bool { + matches!( + report.outcome.as_str(), + "rejected" | "failed" | "timed_out" | "fallback" + ) +} + fn build_operator_state_http_response_for_route(route: OperatorRequestRoute) -> Vec { match route { OperatorRequestRoute::Dashboard => { @@ -1438,6 +1527,9 @@ fn build_operator_state_http_response_for_route(route: OperatorRequestRoute) -> OperatorRequestRoute::LinearScan => { http_response_bytes("405 Method Not Allowed", "text/plain; charset=utf-8", b"method not allowed") }, + OperatorRequestRoute::LaneSteer => { + http_response_bytes("405 Method Not Allowed", "text/plain; charset=utf-8", b"method not allowed") + }, OperatorRequestRoute::Live => { http_response_bytes("200 OK", "text/plain; charset=utf-8", b"ok") }, @@ -1500,6 +1592,7 @@ fn parse_operator_state_request_route( force_refresh: operator_query_has_flag(query, "refresh"), }), ("POST", OPERATOR_LINEAR_SCAN_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LinearScan), + ("POST", OPERATOR_LANE_STEER_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LaneSteer), ("POST", "/api/accounts/select") => Ok(OperatorRequestRoute::AccountSelect), ("POST", "/api/accounts/clear") => Ok(OperatorRequestRoute::AccountClear), ("POST", "/api/accounts/logout") => Ok(OperatorRequestRoute::AccountLogout), @@ -1512,6 +1605,7 @@ fn parse_operator_state_request_route( | OPERATOR_LIVE_ENDPOINT_PATH | OPERATOR_APP_SNAPSHOT_ENDPOINT_PATH | OPERATOR_LINEAR_SCAN_ENDPOINT_PATH + | OPERATOR_LANE_STEER_ENDPOINT_PATH | OPERATOR_ACCOUNTS_ENDPOINT_PATH | "/api/accounts/select" | "/api/accounts/clear" diff --git a/apps/decodex/src/orchestrator/types.rs b/apps/decodex/src/orchestrator/types.rs index 0109c099..c6e3aa4b 100644 --- a/apps/decodex/src/orchestrator/types.rs +++ b/apps/decodex/src/orchestrator/types.rs @@ -51,6 +51,41 @@ pub(crate) struct EvidenceRequest<'a> { pub(crate) include_payload: bool, } +/// Active lane steer request. +pub(crate) struct LaneSteerRequest<'a> { + pub(crate) config_path: Option<&'a Path>, + pub(crate) project_id: Option<&'a str>, + pub(crate) issue: &'a str, + pub(crate) run_id: &'a str, + pub(crate) expected_turn_id: &'a str, + pub(crate) message: &'a str, + pub(crate) source: &'a str, + pub(crate) wait_timeout: Duration, +} + +/// Active lane steer result without raw operator message content. +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +pub(crate) struct LaneSteerReport { + pub(crate) project_id: String, + pub(crate) issue_id: String, + pub(crate) issue_identifier: Option, + pub(crate) run_id: String, + pub(crate) attempt_number: i64, + pub(crate) thread_id: Option, + pub(crate) expected_turn_id: String, + pub(crate) current_turn_id: Option, + pub(crate) response_turn_id: Option, + pub(crate) audit_record_id: i64, + pub(crate) request_id: String, + pub(crate) request_path: Option, + pub(crate) outcome: String, + pub(crate) reason: String, + pub(crate) failure_class: Option, + pub(crate) delivery_status: String, + pub(crate) message_byte_count: usize, + pub(crate) message_line_count: usize, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct RunSummary { project_id: String, diff --git a/apps/decodex/src/state/models.rs b/apps/decodex/src/state/models.rs index f148a110..a54a4d16 100644 --- a/apps/decodex/src/state/models.rs +++ b/apps/decodex/src/state/models.rs @@ -141,11 +141,14 @@ pub struct RunControlActionReceipt { attempt_number: i64, thread_id: Option, turn_id: Option, + current_thread_id: Option, + current_turn_id: Option, source: String, action: String, outcome: String, reason: String, audit_record_id: i64, + metadata: Option, channel: Option, } impl RunControlActionReceipt { @@ -179,6 +182,16 @@ impl RunControlActionReceipt { self.turn_id.as_deref() } + /// Current thread identifier observed while resolving the request. + pub fn current_thread_id(&self) -> Option<&str> { + self.current_thread_id.as_deref() + } + + /// Current turn identifier observed while resolving the request. + pub fn current_turn_id(&self) -> Option<&str> { + self.current_turn_id.as_deref() + } + /// Local source that requested the action. pub fn source(&self) -> &str { &self.source @@ -204,6 +217,11 @@ impl RunControlActionReceipt { self.audit_record_id } + /// Optional compact action metadata captured with the audit event. + pub fn metadata(&self) -> Option<&Value> { + self.metadata.as_ref() + } + /// Control channel selected for an accepted request. pub fn channel(&self) -> Option<&RunControlChannel> { self.channel.as_ref() @@ -496,6 +514,46 @@ pub(crate) struct RunControlActionRequest<'a> { pub(crate) action: &'a str, /// Optional caller timeout budget in milliseconds. pub(crate) timeout_ms: Option, + /// Optional compact, non-secret action metadata to include in audit evidence. + pub(crate) metadata: Option<&'a Value>, +} + +/// Follow-up outcome for a run-control action handled after initial resolution. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +#[cfg_attr(not(test), allow(dead_code))] +pub(crate) struct RunControlActionOutcomeRequest<'a> { + /// Project identifier used for local audit scoping. + pub(crate) project_id: &'a str, + /// Issue identifier used for local audit scoping. + pub(crate) issue_id: &'a str, + /// Run identifier used for local audit scoping. + pub(crate) run_id: &'a str, + /// Attempt number used for local audit scoping. + pub(crate) attempt_number: i64, + /// Requested app-server thread identifier, when known. + pub(crate) thread_id: Option<&'a str>, + /// Requested expected app-server turn identifier, when known. + pub(crate) turn_id: Option<&'a str>, + /// Current app-server thread identifier observed while handling the request. + pub(crate) current_thread_id: Option<&'a str>, + /// Current app-server turn identifier observed while handling the request. + pub(crate) current_turn_id: Option<&'a str>, + /// Local source that requested the action. + pub(crate) source: &'a str, + /// Requested control action. + pub(crate) action: &'a str, + /// Follow-up outcome. + pub(crate) outcome: &'a str, + /// Normalized outcome reason. + pub(crate) reason: &'a str, + /// Parent request-resolution audit record id, when known. + pub(crate) parent_record_id: Option, + /// Optional caller timeout budget in milliseconds. + pub(crate) timeout_ms: Option, + /// Optional compact, non-secret action metadata to include in audit evidence. + pub(crate) metadata: Option<&'a Value>, + /// Control channel that carried the request, when known. + pub(crate) channel: Option<&'a RunControlChannel>, } /// Registered repo target managed by the local Decodex control plane. diff --git a/apps/decodex/src/state/store.rs b/apps/decodex/src/state/store.rs index 3d11ee5b..4c6f92f0 100644 --- a/apps/decodex/src/state/store.rs +++ b/apps/decodex/src/state/store.rs @@ -848,11 +848,14 @@ impl StateStore { attempt_number: resolution.audit_target.attempt_number, thread_id: resolution.audit_target.thread_id, turn_id: resolution.audit_target.turn_id, + current_thread_id: resolution.audit_target.current_thread_id, + current_turn_id: resolution.audit_target.current_turn_id, source: resolution.audit_target.source, action: resolution.audit_target.action, outcome: resolution.outcome, reason: resolution.reason, audit_record_id: event.record_id(), + metadata: resolution.audit_target.metadata, channel: resolution.channel, }) } @@ -875,9 +878,12 @@ impl StateStore { attempt_number: receipt.attempt_number, thread_id: receipt.thread_id.clone(), turn_id: receipt.turn_id.clone(), + current_thread_id: receipt.current_thread_id.clone(), + current_turn_id: receipt.current_turn_id.clone(), source: receipt.source.clone(), action: receipt.action.clone(), timeout_ms: None, + metadata: receipt.metadata.clone(), channel: receipt.channel.clone(), }; @@ -889,6 +895,39 @@ impl StateStore { ) } + /// Append a follow-up audit outcome for a control action handled from a channel request. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn record_run_control_action_delivery_outcome( + &self, + request: RunControlActionOutcomeRequest<'_>, + ) -> Result { + validate_run_control_action_outcome(request.outcome)?; + validate_required_run_control_field("reason", request.reason)?; + + let target = RunControlAuditTarget { + project_id: request.project_id.to_owned(), + issue_id: request.issue_id.to_owned(), + run_id: request.run_id.to_owned(), + attempt_number: request.attempt_number, + thread_id: request.thread_id.map(str::to_owned), + turn_id: request.turn_id.map(str::to_owned), + current_thread_id: request.current_thread_id.map(str::to_owned), + current_turn_id: request.current_turn_id.map(str::to_owned), + source: request.source.to_owned(), + action: request.action.to_owned(), + timeout_ms: request.timeout_ms, + metadata: request.metadata.cloned(), + channel: request.channel.cloned(), + }; + + self.append_run_control_audit_event( + &target, + request.outcome, + request.reason, + request.parent_record_id, + ) + } + #[cfg_attr(not(test), allow(dead_code))] fn append_run_control_audit_event( &self, @@ -900,12 +939,14 @@ impl StateStore { validate_run_control_action_outcome(outcome)?; let channel = target.channel.as_ref(); + let failure_class = run_control_action_failure_class(&target.action, outcome, reason); let payload = serde_json::json!({ "schema": "decodex.run_control_action/v1", "action": target.action, "source": target.source, "outcome": outcome, "reason": reason, + "failure_class": failure_class, "parent_record_id": parent_record_id, "requested": { "project_id": target.project_id, @@ -916,6 +957,11 @@ impl StateStore { "turn_id": target.turn_id, "timeout_ms": target.timeout_ms, }, + "observed": { + "thread_id": target.current_thread_id, + "turn_id": target.current_turn_id, + }, + "metadata": target.metadata, "channel": channel.map(|channel| serde_json::json!({ "transport": channel.transport(), "channel_path": channel.channel_path().display().to_string(), @@ -1950,6 +1996,9 @@ struct RunControlAuditTarget { source: String, action: String, timeout_ms: Option, + current_thread_id: Option, + current_turn_id: Option, + metadata: Option, channel: Option, } @@ -2050,9 +2099,12 @@ fn resolve_run_control_action_locked( attempt_number: attempt.attempt_number, thread_id: request.thread_id.map(str::to_owned), turn_id: request.turn_id.map(str::to_owned), + current_thread_id: attempt.thread_id.clone(), + current_turn_id: attempt.turn_id.clone(), source: request.source.to_owned(), action: request.action.to_owned(), timeout_ms: request.timeout_ms, + metadata: request.metadata.cloned(), channel: None, }; @@ -2138,9 +2190,12 @@ fn rejected_run_control_resolution( attempt_number: request.attempt_number, thread_id: request.thread_id.map(str::to_owned), turn_id: request.turn_id.map(str::to_owned), + current_thread_id: None, + current_turn_id: None, source: request.source.to_owned(), action: request.action.to_owned(), timeout_ms: request.timeout_ms, + metadata: request.metadata.cloned(), channel: None, }), outcome: RUN_CONTROL_ACTION_REJECTED.to_owned(), @@ -2227,6 +2282,33 @@ fn validate_run_control_action_outcome(outcome: &str) -> Result<()> { Ok(()) } +fn run_control_action_failure_class( + action: &str, + outcome: &str, + reason: &str, +) -> Option<&'static str> { + if !matches!( + outcome, + RUN_CONTROL_ACTION_REJECTED + | RUN_CONTROL_ACTION_FAILED + | RUN_CONTROL_ACTION_TIMED_OUT + | RUN_CONTROL_ACTION_FALLBACK + ) { + return None; + } + if action == "steer" && reason == "turn_mismatch" { + return Some("stale_expected_turn_id"); + } + if action == "steer" && reason == "active_turn_not_steerable" { + return Some("active_turn_not_steerable"); + } + if action == "steer" && reason == "app_server_turn_steer_unsupported" { + return Some("app_server_turn_steer_unsupported"); + } + + Some("run_control_action_failed") +} + fn retarget_review_orchestration_issue( records: &mut HashMap, previous_issue_id: &str, diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index 450b41e7..80610a4d 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -2573,6 +2573,7 @@ fn run_control_accepts_active_attempt_and_persists_audit() { source: "test_hook", action: "noop", timeout_ms: Some(500), + metadata: None, }) .expect("control request should resolve"); @@ -2641,8 +2642,9 @@ fn run_control_rejects_stale_turn_and_run_mismatch() { thread_id: Some("thread-1"), turn_id: Some("turn-old"), source: "test_hook", - action: "noop", + action: "steer", timeout_ms: None, + metadata: None, }) .expect("stale turn should be audited"); let stale_run = store @@ -2656,13 +2658,29 @@ fn run_control_rejects_stale_turn_and_run_mismatch() { source: "test_hook", action: "noop", timeout_ms: None, + metadata: None, }) .expect("stale run should be audited"); assert_eq!(stale_turn.outcome(), "rejected"); assert_eq!(stale_turn.reason(), "turn_mismatch"); + assert_eq!(stale_turn.current_turn_id(), Some("turn-current")); assert_eq!(stale_run.outcome(), "rejected"); assert_eq!(stale_run.reason(), "run_not_found"); + + let events = store + .list_private_execution_events("pubfi", "issue-1", "run-current", 1) + .expect("private control audit should read"); + let stale_turn_event = events + .iter() + .find(|event| event.record_id() == stale_turn.audit_record_id()) + .expect("stale turn audit event should exist"); + + assert_eq!( + stale_turn_event.payload()["failure_class"].as_str(), + Some("stale_expected_turn_id") + ); + assert_eq!(stale_turn_event.payload()["observed"]["turn_id"].as_str(), Some("turn-current")); } #[test] @@ -2695,6 +2713,7 @@ fn run_control_rejects_missing_channel_file() { source: "test_hook", action: "noop", timeout_ms: None, + metadata: None, }) .expect("missing channel should be audited"); @@ -2731,6 +2750,7 @@ fn run_control_requires_active_run_ownership() { source: "test_hook", action: "noop", timeout_ms: None, + metadata: None, }) .expect("missing lease should be audited"); @@ -2749,6 +2769,7 @@ fn run_control_requires_active_run_ownership() { source: "test_hook", action: "noop", timeout_ms: None, + metadata: None, }) .expect("wrong active run should be audited"); diff --git a/docs/reference/operator-control-plane.md b/docs/reference/operator-control-plane.md index 21acf51f..62243779 100644 --- a/docs/reference/operator-control-plane.md +++ b/docs/reference/operator-control-plane.md @@ -163,14 +163,21 @@ operator-control surface for lane control, governed by accept active-lane stop/interrupt controls, project pause/resume controls, manual retry controls, or active-lane steer controls. Account-pool selection remains available because it changes the global Codex account selector, not an active lane. +Active-lane steer is available through `decodex lane steer --run-id +--expected-turn-id --message ` and `POST /api/lane-steer`. Both +surfaces require the expected active turn id, audit accepted or rejected state locally, +and keep raw steer text out of public tracker projections. `runActivity.activeRunsComplete` marks whether a payload is the complete active-run list; subscription-filtered payloads set it to `false`, so consumers must not treat a missing run in that payload as ended. Active run rows may include `control_capability` with the active attempt's project, issue, run id, attempt, current thread/turn ids, local transport, channel path, status, -and timestamps. It is local routing metadata for future CLI/API controls, not a -dashboard command surface. +and timestamps. It is local routing metadata for CLI/API controls, not a dashboard +command surface. +After a steer request is handled, active run protocol activity may show a compact +`turn/steer` entry with outcome, failure class, and response turn id. It does not +include the operator message. Snapshot `warnings` remain stable machine-readable tokens. When a warning needs operator action, snapshots may also include `warning_details` entries with the affected `project_id`, `repo_root`, reason, and next action; for example, a stale diff --git a/docs/spec/app-server.md b/docs/spec/app-server.md index 2e78dccf..0ee36f1a 100644 --- a/docs/spec/app-server.md +++ b/docs/spec/app-server.md @@ -149,9 +149,9 @@ the current normal dispatch preflight. Decodex's intended lane-control use is: - no operator-facing `thread/inject_items` feature in this rollout. If generated schema or live capability probing shows that `turn/interrupt` or -`turn/steer` is unavailable, the CLI/API control must report that control as -unsupported for the active lane instead of failing ordinary issue dispatch. The -lane-control contract and support matrix live in [`lane-control.md`](./lane-control.md). +`turn/steer` is unavailable, the CLI/API control reports that control as unsupported +for the active lane instead of failing ordinary issue dispatch. The lane-control +contract and support matrix live in [`lane-control.md`](./lane-control.md). ## Required request flow @@ -340,6 +340,15 @@ hard-limit task content categories. It should carry the operator's instruction b and leave policy constraints to Decodex audit, privacy, workflow, recovery, and agent-skill layers. +Decodex sends `turn/steer` only from the active attempt process that owns the live +app-server connection. `decodex lane steer` and `POST /api/lane-steer` first resolve +the local run-control channel, require `expectedTurnId`, audit accepted or rejected +state, and queue a local channel request. The active attempt rechecks the current turn +before sending app-server params `{ threadId, expectedTurnId, input }` and records the +returned `turnId` or a normalized failure class. App-server +`activeTurnNotSteerable` is surfaced as `active_turn_not_steerable`, distinct from +unsupported or generic app-server steer failures. + `turn/steer` must not be treated as: - a tracker mutation @@ -360,8 +369,8 @@ Method: Raw item injection is deferred as an operator feature. Decodex should not expose `thread/inject_items` through lane-control CLI/API in this rollout because raw item insertion has broader transcript-shaping semantics than the intended operator steer -contract. Use `turn/steer` for active-lane steering once the CLI/API implementation is -available. +contract. Use `turn/steer` for active-lane steering through the supported CLI/API +lane-control surface. ## `command/exec` diff --git a/docs/spec/lane-control.md b/docs/spec/lane-control.md index 495ced51..5b9358d9 100644 --- a/docs/spec/lane-control.md +++ b/docs/spec/lane-control.md @@ -21,7 +21,7 @@ The first supported operator-control surface for this rollout is CLI/API. Active UI controls are intentionally deferred. The dashboard may show local runtime state for observation, but it must not become the primary place where agents or operators author steer, retry, task replacement, or lifecycle mutations before the CLI/API contract is -implemented and audited. +settled and audited. Bottom-layer steer support must not hard-limit task content. The app-server, protocol, and runtime layer should expose steer broadly enough to pass operator-supplied @@ -37,14 +37,14 @@ agent-facing skills must guide responsible use. | Project dispatch pause | Supported for future dispatch | `decodex project disable ` and the runtime project enabled flag | Pause prevents new dispatch for the project. It must not kill or rewrite already active lanes. | | Project dispatch resume | Supported for future dispatch | `decodex project enable ` and the runtime project enabled flag | Resume re-enables future dispatch after the operator has inspected blockers, capacity, and queue state. | | Linear scan request | Supported | `POST /api/linear-scan` with optional `projectId` | Queue a scan for the next control-plane tick while respecting tracker backoff. This is an intake/status refresh request, not an execution command. | -| Run-control channel foundation | Supported foundation | Active attempts publish a local `.decodex-run-control/*` channel record, runtime SQLite `run_control_channels`, operator status `control_capability`, and private `control_action` audit events | Route any future mutation through the active attempt's project, issue, run id, attempt, thread id, current turn id, active lease, and local channel metadata. Invalid or stale requests fail closed and remain local audit evidence. | +| Run-control channel foundation | Supported foundation | Active attempts publish a local `.decodex-run-control/*` channel record, runtime SQLite `run_control_channels`, operator status `control_capability`, and private `control_action` audit events | Route active-lane mutations through the active attempt's project, issue, run id, attempt, thread id, current turn id, active lease, and local channel metadata. Invalid or stale requests fail closed and remain local audit evidence. | | Soft interrupt | Planned CLI/API control; bottom-layer method allowed | Decodex does not currently send `turn/interrupt` from its app-server client | Prefer soft interrupt before hard interruption when the active turn id is known and the app-server capability is present. Soft interrupt requests a graceful turn stop and must leave classification to the runtime. | | Hard interrupt fallback | Emergency fallback only | No dashboard or CLI/API lane-control path exposes hard interrupt in this rollout; runtime recovery can still classify attempts as `interrupted` | Use only when soft interrupt is unavailable, timed out, or impossible because the process or app-server boundary cannot be reached. Preserve retained worktree evidence and runtime classification. | -| Steer active lane | Planned CLI/API control; bottom-layer method must stay broad | Decodex does not currently send `turn/steer` from its app-server client | Pass operator-supplied steer text through the CLI/API when available. Do not narrow the protocol to a fixed set of task-content categories. Apply policy, audit, privacy, and lifecycle guardrails above the protocol. | +| Steer active lane | Supported CLI/API control; bottom-layer method stays broad | `decodex lane steer --run-id --expected-turn-id --message `, `POST /api/lane-steer`, local run-control channel requests, app-server `turn/steer`, private `control_action` audit events, and protocol activity `turn/steer` summaries | Pass operator-supplied steer text through CLI/API to the current active turn. Require `expectedTurnId`; stale turn ids fail closed. Do not narrow the protocol to a fixed set of task-content categories. Apply policy, audit, privacy, and lifecycle guardrails above the protocol. | | Retained resume/retry | Supported through runtime lifecycle | `decodex run `, retry scheduling, retained worktree recovery, and `thread/resume` for same-thread app-server continuation | Resume only when retained worktree, issue, branch, PR, and runtime evidence still prove the same lane. Treat ambiguous lineage as manual attention. | | Manual attention | Supported terminal control path | `decodex:needs-attention`, `issue_comment(kind = "manual_attention")`, and `issue_terminal_finalize(path = "manual_attention")` | Stop automation when policy requires a human decision. Explain the blocker through structured public fields and keep private evidence local. | | Task replacement | Deferred lifecycle work | No supported active-lane replacement command | Do not use steer or raw injection to replace the task silently. Treat replacement as explicit lifecycle work: pause/stop if needed, update or requeue the issue, or create a new issue/lane. | -| Raw thread item injection | Unsupported as an operator feature | No Decodex operator path for `thread/inject_items` | Do not expose raw `thread/inject_items` to operators in this rollout. Use `turn/steer` for operator steer once implemented. | +| Raw thread item injection | Unsupported as an operator feature | No Decodex operator path for `thread/inject_items` | Do not expose raw `thread/inject_items` to operators in this rollout. Use `turn/steer` through the supported CLI/API steer path. | | Active-lane UI authoring controls | Deferred | Existing dashboard views and low-level handlers are not the CLI/API-first lane-control contract | Do not add dashboard steer, retry, or task-replacement controls in this rollout. Ship CLI/API first, then promote UI controls only after audit and policy behavior is settled. | ## Inspect-First Rule @@ -69,7 +69,7 @@ Every live app-server attempt publishes a per-attempt local control capability w Decodex still owns the active lease for the run. The current mechanism is a local file channel under the run worktree's `.decodex-run-control/` directory plus a `run_control_channels` runtime SQLite row. This is foundation plumbing only: it proves -where an active attempt can receive future control traffic without exposing steer, +where an active attempt can receive control traffic without exposing steer, interrupt, or task-replacement semantics by itself. The channel row is scoped by project id, issue id, run id, attempt number, transport, @@ -118,6 +118,8 @@ requirements. Higher layers own guardrails: - CLI/API must require explicit operator-supplied steer text and a target lane identity. +- CLI/API must require the current active turn precondition as `expectedTurnId`; + stale expected turn ids fail closed and remain local audit evidence. - Decodex must audit steer requests in local runtime evidence before or alongside delivery to app-server. - Privacy and public-text guards decide what, if anything, may be mirrored to Linear. @@ -130,6 +132,20 @@ Steer is not task replacement. If the operator wants a different issue, a materi different goal, or a new acceptance contract, the correct path is lifecycle/requeue work, not a hidden active-lane content swap. +The supported CLI form is: + +```sh +decodex lane steer --run-id \ + --expected-turn-id --message +``` + +The supported API form is `POST /api/lane-steer` with JSON fields `issue` or +`issueId`, `runId`, `expectedTurnId`, `message`, and optional `projectId` and +`waitTimeoutMs`. Both surfaces resolve the active run-control channel before queueing +the request. The active run rechecks the expected turn id immediately before sending +`turn/steer`, so a turn that completed between operator inspection and delivery is not +steered accidentally. + ## Retained Resume And Retry Retained resume and retry are lifecycle controls, not prompt injection controls. @@ -177,6 +193,11 @@ accepted, rejected, completed, failed, timed out, and fallback outcomes. These r are scoped by the same project, issue, run id, and attempt tuple as other private execution evidence. They are available through `decodex evidence --run-id --attempt ` and survive independently of any public Linear projection. +Steer audit records include the request id, accepted or rejected state, +requested/observed turn ids, response turn id when app-server accepts the steer, +failure class, and compact message metadata such as byte and line counts. Default +evidence summaries redact message-like fields, and operator status surfaces +`turn/steer` protocol activity without including the raw operator text. Linear public text remains sparse. Do not write steer text, raw command output, process diagnostics, private evidence payloads, account details, or host-local paths into @@ -184,13 +205,8 @@ Linear unless a schema-controlled public projection explicitly allows it. ## Implementation Status For This Rollout -This document specifies capabilities that the CLI/API should expose first. Current code -already supports inspect, CLI project enable/disable, Linear scan requests, retained -resume/retry lifecycle paths, and manual-attention finalization. Current code does not -expose dashboard lane-mutation controls, does not yet implement Decodex CLI/API -controls that send `turn/interrupt` or `turn/steer`, and does not expose raw -`thread/inject_items` as an operator feature. - -When implementation work adds the missing CLI/API controls, update this spec, -[`app-server.md`](./app-server.md), the operator reference, and the Decodex plugin -skills in the same lane. +Current code supports inspect, CLI project enable/disable, Linear scan requests, +active-lane steer through CLI/API, retained resume/retry lifecycle paths, and +manual-attention finalization. Current code does not expose dashboard lane-mutation +controls, does not yet implement Decodex CLI/API controls that send `turn/interrupt`, +and does not expose raw `thread/inject_items` as an operator feature. diff --git a/plugins/decodex/skills/automation/SKILL.md b/plugins/decodex/skills/automation/SKILL.md index 461daac7..5999c1dc 100644 --- a/plugins/decodex/skills/automation/SKILL.md +++ b/plugins/decodex/skills/automation/SKILL.md @@ -127,7 +127,9 @@ Rules for agents: the active turn can be targeted. Use hard process interruption only as a fallback when soft interrupt is unavailable, timed out, or impossible. - Use steer only through the CLI/API lane-control surface and only when the operator - supplies the steer text. Bottom-layer steer support is broad; policy, audit, privacy, + supplies the steer text. The CLI form is `decodex lane steer --run-id + --expected-turn-id --message `; API callers use + `POST /api/lane-steer`. Bottom-layer steer support is broad; policy, audit, privacy, workflow, recovery, and skills provide the guardrails. - Treat task replacement as explicit lifecycle work, not steer. If the operator wants a different objective or acceptance contract, pause or stop if needed, update/requeue diff --git a/plugins/decodex/skills/manual-cli/SKILL.md b/plugins/decodex/skills/manual-cli/SKILL.md index 20c1add9..e2d1fb61 100644 --- a/plugins/decodex/skills/manual-cli/SKILL.md +++ b/plugins/decodex/skills/manual-cli/SKILL.md @@ -70,9 +70,12 @@ CLI/API lane controls: scheduled Linear poll. - Use `decodex run ` only for deliberate one-issue automation or retained retry/resume when the lane remains eligible under the registered workflow. -- When future CLI/API controls expose soft interrupt or steer, prefer soft interrupt - over hard process interruption and use steer only with explicit operator-supplied - text. +- Use `decodex lane steer --run-id --expected-turn-id + --message ` only with explicit operator-supplied text after inspection proves + the active lane identity. The expected turn id is a fail-closed precondition. + Broad steer text is allowed at the bottom layer; lifecycle ambiguity should route to + explicit recovery or manual attention instead of silently changing the issue + contract. - Do not use active-lane UI controls, direct runtime DB edits, raw `thread/inject_items`, or tracker-state mutations as substitutes for the lane-control contract.