diff --git a/.gitignore b/.gitignore index c692b22..26fad43 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ # AI .codex .decodex +.decodex-run-activity +.decodex-run-control !apps/decodex-app/.codex/ !apps/decodex-app/.codex/environments/ !apps/decodex-app/.codex/environments/environment.toml diff --git a/apps/decodex/src/agent/app_server.rs b/apps/decodex/src/agent/app_server.rs index 0e23b74..13945ea 100644 --- a/apps/decodex/src/agent/app_server.rs +++ b/apps/decodex/src/agent/app_server.rs @@ -8,7 +8,7 @@ use std::{ env, error::Error, fmt::{self, Display, Formatter}, - fs, + fs, mem, path::{Path, PathBuf}, time::{Duration, Instant}, }; @@ -31,7 +31,7 @@ use self::protocol::{ ProbeDynamicToolHandler, RunOutcome, RuntimeConfigSummary, SkillsListParams, SkillsListResponse, ThreadArchiveRequest, ThreadResumeRequest, ThreadSessionResponse, ThreadStartRequest, ThreadStatusChangedNotification, ToolRequestUserInputResponse, - TurnCompletedNotification, TurnError, TurnStartRequest, UserInput, + TurnCompletedNotification, TurnError, TurnInterruptRequest, TurnStartRequest, UserInput, }; use crate::{ agent::{ @@ -48,6 +48,9 @@ use crate::{ }, }, prelude::eyre, + run_control::{ + self, LaneControlInterruptRequest, LaneControlInterruptResponse, PendingLaneControlRequest, + }, state::{ self, CodexAccountActivitySummary, CodexAccountMarker, EffectiveRuntimeMarker, RUN_CONTROL_CHANNEL_DIR, RUN_CONTROL_CHANNEL_STATUS_COMPLETED, @@ -61,6 +64,7 @@ pub(crate) const MODEL_EXECUTION_IDLE_TIMEOUT: Duration = Duration::from_secs(30 const PROBE_TIMEOUT: Duration = Duration::from_secs(30); const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); +const RUN_CONTROL_POLL_INTERVAL: Duration = Duration::from_millis(500); const MCP_PREFLIGHT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); const PLUGIN_PREFLIGHT_MAX_ATTEMPTS: u32 = 2; const PROBE_RUN_ID: &str = "protocol-probe-run"; @@ -419,6 +423,7 @@ impl Error for AppServerDynamicToolFailure {} #[derive(Clone)] pub(crate) 
struct AppServerRunRequest<'a> { + pub(crate) project_id: String, pub(crate) run_id: String, pub(crate) issue_id: String, pub(crate) attempt_number: i64, @@ -1092,6 +1097,7 @@ pub(crate) fn probe_app_server(listen: &str) -> crate::prelude::Result, + request: &AppServerRunRequest<'_>, target_thread_id: &str, target_turn_id: &str, - timeout: Duration, - dynamic_tool_handler: Option<&dyn DynamicToolHandler>, - codex_account_provider: Option<&dyn CodexAccountProvider>, ) -> crate::prelude::Result { + let control_enabled = request.activity_marker_path.is_some(); let mut last_activity_at = Instant::now(); let mut final_output = String::new(); let mut latest_turn_failure: Option = None; loop { - let idle_timeout = - protocol_activity_idle_timeout(Some(&recorder.protocol_activity.summary), timeout); - let wire_message = next_turn_wire_message( + if control_enabled { + handle_pending_turn_control_requests( + client, + recorder, + request, + target_thread_id, + target_turn_id, + )?; + } + + let idle_timeout = protocol_activity_idle_timeout( + Some(&recorder.protocol_activity.summary), + request.timeout, + ); + let Some(wire_message) = next_turn_wire_message( client, last_activity_at, idle_timeout, target_thread_id, target_turn_id, latest_turn_failure.as_ref(), - )?; + control_enabled, + )? 
+ else { + continue; + }; if !targets_thread(&wire_message, Some(target_thread_id)) { tracing::debug!(raw = %wire_message.raw, "Ignoring app-server message for another thread."); @@ -3430,84 +3443,25 @@ fn wait_for_turn_completion( apply_protocol_message_side_effects(recorder, &wire_message)?; match &wire_message.message { - JsonRpcMessage::Notification(notification) => match notification.method.as_str() { - "thread/status/changed" => { - let payload: ThreadStatusChangedNotification = - serde_json::from_value(notification.params.clone())?; - - if payload.status.kind == "systemError" && latest_turn_failure.is_none() { - latest_turn_failure = - Some(AppServerTurnFailure::from_system_error(&payload.thread_id)); - } - }, - "error" => { - if let Some((failure, will_retry)) = failure_from_error_notification( - notification, - target_thread_id, - target_turn_id, - )? { - if failure.requires_operator_attention() && will_retry != Some(true) { - return Err(Report::new(failure)); - } - - latest_turn_failure = Some(failure); - } - }, - "item/agentMessage/delta" => { - let payload: AgentMessageDeltaNotification = - serde_json::from_value(notification.params.clone())?; - - final_output.push_str(&payload.delta); - }, - "item/completed" => { - let payload: ItemCompletedNotification = - serde_json::from_value(notification.params.clone())?; - - if payload.item.kind == "agentMessage" - && let Some(text) = payload.item.text - { - final_output = text; - } - }, - "turn/completed" => { - let payload: TurnCompletedNotification = - serde_json::from_value(notification.params.clone())?; - - if payload.turn.id != target_turn_id { - continue; - } - if payload.turn.status == "completed" { - return Ok(RunOutcome { final_output }); - } - - if let Some(error) = payload.turn.error.as_ref() { - return Err(Report::new(turn_failure_from_turn_error( - target_thread_id, - Some(&payload.turn.id), - &payload.turn.status, - error, - ))); - } - if let Some(failure) = latest_turn_failure { - return 
Err(Report::new(failure)); - } - - eyre::bail!( - "Turn `{}` ended with status `{}` without an explicit error payload.", - payload.turn.id, - payload.turn.status - ); - }, - _ => {}, + JsonRpcMessage::Notification(notification) => { + if let Some(outcome) = handle_turn_execution_notification( + notification, + target_thread_id, + target_turn_id, + &mut final_output, + &mut latest_turn_failure, + )? { + return Ok(outcome); + } }, - JsonRpcMessage::Request(request) => handle_turn_execution_request( + JsonRpcMessage::Request(server_request) => handle_turn_execution_request( client, recorder, - request, + server_request, target_thread_id, target_turn_id, - dynamic_tool_handler, - codex_account_provider, + request.dynamic_tool_handler, + request.codex_account_provider, )?, JsonRpcMessage::Response(_) => ignore_orphan_turn_json_rpc_response(), JsonRpcMessage::Error(error) => { @@ -3521,6 +3475,85 @@ fn wait_for_turn_completion( } } +fn handle_turn_execution_notification( + notification: &JsonRpcNotification, + target_thread_id: &str, + target_turn_id: &str, + final_output: &mut String, + latest_turn_failure: &mut Option, +) -> crate::prelude::Result> { + match notification.method.as_str() { + "thread/status/changed" => { + let payload: ThreadStatusChangedNotification = + serde_json::from_value(notification.params.clone())?; + + if payload.status.kind == "systemError" && latest_turn_failure.is_none() { + *latest_turn_failure = + Some(AppServerTurnFailure::from_system_error(&payload.thread_id)); + } + }, + "error" => { + if let Some((failure, will_retry)) = + failure_from_error_notification(notification, target_thread_id, target_turn_id)? 
+ { + if failure.requires_operator_attention() && will_retry != Some(true) { + return Err(Report::new(failure)); + } + + *latest_turn_failure = Some(failure); + } + }, + "item/agentMessage/delta" => { + let payload: AgentMessageDeltaNotification = + serde_json::from_value(notification.params.clone())?; + + final_output.push_str(&payload.delta); + }, + "item/completed" => { + let payload: ItemCompletedNotification = + serde_json::from_value(notification.params.clone())?; + + if payload.item.kind == "agentMessage" + && let Some(text) = payload.item.text + { + *final_output = text; + } + }, + "turn/completed" => { + let payload: TurnCompletedNotification = + serde_json::from_value(notification.params.clone())?; + + if payload.turn.id != target_turn_id { + return Ok(None); + } + if payload.turn.status == "completed" { + return Ok(Some(RunOutcome { final_output: mem::take(final_output) })); + } + + if let Some(error) = payload.turn.error.as_ref() { + return Err(Report::new(turn_failure_from_turn_error( + target_thread_id, + Some(&payload.turn.id), + &payload.turn.status, + error, + ))); + } + if let Some(failure) = latest_turn_failure.take() { + return Err(Report::new(failure)); + } + + eyre::bail!( + "Turn `{}` ended with status `{}` without an explicit error payload.", + payload.turn.id, + payload.turn.status + ); + }, + _ => {}, + } + + Ok(None) +} + fn next_turn_wire_message( client: &mut AppServerClient, last_activity_at: Instant, @@ -3528,13 +3561,265 @@ fn next_turn_wire_message( target_thread_id: &str, target_turn_id: &str, latest_turn_failure: Option<&AppServerTurnFailure>, -) -> crate::prelude::Result { + control_enabled: bool, +) -> crate::prelude::Result> { let now = Instant::now(); let wait_timeout = remaining_idle_budget(last_activity_at, now, timeout).ok_or_else(|| { turn_wait_timeout_error(target_thread_id, target_turn_id, latest_turn_failure.cloned()) })?; + let recv_timeout = + if control_enabled { wait_timeout.min(RUN_CONTROL_POLL_INTERVAL) } else { 
wait_timeout }; + + match recv_turn_wire_message(client, recv_timeout, latest_turn_failure) { + Ok(wire_message) => Ok(Some(wire_message)), + Err(error) + if control_enabled + && recv_timeout < wait_timeout + && is_app_server_output_timeout(&error) => + Ok(None), + Err(error) => Err(error), + } +} + +fn handle_pending_turn_control_requests( + client: &mut AppServerClient, + recorder: &mut RunRecorder<'_>, + request: &AppServerRunRequest<'_>, + target_thread_id: &str, + target_turn_id: &str, +) -> crate::prelude::Result<()> { + let Some(worktree_path) = request.activity_marker_path.as_deref() else { + return Ok(()); + }; + + for pending in run_control::pending_interrupt_requests(worktree_path, &request.run_id)? { + handle_pending_turn_interrupt_request( + client, + recorder, + request, + worktree_path, + pending, + target_thread_id, + target_turn_id, + )?; + } + + Ok(()) +} + +fn handle_pending_turn_interrupt_request( + client: &mut AppServerClient, + recorder: &mut RunRecorder<'_>, + run_request: &AppServerRunRequest<'_>, + worktree_path: &Path, + pending: PendingLaneControlRequest, + target_thread_id: &str, + target_turn_id: &str, +) -> crate::prelude::Result<()> { + record_lane_interrupt_request(recorder, &pending.request)?; + + if let Some((error_class, message)) = lane_interrupt_request_rejection( + run_request, + &pending.request, + target_thread_id, + target_turn_id, + ) { + let response = + LaneControlInterruptResponse::rejected(&pending.request, error_class, message); + + record_lane_interrupt_response(recorder, &response)?; + + run_control::write_interrupt_response(worktree_path, &response)?; + run_control::remove_interrupt_request(&pending.path)?; + + return Ok(()); + } + + let interrupt = TurnInterruptRequest { + thread_id: pending.request.thread_id.clone(), + turn_id: pending.request.turn_id.clone(), + }; + let result = client.interrupt_turn_with_handler( + interrupt, + |connection, wire_message, server_request| { + handle_server_request_while_waiting( 
+ connection, + recorder, + wire_message, + server_request, + RequestDispatchContext::new( + RequestWaitPhase::TurnExecution, + run_request.dynamic_tool_handler, + run_request.codex_account_provider, + Some(target_thread_id), + Some(target_turn_id), + ), + ) + }, + ); + let response = match result { + Ok(value) => LaneControlInterruptResponse::delivered( + &pending.request, + run_control::protocol_response_summary(&value), + ), + Err(error) => LaneControlInterruptResponse::failed( + &pending.request, + soft_interrupt_error_class(&error), + format!("turn/interrupt failed with {}.", soft_interrupt_error_class(&error)), + ), + }; + + record_lane_interrupt_response(recorder, &response)?; + + run_control::write_interrupt_response(worktree_path, &response)?; + run_control::remove_interrupt_request(&pending.path)?; + + Ok(()) +} + +fn record_lane_interrupt_request( + recorder: &mut RunRecorder<'_>, + request: &LaneControlInterruptRequest, +) -> crate::prelude::Result<()> { + recorder.record( + "lane_control/interrupt/request", + &serde_json::json!({ + "requestId": request.request_id, + "projectId": request.project_id, + "issueId": request.issue_id, + "runId": request.run_id, + "attemptNumber": request.attempt_number, + "threadId": request.thread_id, + "turnId": request.turn_id, + "source": request.source, + "reason": request.reason, + }) + .to_string(), + ) +} + +fn record_lane_interrupt_response( + recorder: &mut RunRecorder<'_>, + response: &LaneControlInterruptResponse, +) -> crate::prelude::Result<()> { + recorder.record( + "lane_control/interrupt/response", + &serde_json::json!({ + "requestId": response.request_id, + "projectId": response.project_id, + "issueId": response.issue_id, + "runId": response.run_id, + "attemptNumber": response.attempt_number, + "threadId": response.thread_id, + "turnId": response.turn_id, + "status": response.status, + "classification": response.classification, + "method": response.method, + "errorClass": response.error_class, + 
"protocolSummary": response.protocol_summary, + }) + .to_string(), + )?; + recorder.state_store.append_private_execution_event( + &response.project_id, + &response.issue_id, + &response.run_id, + response.attempt_number, + "lane_control/interrupt", + serde_json::json!({ + "requestId": response.request_id, + "status": response.status, + "classification": response.classification, + "method": response.method, + "errorClass": response.error_class, + "protocolSummary": response.protocol_summary, + "message": response.message, + }), + )?; - recv_turn_wire_message(client, wait_timeout, latest_turn_failure) + Ok(()) +} + +fn lane_interrupt_request_rejection( + run_request: &AppServerRunRequest<'_>, + request: &LaneControlInterruptRequest, + target_thread_id: &str, + target_turn_id: &str, +) -> Option<(&'static str, String)> { + if request.project_id != run_request.project_id { + return Some(( + "project_mismatch", + format!( + "Control request targeted project `{}`, but this run belongs to `{}`.", + request.project_id, run_request.project_id + ), + )); + } + if request.issue_id != run_request.issue_id { + return Some(( + "issue_mismatch", + format!( + "Control request targeted issue `{}`, but this run belongs to `{}`.", + request.issue_id, run_request.issue_id + ), + )); + } + if request.run_id != run_request.run_id { + return Some(( + "run_mismatch", + format!( + "Control request targeted run `{}`, but this run is `{}`.", + request.run_id, run_request.run_id + ), + )); + } + if request.attempt_number != run_request.attempt_number { + return Some(( + "attempt_mismatch", + format!( + "Control request targeted attempt `{}`, but this run is attempt `{}`.", + request.attempt_number, run_request.attempt_number + ), + )); + } + if request.thread_id != target_thread_id { + return Some(( + "thread_mismatch", + format!( + "Control request targeted thread `{}`, but the active thread is `{target_thread_id}`.", + request.thread_id + ), + )); + } + if request.turn_id != target_turn_id 
{ + return Some(( + "turn_mismatch", + format!( + "Control request targeted turn `{}`, but the active turn is `{target_turn_id}`.", + request.turn_id + ), + )); + } + + None +} + +fn soft_interrupt_error_class(error: &Report) -> &'static str { + if is_app_server_output_timeout(error) { + return "soft_interrupt_timed_out"; + } + + let error_text = error.to_string().to_ascii_lowercase(); + + if error_text.contains("-32601") || error_text.contains("method not found") { + "soft_interrupt_unsupported" + } else { + "soft_interrupt_failed" + } +} + +fn is_app_server_output_timeout(error: &Report) -> bool { + error.downcast_ref::().is_some() } fn handle_turn_execution_request( diff --git a/apps/decodex/src/agent/app_server/protocol.rs b/apps/decodex/src/agent/app_server/protocol.rs index 6b0d79a..3ce2bb7 100644 --- a/apps/decodex/src/agent/app_server/protocol.rs +++ b/apps/decodex/src/agent/app_server/protocol.rs @@ -181,6 +181,21 @@ impl AppServerClient { self.connection.request_with_handler("turn/start", ¶ms, REQUEST_TIMEOUT, handler) } + pub(super) fn interrupt_turn_with_handler( + &mut self, + params: TurnInterruptRequest, + handler: H, + ) -> crate::prelude::Result + where + H: FnMut( + &mut JsonRpcConnection, + &WireMessage, + &JsonRpcRequest, + ) -> crate::prelude::Result<()>, + { + self.connection.request_with_handler("turn/interrupt", ¶ms, REQUEST_TIMEOUT, handler) + } + pub(super) fn command_exec( &mut self, params: &CommandExecParams, @@ -426,6 +441,13 @@ pub(super) struct TurnStartResponse { pub(super) turn: TurnStatusPayload, } +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub(super) struct TurnInterruptRequest { + pub(super) thread_id: String, + pub(super) turn_id: String, +} + #[derive(Debug, Eq, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub(super) struct CommandExecParams { diff --git a/apps/decodex/src/agent/app_server/tests.rs b/apps/decodex/src/agent/app_server/tests.rs index 138d17c..75a6399 100644 --- 
a/apps/decodex/src/agent/app_server/tests.rs +++ b/apps/decodex/src/agent/app_server/tests.rs @@ -427,6 +427,7 @@ fn turn_start_request_omits_execution_policy_overrides() { fn minimal_run_request<'a>() -> super::AppServerRunRequest<'a> { super::AppServerRunRequest { + project_id: String::from("test-project"), run_id: String::from("run-1"), issue_id: String::from("issue-1"), attempt_number: 1, @@ -1866,6 +1867,7 @@ fn live_app_server_resume_round_trip_updates_marker_and_state() { ); let first_result = super::execute_app_server_run( &super::AppServerRunRequest { + project_id: String::from("test-project"), run_id: String::from("live-resume-run"), issue_id: String::from("live-resume-issue"), attempt_number: 1, @@ -1912,6 +1914,7 @@ fn live_app_server_resume_round_trip_updates_marker_and_state() { StateStore::open_in_memory().expect("resumed state store should open"); let second_result = super::execute_app_server_run( &super::AppServerRunRequest { + project_id: String::from("test-project"), run_id: String::from("live-resume-run"), issue_id: String::from("live-resume-issue"), attempt_number: 1, diff --git a/apps/decodex/src/cli.rs b/apps/decodex/src/cli.rs index dee43a0..b9a416a 100644 --- a/apps/decodex/src/cli.rs +++ b/apps/decodex/src/cli.rs @@ -20,7 +20,8 @@ use crate::{ maintenance::{self, MaintenanceMode, MaintenancePruneRequest, MaintenanceScope}, manual::{self, ManualCommitRequest, ManualLandRequest}, orchestrator::{ - self, DiagnoseRequest, EvidenceRequest, IssueDispatchMode, RunOnceRequest, ServeRequest, + self, DiagnoseRequest, EvidenceRequest, IssueDispatchMode, LaneInspectRequest, + LaneInterruptRequest, RunOnceRequest, ServeRequest, }, prelude::{Result, eyre}, radar::{ @@ -62,6 +63,7 @@ impl Cli { Command::Run(args) => args.run(), Command::Serve(args) => args.run(), Command::Project(args) => args.run(), + Command::Lane(args) => args.run(), Command::Status(args) => args.run(), Command::Diagnose(args) => args.run(), Command::Evidence(args) => args.run(), @@ 
-419,6 +421,68 @@ struct ProjectToggleCommand { service_id: String, } +#[derive(Debug, Args)] +struct LaneCommand { + #[command(flatten)] + project_config: ProjectConfigArgs, + #[command(subcommand)] + command: LaneSubcommand, +} +impl LaneCommand { + fn run(&self) -> Result<()> { + match &self.command { + LaneSubcommand::Inspect(args) => orchestrator::print_lane_inspect(LaneInspectRequest { + config_path: self.project_config.as_path(), + issue: &args.issue, + run_id: args.run_id.as_deref(), + json: args.json, + }), + LaneSubcommand::Interrupt(args) => orchestrator::interrupt_lane(LaneInterruptRequest { + config_path: self.project_config.as_path(), + issue: &args.issue, + run_id: &args.run_id, + force: args.force, + reason: args.reason.as_deref(), + json: args.json, + source: "cli", + }) + .map(|_report| ()), + } + } +} + +#[derive(Debug, Args)] +struct LaneInspectCommand { + /// Issue identifier or local issue id to inspect. + #[arg(value_name = "ISSUE")] + issue: String, + /// Restrict inspection to one run id. + #[arg(long, value_name = "RUN_ID")] + run_id: Option, + /// Emit structured JSON instead of human-readable text. + #[arg(long)] + json: bool, +} + +#[derive(Debug, Args)] +struct LaneInterruptCommand { + /// Issue identifier or local issue id to interrupt. + #[arg(value_name = "ISSUE")] + issue: String, + /// Run id for the active app-server turn to interrupt. + #[arg(long, value_name = "RUN_ID")] + run_id: String, + /// Use hard process-kill fallback when soft interrupt is unavailable or fails. + #[arg(long)] + force: bool, + /// Operator-visible reason retained in local private evidence. + #[arg(long, value_name = "TEXT")] + reason: Option, + /// Emit structured JSON instead of human-readable text. + #[arg(long)] + json: bool, +} + #[derive(Debug, Args)] struct StatusCommand { #[command(flatten)] @@ -1192,6 +1256,8 @@ enum Command { Serve(ServeCommand), /// Manage the local Decodex project registry. 
Project(ProjectCommand), + /// Inspect or interrupt a local lane. + Lane(LaneCommand), /// Inspect the current local runtime state for one configured project. Status(StatusCommand), /// Write and print the agent-readable local evidence index. @@ -1247,6 +1313,14 @@ enum ProjectSubcommand { Remove(ProjectToggleCommand), } +#[derive(Debug, Subcommand)] +enum LaneSubcommand { + /// Inspect one local lane by issue identifier or tracker issue id. + Inspect(LaneInspectCommand), + /// Soft-interrupt an active app-server turn, with optional hard fallback. + Interrupt(LaneInterruptCommand), +} + #[derive(Debug, Subcommand)] enum RecoverSubcommand { /// Recover retained review lanes whose handoff marker is missing. @@ -1341,7 +1415,8 @@ mod tests { use crate::cli::{ AccountCommand, AccountSubcommand, AccountUseCommand, AttemptCommand, Cli, Command, - CommitCommand, DiagnoseCommand, EvidenceCommand, LandCommand, ProbeCommand, ProjectCommand, + CommitCommand, DiagnoseCommand, EvidenceCommand, LandCommand, LaneCommand, + LaneInspectCommand, LaneInterruptCommand, LaneSubcommand, ProbeCommand, ProjectCommand, ProjectConfigArgs, ProjectSubcommand, RadarBackfillReleaseRangeCommand, RadarBundleBuildCommand, RadarBundleCommand, RadarBundleSubcommand, RadarBundleValidateCommand, RadarCommand, RadarLedgerCommand, @@ -1902,6 +1977,70 @@ mod tests { )); } + #[test] + fn parses_lane_inspect_with_run_id_and_project_config() { + let cli = Cli::parse_from([ + "decodex", + "lane", + "--config", + "./project.toml", + "inspect", + "XY-703", + "--run-id", + "xy-703-attempt-1", + "--json", + ]); + + assert!(matches!( + cli.command, + Command::Lane(LaneCommand { + project_config: ProjectConfigArgs { config: Some(config) }, + command: LaneSubcommand::Inspect(LaneInspectCommand { + issue, + run_id: Some(run_id), + json: true, + }) + }) if config == Path::new("./project.toml") + && issue == "XY-703" + && run_id == "xy-703-attempt-1" + )); + } + + #[test] + fn 
parses_lane_interrupt_with_force_reason_and_project_config() { + let cli = Cli::parse_from([ + "decodex", + "lane", + "--config", + "./project.toml", + "interrupt", + "XY-703", + "--run-id", + "xy-703-attempt-1", + "--force", + "--reason", + "operator requested", + "--json", + ]); + + assert!(matches!( + cli.command, + Command::Lane(LaneCommand { + project_config: ProjectConfigArgs { config: Some(config) }, + command: LaneSubcommand::Interrupt(LaneInterruptCommand { + issue, + run_id, + force: true, + reason: Some(reason), + json: true, + }) + }) if config == Path::new("./project.toml") + && issue == "XY-703" + && run_id == "xy-703-attempt-1" + && reason == "operator requested" + )); + } + #[test] fn parses_diagnose_with_json_limit_and_project_config() { let cli = Cli::parse_from([ diff --git a/apps/decodex/src/lib.rs b/apps/decodex/src/lib.rs index e5c9378..1be764d 100644 --- a/apps/decodex/src/lib.rs +++ b/apps/decodex/src/lib.rs @@ -23,6 +23,7 @@ mod prelude { } mod radar; mod recovery; +mod run_control; mod runtime; mod tracker; mod worktree; diff --git a/apps/decodex/src/orchestrator.rs b/apps/decodex/src/orchestrator.rs index ea99266..c11f8cd 100644 --- a/apps/decodex/src/orchestrator.rs +++ b/apps/decodex/src/orchestrator.rs @@ -1,3 +1,9 @@ +mod lane_control; + +pub(crate) use lane_control::{ + LaneInspectRequest, LaneInterruptRequest, interrupt_lane, print_lane_inspect, +}; + #[cfg(unix)] use std::os::fd::AsRawFd; use std::{ cmp::Ordering, @@ -83,6 +89,8 @@ const OPERATOR_LIVE_ENDPOINT_PATH: &str = "/livez"; const OPERATOR_ACCOUNTS_ENDPOINT_PATH: &str = "/api/accounts"; const OPERATOR_APP_SNAPSHOT_ENDPOINT_PATH: &str = "/api/operator-snapshot"; const OPERATOR_LINEAR_SCAN_ENDPOINT_PATH: &str = "/api/linear-scan"; +const OPERATOR_LANE_INSPECT_ENDPOINT_PATH: &str = "/api/lane/inspect"; +const OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH: &str = "/api/lane/interrupt"; const OPERATOR_STATE_MAX_REQUEST_BYTES: usize = 8_192; const 
OPERATOR_DASHBOARD_WS_CLIENT_MESSAGE_MAX_BYTES: usize = 64 * 1_024; const OPERATOR_STATE_HEADER_TERMINATOR: &[u8] = b"\r\n\r\n"; diff --git a/apps/decodex/src/orchestrator/execution.rs b/apps/decodex/src/orchestrator/execution.rs index f66e76b..b1ae163 100644 --- a/apps/decodex/src/orchestrator/execution.rs +++ b/apps/decodex/src/orchestrator/execution.rs @@ -549,6 +549,7 @@ where DecodexToolBridge::new(&tracker_tool_bridge, build_decodex_run_context(workflow, issue_run)); let run_result = agent::execute_app_server_run( &AppServerRunRequest { + project_id: project.service_id().to_owned(), run_id: issue_run.run_id.clone(), issue_id: issue_run.issue.id.clone(), attempt_number: issue_run.attempt_number, diff --git a/apps/decodex/src/orchestrator/lane_control.rs b/apps/decodex/src/orchestrator/lane_control.rs new file mode 100644 index 0000000..6e23a61 --- /dev/null +++ b/apps/decodex/src/orchestrator/lane_control.rs @@ -0,0 +1,829 @@ +use std::{ + collections::HashSet, + io::Error, + path::{Path, PathBuf}, + process, thread, + time::{Duration, Instant}, +}; + +use libc::{ESRCH, SIGKILL, SIGTERM, c_int, pid_t}; +use serde::Serialize; + +use crate::{ + config::ServiceConfig, + orchestrator::{ + self, ChildRunRef, DEFAULT_STATUS_RUN_LIMIT, OperatorRunStatus, OperatorStatusSnapshot, + }, + prelude::{Result, eyre}, + run_control::{ + self, LaneControlInterruptRequest, LaneControlInterruptRequestInput, + LaneControlInterruptResponse, LaneControlResponseStatus, + }, + runtime, + state::{ + RUN_CONTROL_ACTION_COMPLETED, RUN_CONTROL_ACTION_FAILED, RUN_CONTROL_ACTION_FALLBACK, + RUN_CONTROL_ACTION_TIMED_OUT, RunControlActionReceipt, RunControlActionRequest, StateStore, + }, +}; + +#[cfg(not(test))] +const LANE_INTERRUPT_RESPONSE_WAIT: Duration = Duration::from_secs(3); +#[cfg(test)] +const LANE_INTERRUPT_RESPONSE_WAIT: Duration = Duration::from_millis(20); +const LANE_HARD_INTERRUPT_TERM_WAIT: Duration = Duration::from_secs(2); + +#[derive(Clone, Copy)] +pub(crate) struct 
LaneInspectRequest<'a> { + pub(crate) config_path: Option<&'a Path>, + pub(crate) issue: &'a str, + pub(crate) run_id: Option<&'a str>, + pub(crate) json: bool, +} + +#[derive(Clone, Copy)] +pub(crate) struct LaneInterruptRequest<'a> { + pub(crate) config_path: Option<&'a Path>, + pub(crate) issue: &'a str, + pub(crate) run_id: &'a str, + pub(crate) force: bool, + pub(crate) reason: Option<&'a str>, + pub(crate) json: bool, + pub(crate) source: &'a str, +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct LaneInspectReport { + project_id: String, + issue: String, + run_id: Option, + matched_run_count: usize, + runs: Vec, +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct LaneInterruptReport { + project_id: String, + issue: String, + issue_id: String, + issue_identifier: Option, + run_id: String, + attempt_number: i64, + force: bool, + classification: String, + soft_interrupt: LaneSoftInterruptReport, + hard_interrupt: Option, + next_action: String, +} +impl LaneInterruptReport { + pub(super) fn http_status_line(&self) -> &'static str { + if self.soft_interrupt.status == "pending" && self.hard_interrupt.is_none() { + "202 Accepted" + } else { + "200 OK" + } + } +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct LaneRunInspect { + project_id: String, + issue_id: String, + issue_identifier: Option, + run_id: String, + attempt_number: i64, + status: String, + attempt_status: String, + phase: String, + wait_reason: Option, + current_operation: String, + active_lease: bool, + execution_liveness: String, + thread_id: Option, + turn_id: Option, + thread_status: Option, + process_id: Option, + process_alive: Option, + process_liveness_reason: Option, + last_event_type: Option, + last_event_at: Option, + event_count: i64, + worktree_path: Option, + soft_interrupt_available: bool, + hard_interrupt_available: bool, + hard_interrupt_requires_force: bool, +} 
+impl LaneRunInspect { + fn from_operator_run(run: &OperatorRunStatus) -> Self { + Self { + project_id: run.project_id.clone(), + issue_id: run.issue_id.clone(), + issue_identifier: run.issue_identifier.clone(), + run_id: run.run_id.clone(), + attempt_number: run.attempt_number, + status: run.status.clone(), + attempt_status: run.attempt_status.clone(), + phase: run.phase.clone(), + wait_reason: run.wait_reason.clone(), + current_operation: run.current_operation.clone(), + active_lease: run.active_lease, + execution_liveness: run.execution_liveness.clone(), + thread_id: run.thread_id.clone(), + turn_id: run.turn_id.clone(), + thread_status: run.thread_status.clone(), + process_id: run.process_id, + process_alive: run.process_alive, + process_liveness_reason: run.process_liveness_reason.clone(), + last_event_type: run.last_event_type.clone(), + last_event_at: run.last_event_at.clone(), + event_count: run.event_count, + worktree_path: run.worktree_path.clone(), + soft_interrupt_available: soft_interrupt_available_for_run(run), + hard_interrupt_available: run.process_id.is_some() && run.process_alive != Some(false), + hard_interrupt_requires_force: true, + } + } +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct LaneSoftInterruptReport { + attempted: bool, + available: bool, + status: String, + classification: String, + method: String, + request_id: Option, + message: String, + error_class: Option, + protocol_summary: Option, + response: Option, +} +impl LaneSoftInterruptReport { + fn unavailable(error_class: &str, message: &str) -> Self { + Self { + attempted: false, + available: false, + status: String::from("unavailable"), + classification: String::from("soft_interrupt_unavailable"), + method: String::from("turn/interrupt"), + request_id: None, + message: message.to_owned(), + error_class: Some(error_class.to_owned()), + protocol_summary: None, + response: None, + } + } + + fn from_response(response: LaneControlInterruptResponse) 
-> Self { + let status = match &response.status { + LaneControlResponseStatus::SoftDelivered => "delivered", + LaneControlResponseStatus::SoftFailed => "failed", + LaneControlResponseStatus::Rejected => "rejected", + }; + + Self { + attempted: true, + available: true, + status: String::from(status), + classification: response.classification.clone(), + method: response.method.clone(), + request_id: Some(response.request_id.clone()), + message: response.message.clone(), + error_class: response.error_class.clone(), + protocol_summary: response.protocol_summary.clone(), + response: Some(response), + } + } + + fn from_control_rejection(receipt: &RunControlActionReceipt) -> Self { + Self { + attempted: false, + available: false, + status: String::from("rejected"), + classification: String::from("control_request_rejected"), + method: String::from("turn/interrupt"), + request_id: None, + message: format!( + "Run-control resolver rejected the interrupt request: {}.", + receipt.reason() + ), + error_class: Some(receipt.reason().to_owned()), + protocol_summary: None, + response: None, + } + } +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct LaneHardInterruptReport { + attempted: bool, + status: String, + classification: String, + signals: Vec, + process_id: Option, + process_alive_after: Option, + message: String, + error_class: Option, +} +impl LaneHardInterruptReport { + fn unavailable(error_class: &str, message: &str) -> Self { + Self { + attempted: false, + status: String::from("unavailable"), + classification: String::from("hard_interrupt_fallback"), + signals: Vec::new(), + process_id: None, + process_alive_after: None, + message: message.to_owned(), + error_class: Some(error_class.to_owned()), + } + } +} + +pub(crate) fn print_lane_inspect(request: LaneInspectRequest<'_>) -> Result<()> { + let state_store = runtime::open_runtime_store()?; + let config = load_lane_control_project(request.config_path, &state_store)?; + let report = 
build_lane_inspect_report(&state_store, &config, request.issue, request.run_id)?; + + if request.json { + println!("{}", serde_json::to_string_pretty(&report)?); + } else { + print!("{}", render_lane_inspect_report(&report)); + } + + Ok(()) +} + +pub(crate) fn interrupt_lane(request: LaneInterruptRequest<'_>) -> Result { + let state_store = runtime::open_runtime_store()?; + let config = load_lane_control_project(request.config_path, &state_store)?; + let report = interrupt_lane_with_state( + &state_store, + &config, + request.issue, + request.run_id, + request.force, + request.reason, + request.source, + )?; + + if request.json { + println!("{}", serde_json::to_string_pretty(&report)?); + } else { + print!("{}", render_lane_interrupt_report(&report)); + } + + Ok(report) +} + +pub(super) fn build_lane_inspect_report( + state_store: &StateStore, + project: &ServiceConfig, + issue: &str, + run_id: Option<&str>, +) -> Result { + let snapshot = orchestrator::build_operator_status_snapshot( + project, + state_store, + DEFAULT_STATUS_RUN_LIMIT, + )?; + let runs = matching_lane_runs(&snapshot, issue, run_id); + + if runs.is_empty() { + eyre::bail!( + "No local lane matched issue `{}`{} in project `{}`.", + issue, + run_id.map(|id| format!(" and run `{id}`")).unwrap_or_default(), + project.service_id() + ); + } + + let runs = runs.iter().map(LaneRunInspect::from_operator_run).collect::>(); + + Ok(LaneInspectReport { + project_id: project.service_id().to_owned(), + issue: issue.to_owned(), + run_id: run_id.map(str::to_owned), + matched_run_count: runs.len(), + runs, + }) +} + +pub(super) fn interrupt_lane_with_state( + state_store: &StateStore, + project: &ServiceConfig, + issue: &str, + run_id: &str, + force: bool, + reason: Option<&str>, + source: &str, +) -> Result { + let snapshot = orchestrator::build_operator_status_snapshot( + project, + state_store, + DEFAULT_STATUS_RUN_LIMIT, + )?; + let run = select_interrupt_lane_run(&snapshot, issue, run_id)?; + let 
soft_interrupt = + attempt_soft_lane_interrupt(state_store, project, &run, force, reason, source)?; + let hard_interrupt = if force && soft_interrupt_allows_hard_fallback(&soft_interrupt) { + Some(attempt_hard_lane_interrupt(state_store, &run, reason)?) + } else { + None + }; + let classification = hard_interrupt + .as_ref() + .map(|hard| hard.classification.clone()) + .unwrap_or_else(|| soft_interrupt.classification.clone()); + let next_action = lane_interrupt_next_action(&soft_interrupt, hard_interrupt.as_ref(), force); + + Ok(LaneInterruptReport { + project_id: project.service_id().to_owned(), + issue: issue.to_owned(), + issue_id: run.issue_id.clone(), + issue_identifier: run.issue_identifier.clone(), + run_id: run.run_id.clone(), + attempt_number: run.attempt_number, + force, + classification, + soft_interrupt, + hard_interrupt, + next_action, + }) +} + +fn soft_interrupt_allows_hard_fallback(soft: &LaneSoftInterruptReport) -> bool { + matches!(soft.status.as_str(), "pending" | "failed" | "unavailable") + && soft.error_class.as_deref() != Some("lane_not_active") +} + +fn load_lane_control_project( + config_path: Option<&Path>, + state_store: &StateStore, +) -> Result { + let Some(config_path) = orchestrator::resolve_config_path(config_path, state_store)? else { + eyre::bail!( + "No Decodex project config found. Pass --config or register one with `decodex project add `." 
+ ); + }; + + runtime::register_project_config(state_store, &config_path, true)?; + + ServiceConfig::from_path(&config_path) +} + +fn select_interrupt_lane_run( + snapshot: &OperatorStatusSnapshot, + issue: &str, + run_id: &str, +) -> Result { + let runs = matching_lane_runs(snapshot, issue, Some(run_id)); + + if runs.is_empty() { + eyre::bail!( + "No local lane matched issue `{issue}` and run `{run_id}` in project `{}`.", + snapshot.project_id + ); + } + + Ok(runs[0].clone()) +} + +fn matching_lane_runs( + snapshot: &OperatorStatusSnapshot, + issue: &str, + run_id: Option<&str>, +) -> Vec { + let mut seen_run_ids = HashSet::new(); + let mut runs = Vec::new(); + + for run in snapshot.active_runs.iter().chain(snapshot.recent_runs.iter()) { + if !seen_run_ids.insert(run.run_id.clone()) { + continue; + } + if !lane_issue_matches(run, issue) { + continue; + } + if run_id.is_some_and(|expected| expected != run.run_id) { + continue; + } + + runs.push(run.clone()); + } + + runs +} + +fn lane_issue_matches(run: &OperatorRunStatus, issue: &str) -> bool { + let issue = issue.trim(); + + run.issue_id == issue + || run.issue_identifier.as_deref() == Some(issue) + || run + .issue_identifier + .as_ref() + .is_some_and(|identifier| identifier.eq_ignore_ascii_case(issue)) +} + +fn soft_interrupt_available_for_run(run: &OperatorRunStatus) -> bool { + orchestrator::operator_run_counts_as_active(run) + && run.worktree_path.is_some() + && run.thread_id.is_some() + && run.turn_id.is_some() + && run.control_capability.as_ref().is_some_and(|capability| capability.status == "active") +} + +fn attempt_soft_lane_interrupt( + state_store: &StateStore, + project: &ServiceConfig, + run: &OperatorRunStatus, + force: bool, + reason: Option<&str>, + source: &str, +) -> Result { + let Some(worktree_path) = absolute_lane_worktree_path(project, state_store, run)? 
else { + return Ok(LaneSoftInterruptReport::unavailable( + "worktree_missing", + "Soft interrupt requires the active lane worktree and run-control directory.", + )); + }; + let Some(thread_id) = run.thread_id.as_deref() else { + return Ok(LaneSoftInterruptReport::unavailable( + "thread_id_missing", + "Soft interrupt requires a recorded app-server thread id.", + )); + }; + let Some(turn_id) = run.turn_id.as_deref() else { + return Ok(LaneSoftInterruptReport::unavailable( + "turn_id_missing", + "Soft interrupt requires a recorded active app-server turn id.", + )); + }; + + if !orchestrator::operator_run_counts_as_active(run) { + return Ok(LaneSoftInterruptReport::unavailable( + "lane_not_active", + "Soft interrupt only targets active or live local lane runs.", + )); + } + + let receipt = state_store.resolve_run_control_action(RunControlActionRequest { + project_id: project.service_id(), + issue_id: &run.issue_id, + run_id: &run.run_id, + attempt_number: run.attempt_number, + thread_id: Some(thread_id), + turn_id: Some(turn_id), + source, + action: "interrupt", + timeout_ms: Some( + i64::try_from(LANE_INTERRUPT_RESPONSE_WAIT.as_millis()).unwrap_or(i64::MAX), + ), + })?; + + if receipt.outcome() != "accepted" { + return Ok(LaneSoftInterruptReport::from_control_rejection(&receipt)); + } + + let request = LaneControlInterruptRequest::new(LaneControlInterruptRequestInput { + project_id: project.service_id(), + issue_id: &run.issue_id, + run_id: &run.run_id, + attempt_number: run.attempt_number, + thread_id, + turn_id, + source, + reason, + }); + + run_control::write_interrupt_request(&worktree_path, &request)?; + + state_store.append_private_execution_event( + project.service_id(), + &run.issue_id, + &run.run_id, + run.attempt_number, + "lane_control/interrupt/requested", + serde_json::json!({ + "requestId": request.request_id, + "source": source, + "method": "turn/interrupt", + "force": force, + "reason": reason, + }), + )?; + + match 
run_control::wait_for_interrupt_response( + &worktree_path, + &run.run_id, + &request.request_id, + LANE_INTERRUPT_RESPONSE_WAIT, + )? { + Some(response) => { + let outcome = match &response.status { + LaneControlResponseStatus::SoftDelivered => RUN_CONTROL_ACTION_COMPLETED, + LaneControlResponseStatus::SoftFailed => RUN_CONTROL_ACTION_FAILED, + LaneControlResponseStatus::Rejected => RUN_CONTROL_ACTION_FAILED, + }; + + state_store.record_run_control_action_outcome( + &receipt, + outcome, + &response.classification, + )?; + + Ok(LaneSoftInterruptReport::from_response(response)) + }, + None => { + state_store.record_run_control_action_outcome( + &receipt, + RUN_CONTROL_ACTION_TIMED_OUT, + "soft_interrupt_response_pending", + )?; + + Ok(LaneSoftInterruptReport { + attempted: true, + available: true, + status: String::from("pending"), + classification: String::from("soft_interrupt_pending"), + method: String::from("turn/interrupt"), + request_id: Some(request.request_id), + message: String::from( + "Soft interrupt request was written, but the app-server child has not recorded a response yet.", + ), + error_class: Some(String::from("soft_interrupt_response_pending")), + protocol_summary: None, + response: None, + }) + }, + } +} + +fn attempt_hard_lane_interrupt( + state_store: &StateStore, + run: &OperatorRunStatus, + reason: Option<&str>, +) -> Result { + let Some(process_id) = run.process_id else { + return Ok(LaneHardInterruptReport::unavailable( + "process_id_missing", + "Hard interrupt fallback requires a recorded child process id.", + )); + }; + + if process_id == process::id() { + eyre::bail!("Refusing to hard-interrupt the current Decodex process."); + } + + let mut signals = Vec::new(); + let mut sent_any_signal = false; + + if send_lane_signal(process_id, SIGTERM)? 
{ + sent_any_signal = true; + + signals.push(String::from("SIGTERM")); + } + if sent_any_signal + && !wait_for_lane_process_exit(process_id, LANE_HARD_INTERRUPT_TERM_WAIT) + && send_lane_signal(process_id, SIGKILL)? + { + signals.push(String::from("SIGKILL")); + } + + let process_alive_after = Some(orchestrator::process_is_alive(process_id)); + let status = if process_alive_after == Some(false) { + "sent" + } else if sent_any_signal { + "still_alive" + } else { + "process_not_found" + }; + let message = if sent_any_signal { + String::from("Hard interrupt fallback signaled the recorded child process.") + } else { + String::from("Hard interrupt fallback found no signalable child process.") + }; + + state_store.append_private_execution_event( + &run.project_id, + &run.issue_id, + &run.run_id, + run.attempt_number, + "lane_control/interrupt", + serde_json::json!({ + "classification": "hard_interrupt_fallback", + "status": status, + "signals": signals, + "processId": process_id, + "processAliveAfter": process_alive_after, + "reason": reason, + }), + )?; + + record_hard_interrupt_control_fallback(state_store, run, reason)?; + + if sent_any_signal { + orchestrator::clear_orphaned_daemon_child_state( + state_store, + ChildRunRef { + issue_id: &run.issue_id, + run_id: &run.run_id, + attempt_number: run.attempt_number, + }, + true, + )?; + } + + Ok(LaneHardInterruptReport { + attempted: true, + status: String::from(status), + classification: String::from("hard_interrupt_fallback"), + signals, + process_id: Some(process_id), + process_alive_after, + message, + error_class: None, + }) +} + +fn record_hard_interrupt_control_fallback( + state_store: &StateStore, + run: &OperatorRunStatus, + _reason: Option<&str>, +) -> Result<()> { + let receipt = state_store.resolve_run_control_action(RunControlActionRequest { + project_id: &run.project_id, + issue_id: &run.issue_id, + run_id: &run.run_id, + attempt_number: run.attempt_number, + thread_id: run.thread_id.as_deref(), + turn_id: 
run.turn_id.as_deref(), + source: "hard_interrupt_fallback", + action: "interrupt", + timeout_ms: None, + })?; + + state_store.record_run_control_action_outcome( + &receipt, + RUN_CONTROL_ACTION_FALLBACK, + "hard_interrupt_fallback", + )?; + + Ok(()) +} + +fn absolute_lane_worktree_path( + project: &ServiceConfig, + state_store: &StateStore, + run: &OperatorRunStatus, +) -> Result> { + if let Some(mapping) = state_store.worktree_for_issue(&run.issue_id)? { + return Ok(Some(mapping.worktree_path().to_path_buf())); + } + + let Some(worktree_path) = run.worktree_path.as_deref() else { + return Ok(None); + }; + let worktree_path = Path::new(worktree_path); + + Ok(Some(if worktree_path.is_absolute() { + worktree_path.to_path_buf() + } else { + project.repo_root().join(worktree_path) + })) +} + +fn send_lane_signal(process_id: u32, signal: c_int) -> Result { + let process_id = pid_t::try_from(process_id).map_err(|_error| { + eyre::eyre!("Recorded child process id is too large for this platform.") + })?; + + if process_id <= 0 { + eyre::bail!("Recorded child process id must be positive."); + } + + match unsafe { libc::kill(process_id, signal) } { + 0 => Ok(true), + -1 if Error::last_os_error().raw_os_error() == Some(ESRCH) => Ok(false), + -1 => Err(Error::last_os_error().into()), + _ => Ok(false), + } +} + +fn wait_for_lane_process_exit(process_id: u32, timeout: Duration) -> bool { + let started_at = Instant::now(); + + while started_at.elapsed() < timeout { + if !orchestrator::process_is_alive(process_id) { + return true; + } + + thread::sleep(Duration::from_millis(100)); + } + + !orchestrator::process_is_alive(process_id) +} + +fn lane_interrupt_next_action( + soft: &LaneSoftInterruptReport, + hard: Option<&LaneHardInterruptReport>, + force: bool, +) -> String { + if let Some(hard) = hard { + return if hard.status == "unavailable" { + String::from("Hard fallback was unavailable; inspect the lane before retrying.") + } else if hard.status == "sent" || hard.status == 
"process_not_found" { + String::from( + "Inspect the lane to confirm the lease and dirty-worktree reconciliation state.", + ) + } else { + String::from( + "The fallback signal did not stop the recorded process; inspect the host process before retrying.", + ) + }; + } + + match soft.status.as_str() { + "delivered" => + String::from("Inspect the lane until the app-server turn records completion."), + "pending" => + if force { + String::from("Soft interrupt is pending; forced fallback was not attempted.") + } else { + String::from( + "Re-run inspect shortly, or retry interrupt with --force if operator intent is to kill the process.", + ) + }, + "rejected" => String::from( + "Inspect the lane identity before retrying; resolver rejection is not converted into hard fallback.", + ), + "failed" | "unavailable" => String::from( + "Retry with --force only if operator intent is to use hard process-kill fallback.", + ), + _ => String::from("Inspect the lane for the latest run status."), + } +} + +fn render_lane_inspect_report(report: &LaneInspectReport) -> String { + let mut output = format!( + "Lane inspect for {} in project {} ({} run{})\n", + report.issue, + report.project_id, + report.matched_run_count, + if report.matched_run_count == 1 { "" } else { "s" } + ); + + for run in &report.runs { + output.push_str(&format!( + "- {} attempt {}: status={}, phase={}, activeLease={}, liveness={}\n", + run.run_id, + run.attempt_number, + run.status, + run.phase, + run.active_lease, + run.execution_liveness + )); + output.push_str(&format!( + " appServer: thread={}, turn={}, softInterruptAvailable={}\n", + run.thread_id.as_deref().unwrap_or("none"), + run.turn_id.as_deref().unwrap_or("none"), + run.soft_interrupt_available + )); + output.push_str(&format!( + " process: pid={}, alive={}, hardInterruptAvailable={} (requires --force)\n", + run.process_id.map_or_else(|| String::from("none"), |id| id.to_string()), + run.process_alive.map_or_else(|| String::from("unknown"), |alive| 
alive.to_string()), + run.hard_interrupt_available + )); + } + + output +} + +fn render_lane_interrupt_report(report: &LaneInterruptReport) -> String { + let mut output = format!( + "Lane interrupt {} for run {}: {}\n", + report.classification, report.run_id, report.soft_interrupt.message + ); + + if let Some(hard) = &report.hard_interrupt { + output.push_str(&format!( + "Hard fallback {}: {} ({})\n", + hard.status, + hard.message, + if hard.signals.is_empty() { + String::from("no signals") + } else { + hard.signals.join(",") + } + )); + } + + output.push_str(&format!("Next action: {}\n", report.next_action)); + + output +} diff --git a/apps/decodex/src/orchestrator/operator_http.rs b/apps/decodex/src/orchestrator/operator_http.rs index 267eb2e..e3c0d01 100644 --- a/apps/decodex/src/orchestrator/operator_http.rs +++ b/apps/decodex/src/orchestrator/operator_http.rs @@ -27,6 +27,8 @@ enum OperatorRequestRoute { Live, AppSnapshot, LinearScan, + LaneInspect, + LaneInterrupt, AccountList { force_refresh: bool }, AccountSelect, AccountClear, @@ -132,6 +134,17 @@ struct OperatorLinearScanHttpRequest { project_id: Option, } +#[derive(Deserialize)] +struct OperatorLaneInterruptHttpRequest { + #[serde(alias = "projectId")] + project_id: Option, + issue: String, + #[serde(alias = "runId")] + run_id: String, + force: Option, + reason: Option, +} + struct DashboardControlAck<'a> { request_id: Option<&'a str>, action: &'a str, @@ -228,6 +241,20 @@ fn handle_operator_state_endpoint_connection( return Ok(()); } + if route == OperatorRequestRoute::LaneInspect { + let response = build_operator_lane_inspect_http_response(state_store, &request); + + stream.write_all(&response)?; + + return Ok(()); + } + if route == OperatorRequestRoute::LaneInterrupt { + let response = build_operator_lane_interrupt_http_response(state_store, &request); + + stream.write_all(&response)?; + + return Ok(()); + } if route == OperatorRequestRoute::AppSnapshot { let response = 
build_operator_app_snapshot_http_response(snapshot); @@ -1417,6 +1444,179 @@ fn operator_linear_scan_request_project_id(request: &[u8]) -> Result Vec { + match operator_lane_inspect_http_response_body(state_store, request) { + Ok(body) => http_response_bytes("200 OK", "application/json", &body), + Err(error) => operator_lane_error_http_response(error), + } +} + +fn operator_lane_inspect_http_response_body( + state_store: &StateStore, + request: &[u8], +) -> Result> { + let project_id = operator_http_query_value_alias(request, "projectId", "project_id")?; + let issue = operator_http_query_value(request, "issue")? + .filter(|issue| !issue.trim().is_empty()) + .ok_or_else(|| eyre::eyre!("Lane inspect request requires issue query parameter."))?; + let run_id = operator_http_query_value_alias(request, "runId", "run_id")?; + let project = operator_lane_http_project(state_store, project_id.as_deref())?; + let report = lane_control::build_lane_inspect_report( + state_store, + &project, + issue.trim(), + run_id.as_deref(), + )?; + + serde_json::to_vec(&report).map_err(Into::into) +} + +fn build_operator_lane_interrupt_http_response( + state_store: &StateStore, + request: &[u8], +) -> Vec { + match operator_lane_interrupt_http_response_body(state_store, request) { + Ok((status_line, body)) => http_response_bytes(status_line, "application/json", &body), + Err(error) => operator_lane_error_http_response(error), + } +} + +fn operator_lane_interrupt_http_response_body( + state_store: &StateStore, + request: &[u8], +) -> Result<(&'static str, Vec)> { + let body = operator_http_request_body(request)?; + let request: OperatorLaneInterruptHttpRequest = serde_json::from_slice(body) + .map_err(|error| eyre::eyre!("Lane interrupt request body was not valid JSON: {error}"))?; + + if request.issue.trim().is_empty() { + eyre::bail!("Lane interrupt request issue must not be blank."); + } + if request.run_id.trim().is_empty() { + eyre::bail!("Lane interrupt request runId must not be 
blank."); + } + + let project = operator_lane_http_project(state_store, request.project_id.as_deref())?; + let report = lane_control::interrupt_lane_with_state( + state_store, + &project, + request.issue.trim(), + request.run_id.trim(), + request.force.unwrap_or(false), + request.reason.as_deref(), + "http", + )?; + let status_line = report.http_status_line(); + + Ok((status_line, serde_json::to_vec(&report)?)) +} + +fn operator_lane_http_project( + state_store: &StateStore, + project_id: Option<&str>, +) -> Result { + let registrations = state_store.list_projects()?; + let registration = match project_id.map(str::trim).filter(|id| !id.is_empty()) { + Some(project_id) => registrations + .iter() + .find(|registration| registration.service_id() == project_id) + .ok_or_else(|| eyre::eyre!("Decodex project `{project_id}` is not registered."))?, + None => { + let enabled = registrations + .iter() + .filter(|registration| registration.enabled()) + .collect::>(); + + if enabled.len() == 1 { + enabled[0] + } else { + eyre::bail!( + "Lane API request requires projectId when zero or multiple projects are enabled." 
+ ); + } + }, + }; + + ServiceConfig::from_path(registration.config_path()) +} + +fn operator_lane_error_http_response(error: Report) -> Vec { + let body = serde_json::to_vec(&json!({ "error": error.to_string() })) + .unwrap_or_else(|_| br#"{"error":"lane request failed"}"#.to_vec()); + + http_response_bytes("400 Bad Request", "application/json", &body) +} + +fn operator_http_query_value(request: &[u8], key: &str) -> Result> { + let request = String::from_utf8_lossy(request); + let Some(request_line) = request.lines().next() else { + return Ok(None); + }; + let Some(path) = request_line.split_whitespace().nth(1) else { + return Ok(None); + }; + let Some(query) = path.split_once('?').map(|(_path, query)| query) else { + return Ok(None); + }; + + for part in query.split('&') { + let (name, value) = part.split_once('=').unwrap_or((part, "")); + + if name == key { + return Ok(Some(percent_decode_operator_query_value(value)?)); + } + } + + Ok(None) +} + +fn operator_http_query_value_alias( + request: &[u8], + primary: &str, + secondary: &str, +) -> Result> { + match operator_http_query_value(request, primary)? 
{ + Some(value) => Ok(Some(value)), + None => operator_http_query_value(request, secondary), + } +} + +fn percent_decode_operator_query_value(value: &str) -> Result { + let raw = value.as_bytes(); + let mut bytes = Vec::with_capacity(value.len()); + let mut index = 0; + + while index < raw.len() { + match raw[index] { + b'+' => { + bytes.push(b' '); + + index += 1; + }, + b'%' if index + 2 < raw.len() => { + let hex = std::str::from_utf8(&raw[index + 1..index + 3])?; + let byte = u8::from_str_radix(hex, 16) + .map_err(|error| eyre::eyre!("Invalid percent-encoded query value: {error}"))?; + + bytes.push(byte); + + index += 3; + }, + byte => { + bytes.push(byte); + + index += 1; + }, + } + } + + String::from_utf8(bytes) + .map_err(|error| eyre::eyre!("Query parameter was not valid UTF-8: {error}")) +} + fn build_operator_state_http_response_for_route(route: OperatorRequestRoute) -> Vec { match route { OperatorRequestRoute::Dashboard => { @@ -1438,6 +1638,9 @@ fn build_operator_state_http_response_for_route(route: OperatorRequestRoute) -> OperatorRequestRoute::LinearScan => { http_response_bytes("405 Method Not Allowed", "text/plain; charset=utf-8", b"method not allowed") }, + OperatorRequestRoute::LaneInspect | OperatorRequestRoute::LaneInterrupt => { + http_response_bytes("405 Method Not Allowed", "text/plain; charset=utf-8", b"method not allowed") + }, OperatorRequestRoute::Live => { http_response_bytes("200 OK", "text/plain; charset=utf-8", b"ok") }, @@ -1500,6 +1703,8 @@ fn parse_operator_state_request_route( force_refresh: operator_query_has_flag(query, "refresh"), }), ("POST", OPERATOR_LINEAR_SCAN_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LinearScan), + ("GET", OPERATOR_LANE_INSPECT_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LaneInspect), + ("POST", OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LaneInterrupt), ("POST", "/api/accounts/select") => Ok(OperatorRequestRoute::AccountSelect), ("POST", "/api/accounts/clear") => 
Ok(OperatorRequestRoute::AccountClear), ("POST", "/api/accounts/logout") => Ok(OperatorRequestRoute::AccountLogout), @@ -1512,6 +1717,8 @@ fn parse_operator_state_request_route( | OPERATOR_LIVE_ENDPOINT_PATH | OPERATOR_APP_SNAPSHOT_ENDPOINT_PATH | OPERATOR_LINEAR_SCAN_ENDPOINT_PATH + | OPERATOR_LANE_INSPECT_ENDPOINT_PATH + | OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH | OPERATOR_ACCOUNTS_ENDPOINT_PATH | "/api/accounts/select" | "/api/accounts/clear" diff --git a/apps/decodex/src/orchestrator/tests/operator/status/http.rs b/apps/decodex/src/orchestrator/tests/operator/status/http.rs index 919826f..ae0622f 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/http.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/http.rs @@ -2,6 +2,8 @@ use std::io::ErrorKind; use std::net::SocketAddr; use orchestrator::OperatorControlRequests; +use state::RUN_CONTROL_CHANNEL_DIR; +use state::RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE; use crate::runtime; @@ -1389,6 +1391,255 @@ fn operator_state_endpoint_queues_linear_scan_request() { ); } +#[test] +fn operator_lane_inspect_api_returns_lane_identity() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let registration = ProjectRegistration::from_config( + config.service_id(), + &service_config_path(config.repo_root()), + &config, + true, + "test-fingerprint", + ); + let issue = sample_issue("In Progress", &[]); + let worktree_path = config.worktree_root().join(&issue.identifier); + + fs::create_dir_all(&worktree_path).expect("worktree should exist"); + + state_store.upsert_project(®istration).expect("project should register"); + state_store + .record_run_attempt("pub-101-attempt-1", &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .update_run_thread("pub-101-attempt-1", "thread-1") + .expect("thread should record"); + state_store + .update_run_turn("pub-101-attempt-1", "turn-1") + 
.expect("turn should record"); + state_store + .upsert_lease(config.service_id(), &issue.id, "pub-101-attempt-1", "In Progress") + .expect("lease should record"); + state_store + .upsert_worktree( + config.service_id(), + &issue.id, + "x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + + let response = String::from_utf8(orchestrator::build_operator_lane_inspect_http_response( + &state_store, + format!( + "GET {}?projectId=pubfi&issue=PUB-101&runId=pub-101-attempt-1 HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", + orchestrator::OPERATOR_LANE_INSPECT_ENDPOINT_PATH + ) + .as_bytes(), + )) + .expect("lane inspect response should be utf-8"); + let body = response + .split_once("\r\n\r\n") + .map(|(_, body)| body) + .expect("lane inspect response should include body"); + let data: Value = serde_json::from_str(body).expect("lane inspect response should be json"); + + assert!(response.starts_with("HTTP/1.1 200 OK\r\n")); + assert_eq!(data["projectId"], "pubfi"); + assert_eq!(data["issue"], "PUB-101"); + assert_eq!(data["matchedRunCount"], 1); + assert_eq!(data["runs"][0]["runId"], "pub-101-attempt-1"); + assert_eq!(data["runs"][0]["threadId"], "thread-1"); + assert_eq!(data["runs"][0]["turnId"], "turn-1"); +} + +#[test] +fn operator_lane_interrupt_api_rejects_blank_run_id() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let registration = ProjectRegistration::from_config( + config.service_id(), + &service_config_path(config.repo_root()), + &config, + true, + "test-fingerprint", + ); + let body = br#"{"projectId":"pubfi","issue":"PUB-101","runId":""}"#; + let request = format!( + "POST {} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + orchestrator::OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH, + body.len(), + String::from_utf8_lossy(body) + ); + + 
state_store.upsert_project(®istration).expect("project should register"); + + let response = String::from_utf8(orchestrator::build_operator_lane_interrupt_http_response( + &state_store, + request.as_bytes(), + )) + .expect("lane interrupt response should be utf-8"); + let body = response + .split_once("\r\n\r\n") + .map(|(_, body)| body) + .expect("lane interrupt response should include body"); + let data: Value = serde_json::from_str(body).expect("lane interrupt response should be json"); + + assert!(response.starts_with("HTTP/1.1 400 Bad Request\r\n")); + assert!(data["error"].as_str().unwrap_or_default().contains("runId")); +} + +#[test] +fn operator_lane_interrupt_api_force_reports_hard_fallback_after_pending_soft_interrupt() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let registration = ProjectRegistration::from_config( + config.service_id(), + &service_config_path(config.repo_root()), + &config, + true, + "test-fingerprint", + ); + let issue = sample_issue("In Progress", &[]); + let worktree_path = config.worktree_root().join(&issue.identifier); + let body = br#"{"projectId":"pubfi","issue":"PUB-101","runId":"pub-101-attempt-1","force":true}"#; + let request = format!( + "POST {} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + orchestrator::OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH, + body.len(), + String::from_utf8_lossy(body) + ); + + fs::create_dir_all(&worktree_path).expect("worktree should exist"); + + state_store.upsert_project(®istration).expect("project should register"); + state_store + .record_run_attempt("pub-101-attempt-1", &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .update_run_thread("pub-101-attempt-1", "thread-1") + .expect("thread should record"); + state_store + .update_run_turn("pub-101-attempt-1", "turn-1") + .expect("turn should 
/// A forced lane interrupt must not escalate to the hard process-kill fallback when the
/// soft request is rejected outright.
///
/// The run is registered with a thread, turn, lease, and worktree, but no run-control
/// channel is ever published. The API is therefore expected to answer `200 OK` with a
/// `control_request_rejected` classification, a `control_channel_missing` soft-interrupt
/// error class, a null `hardInterrupt`, and a private `control_action` audit event.
#[test]
fn operator_lane_interrupt_api_force_does_not_hard_fallback_after_control_rejection() {
    let (_temp_dir, config, _workflow) = temp_project_layout();
    let state_store = StateStore::open_in_memory().expect("state store should open");
    let registration = ProjectRegistration::from_config(
        config.service_id(),
        &service_config_path(config.repo_root()),
        &config,
        true,
        "test-fingerprint",
    );
    let issue = sample_issue("In Progress", &[]);
    let worktree_path = config.worktree_root().join(&issue.identifier);
    let body =
        br#"{"projectId":"pubfi","issue":"PUB-101","runId":"pub-101-attempt-1","force":true}"#;
    let request = format!(
        "POST {} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
        orchestrator::OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH,
        body.len(),
        String::from_utf8_lossy(body)
    );

    fs::create_dir_all(&worktree_path).expect("worktree should exist");

    // Register an active attempt with a known thread and turn, but deliberately publish
    // no run-control channel so the soft interrupt is rejected.
    state_store.upsert_project(&registration).expect("project should register");
    state_store
        .record_run_attempt("pub-101-attempt-1", &issue.id, 1, "running")
        .expect("run attempt should record");
    state_store
        .update_run_thread("pub-101-attempt-1", "thread-1")
        .expect("thread should record");
    state_store
        .update_run_turn("pub-101-attempt-1", "turn-1")
        .expect("turn should record");
    state_store
        .upsert_lease(config.service_id(), &issue.id, "pub-101-attempt-1", "In Progress")
        .expect("lease should record");
    state_store
        .upsert_worktree(
            config.service_id(),
            &issue.id,
            "x/pubfi-pub-101",
            &worktree_path.display().to_string(),
        )
        .expect("worktree should record");

    let response = String::from_utf8(orchestrator::build_operator_lane_interrupt_http_response(
        &state_store,
        request.as_bytes(),
    ))
    .expect("lane interrupt response should be utf-8");
    let body = response
        .split_once("\r\n\r\n")
        .map(|(_, body)| body)
        .expect("lane interrupt response should include body");
    let data: Value = serde_json::from_str(body).expect("lane interrupt response should be json");
    let events = state_store
        .list_private_execution_events(config.service_id(), &issue.id, "pub-101-attempt-1", 1)
        .expect("private control audit should read");

    assert!(response.starts_with("HTTP/1.1 200 OK\r\n"));
    assert_eq!(data["force"], true);
    assert_eq!(data["classification"], "control_request_rejected");
    assert_eq!(data["softInterrupt"]["status"], "rejected");
    assert_eq!(data["softInterrupt"]["errorClass"], "control_channel_missing");
    // Rejection is not a trigger for hard fallback, even with `force: true`.
    assert_eq!(data["hardInterrupt"], Value::Null);
    assert!(events.iter().any(|event| {
        event.event_type() == "control_action"
            && event.payload()["reason"] == "control_channel_missing"
    }));
}
/// Normalized outcome carried in the `status` field of a `LaneControlInterruptResponse`.
///
/// Variants serialize in `snake_case`: `soft_delivered`, `soft_failed`, `rejected`.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum LaneControlResponseStatus {
    /// Used by `LaneControlInterruptResponse::delivered`: `turn/interrupt` was accepted by
    /// app-server (classification `graceful_stop_requested`).
    SoftDelivered,
    /// Used by `LaneControlInterruptResponse::failed` (classification
    /// `soft_interrupt_failed`).
    SoftFailed,
    /// Used by `LaneControlInterruptResponse::rejected` (classification
    /// `control_request_rejected`).
    Rejected,
}
LaneControlResponseStatus::SoftDelivered, + "graceful_stop_requested", + "turn/interrupt accepted by app-server.", + None, + Some(protocol_summary), + ) + } + + pub(crate) fn failed( + request: &LaneControlInterruptRequest, + error_class: &str, + message: String, + ) -> Self { + Self::from_request( + request, + LaneControlResponseStatus::SoftFailed, + "soft_interrupt_failed", + message, + Some(error_class.to_owned()), + None, + ) + } + + pub(crate) fn rejected( + request: &LaneControlInterruptRequest, + error_class: &str, + message: String, + ) -> Self { + Self::from_request( + request, + LaneControlResponseStatus::Rejected, + "control_request_rejected", + message, + Some(error_class.to_owned()), + None, + ) + } + + fn from_request( + request: &LaneControlInterruptRequest, + status: LaneControlResponseStatus, + classification: &str, + message: impl Into, + error_class: Option, + protocol_summary: Option, + ) -> Self { + Self { + schema: String::from(SCHEMA_INTERRUPT_RESPONSE), + request_id: request.request_id.clone(), + project_id: request.project_id.clone(), + issue_id: request.issue_id.clone(), + run_id: request.run_id.clone(), + attempt_number: request.attempt_number, + thread_id: request.thread_id.clone(), + turn_id: request.turn_id.clone(), + status, + classification: classification.to_owned(), + method: String::from("turn/interrupt"), + message: message.into(), + error_class, + protocol_summary, + recorded_at_unix_epoch: OffsetDateTime::now_utc().unix_timestamp(), + } + } +} + +pub(crate) fn write_interrupt_request( + worktree_path: &Path, + request: &LaneControlInterruptRequest, +) -> prelude::Result { + let path = interrupt_request_path(worktree_path, &request.run_id, &request.request_id); + + write_json_file_atomically(&path, request)?; + + Ok(path) +} + +pub(crate) fn write_interrupt_response( + worktree_path: &Path, + response: &LaneControlInterruptResponse, +) -> prelude::Result { + let path = interrupt_response_path(worktree_path, &response.run_id, 
&response.request_id); + + write_json_file_atomically(&path, response)?; + + Ok(path) +} + +pub(crate) fn remove_interrupt_request(path: &Path) -> prelude::Result<()> { + match fs::remove_file(path) { + Ok(()) => Ok(()), + Err(error) if error.kind() == ErrorKind::NotFound => Ok(()), + Err(error) => Err(error.into()), + } +} + +pub(crate) fn pending_interrupt_requests( + worktree_path: &Path, + run_id: &str, +) -> prelude::Result> { + let dir = run_control_run_dir(worktree_path, run_id); + let Ok(entries) = fs::read_dir(&dir) else { + return Ok(Vec::new()); + }; + let mut requests = entries + .filter_map(std::result::Result::ok) + .map(|entry| entry.path()) + .filter(|path| file_name_ends_with(path, REQUEST_SUFFIX)) + .map(read_pending_interrupt_request) + .collect::>>()?; + + requests.sort_by(|left, right| { + left.request + .created_at_unix_epoch + .cmp(&right.request.created_at_unix_epoch) + .then_with(|| left.request.request_id.cmp(&right.request.request_id)) + }); + + Ok(requests) +} + +pub(crate) fn wait_for_interrupt_response( + worktree_path: &Path, + run_id: &str, + request_id: &str, + timeout: Duration, +) -> prelude::Result> { + let started_at = Instant::now(); + + loop { + if let Some(response) = read_interrupt_response(worktree_path, run_id, request_id)? 
{ + return Ok(Some(response)); + } + + if started_at.elapsed() >= timeout { + return Ok(None); + } + + thread::sleep(POLL_INTERVAL); + } +} + +pub(crate) fn read_interrupt_response( + worktree_path: &Path, + run_id: &str, + request_id: &str, +) -> prelude::Result> { + let path = interrupt_response_path(worktree_path, run_id, request_id); + + match fs::read_to_string(path) { + Ok(raw) => serde_json::from_str(&raw).map(Some).map_err(Into::into), + Err(error) if error.kind() == ErrorKind::NotFound => Ok(None), + Err(error) => Err(error.into()), + } +} + +pub(crate) fn protocol_response_summary(value: &Value) -> String { + match value { + Value::Null => String::from("null"), + Value::Bool(_) => String::from("boolean"), + Value::Number(_) => String::from("number"), + Value::String(_) => String::from("string"), + Value::Array(values) => format!("array(len={})", values.len()), + Value::Object(entries) => { + let mut keys = entries.keys().map(String::as_str).collect::>(); + + keys.sort_unstable(); + + format!("object(keys={})", keys.join(",")) + }, + } +} + +fn read_pending_interrupt_request(path: PathBuf) -> prelude::Result { + let raw = fs::read_to_string(&path)?; + let request: LaneControlInterruptRequest = serde_json::from_str(&raw)?; + + if request.schema != SCHEMA_INTERRUPT_REQUEST { + eyre::bail!( + "Unsupported lane-control request schema `{}` in `{}`.", + request.schema, + path.display() + ); + } + + Ok(PendingLaneControlRequest { path, request }) +} + +fn interrupt_request_path(worktree_path: &Path, run_id: &str, request_id: &str) -> PathBuf { + run_control_run_dir(worktree_path, run_id).join(format!( + "{}{}", + sanitize_path_component(request_id), + REQUEST_SUFFIX + )) +} + +fn interrupt_response_path(worktree_path: &Path, run_id: &str, request_id: &str) -> PathBuf { + run_control_run_dir(worktree_path, run_id).join(format!( + "{}{}", + sanitize_path_component(request_id), + RESPONSE_SUFFIX + )) +} + +fn run_control_run_dir(worktree_path: &Path, run_id: &str) 
/// Turns an arbitrary identifier into a string that is safe to use as exactly one path
/// component.
///
/// ASCII letters, digits, `-`, `_`, and `.` are kept; every other character (including
/// path separators and non-ASCII text) becomes `-`. An empty input yields the
/// placeholder `lane-control`.
///
/// A component made only of dots (`.`, `..`, `...`) is rewritten to the same number of
/// dashes. Without this, a run id of `..` would survive sanitization and make the joined
/// control path resolve outside the run-control directory.
fn sanitize_path_component(value: &str) -> String {
    let sanitized = value
        .chars()
        .map(|character| {
            if character.is_ascii_alphanumeric() || matches!(character, '-' | '_' | '.') {
                character
            } else {
                '-'
            }
        })
        .collect::<String>();

    if sanitized.is_empty() {
        String::from("lane-control")
    } else if sanitized.bytes().all(|byte| byte == b'.') {
        // `.` and `..` are directory references, not names; neutralise them.
        "-".repeat(sanitized.len())
    } else {
        sanitized
    }
}
+cadences, and serves the dashboard, account APIs, `GET /api/operator-snapshot`, +`POST /api/linear-scan`, `GET /api/lane/inspect`, and `POST /api/lane/interrupt` from +the single local listener. `decodex serve` has two hardcoded scheduler cadences: @@ -61,6 +62,21 @@ An empty `POST /api/linear-scan` queues a scan for all enabled projects. Request consumed by the next 15-second control-plane tick and still respect any active tracker rate-limit backoff. +Lane inspect and interrupt are local control APIs, not dashboard UI actions: + +```sh +curl -sS 'http://127.0.0.1:8912/api/lane/inspect?projectId=decodex&issue=XY-703' +curl -sS -X POST http://127.0.0.1:8912/api/lane/interrupt \ + -H 'Content-Type: application/json' \ + -d '{"projectId":"decodex","issue":"XY-703","runId":""}' +``` + +`POST /api/lane/interrupt` first writes a soft interrupt request for the active +app-server child to deliver with `turn/interrupt`. Add `"force": true` only when the +operator explicitly wants hard process-kill fallback after soft interrupt is +unavailable or does not return in the local wait window. Hard fallback is reported as +`hard_interrupt_fallback`, not as a graceful stop. + Use `--dev` only for isolated local development: - Developers may use `--dev` to exercise real account APIs, `GET /api/operator-snapshot`, @@ -161,7 +177,8 @@ steer, retry, task replacement, or lifecycle mutations. CLI/API is the first operator-control surface for lane control, governed by [`../spec/lane-control.md`](../spec/lane-control.md). The browser UI does not show or accept active-lane stop/interrupt controls, project pause/resume controls, manual retry -controls, or active-lane steer controls. Account-pool selection remains available +controls, or active-lane steer controls; use `decodex lane inspect`, `decodex lane +interrupt`, or the local `/api/lane/*` endpoints instead. Account-pool selection remains available because it changes the global Codex account selector, not an active lane. 
`runActivity.activeRunsComplete` marks whether a payload is the complete active-run list; subscription-filtered diff --git a/docs/spec/app-server.md b/docs/spec/app-server.md index 2e78dcc..3e474d6 100644 --- a/docs/spec/app-server.md +++ b/docs/spec/app-server.md @@ -152,6 +152,8 @@ If generated schema or live capability probing shows that `turn/interrupt` or `turn/steer` is unavailable, the CLI/API control must report that control as unsupported for the active lane instead of failing ordinary issue dispatch. The lane-control contract and support matrix live in [`lane-control.md`](./lane-control.md). +Decodex currently implements `turn/interrupt` through the child-owned app-server +connection for active turns; `turn/steer` remains planned. ## Required request flow @@ -317,6 +319,25 @@ Decodex's intended use is soft active-turn interruption from a CLI/API operator control. It should target the current known thread and turn, request a graceful turn stop through app-server, and leave outcome classification to the Decodex runtime. +Request parameters: + +```json +{ + "threadId": "", + "turnId": "" +} +``` + +Decodex treats the app-server response as protocol evidence rather than a private +payload to expose. 
The local control response records a summary such as object keys or +array length, plus a normalized result: + +- `soft_delivered` when app-server accepts the JSON-RPC request +- `soft_failed` when the method is unsupported, times out, or returns another protocol + error +- `rejected` when the child process finds that the requested project, issue, run, + attempt, thread, or turn no longer matches the active turn + `turn/interrupt` must not: - mutate tracker state directly @@ -324,9 +345,9 @@ stop through app-server, and leave outcome classification to the Decodex runtime - clear leases without runtime classification - replace the hard-interrupt process fallback when app-server is unreachable -When implemented, Decodex should prefer `turn/interrupt` before signaling the child -process. A hard interrupt remains only a fallback after soft interrupt is unavailable, -times out, or cannot be routed to the live app-server session. +Decodex prefers `turn/interrupt` before signaling the child process. A hard interrupt +remains only a fallback after explicit operator intent and after soft interrupt is +unavailable, times out, or cannot be routed to the live app-server session. ## `turn/steer` diff --git a/docs/spec/lane-control.md b/docs/spec/lane-control.md index 495ced5..1e6bcc7 100644 --- a/docs/spec/lane-control.md +++ b/docs/spec/lane-control.md @@ -33,13 +33,13 @@ agent-facing skills must guide responsible use. | Capability | Contract status | Current implementation evidence | Required behavior | | --- | --- | --- | --- | -| Inspect lane state | Supported | `decodex status`, `decodex status --json`, `decodex diagnose --json`, `decodex evidence `, operator snapshots, and dashboard views | Always inspect before mutating or steering. Inspection must not mutate tracker state, runtime DB rows, worktrees, or app-server turns. 
| +| Inspect lane state | Supported | `decodex lane inspect `, `decodex status`, `decodex status --json`, `decodex diagnose --json`, `decodex evidence `, `GET /api/lane/inspect`, operator snapshots, and dashboard views | Always inspect before mutating or steering. Inspection must not mutate tracker state, runtime DB rows, worktrees, or app-server turns. | | Project dispatch pause | Supported for future dispatch | `decodex project disable ` and the runtime project enabled flag | Pause prevents new dispatch for the project. It must not kill or rewrite already active lanes. | | Project dispatch resume | Supported for future dispatch | `decodex project enable ` and the runtime project enabled flag | Resume re-enables future dispatch after the operator has inspected blockers, capacity, and queue state. | | Linear scan request | Supported | `POST /api/linear-scan` with optional `projectId` | Queue a scan for the next control-plane tick while respecting tracker backoff. This is an intake/status refresh request, not an execution command. | -| Run-control channel foundation | Supported foundation | Active attempts publish a local `.decodex-run-control/*` channel record, runtime SQLite `run_control_channels`, operator status `control_capability`, and private `control_action` audit events | Route any future mutation through the active attempt's project, issue, run id, attempt, thread id, current turn id, active lease, and local channel metadata. Invalid or stale requests fail closed and remain local audit evidence. | -| Soft interrupt | Planned CLI/API control; bottom-layer method allowed | Decodex does not currently send `turn/interrupt` from its app-server client | Prefer soft interrupt before hard interruption when the active turn id is known and the app-server capability is present. Soft interrupt requests a graceful turn stop and must leave classification to the runtime. 
| -| Hard interrupt fallback | Emergency fallback only | No dashboard or CLI/API lane-control path exposes hard interrupt in this rollout; runtime recovery can still classify attempts as `interrupted` | Use only when soft interrupt is unavailable, timed out, or impossible because the process or app-server boundary cannot be reached. Preserve retained worktree evidence and runtime classification. | +| Run-control channel foundation | Supported foundation | Active attempts publish a local `.decodex-run-control/*` channel record, runtime SQLite `run_control_channels`, operator status `control_capability`, and private `control_action` audit events | Route lane-control mutations through the active attempt's project, issue, run id, attempt, thread id, current turn id, active lease, and local channel metadata. Invalid or stale requests fail closed and remain local audit evidence. | +| Soft interrupt | Supported CLI/API control | `decodex lane interrupt --run-id ` and `POST /api/lane/interrupt` write a run-control request that the active app-server child delivers with `turn/interrupt` | Prefer soft interrupt before hard interruption when the active turn id is known and the app-server capability is present. Soft interrupt requests a graceful turn stop and records the protocol outcome when app-server returns one. | +| Hard interrupt fallback | Explicit fallback only | `decodex lane interrupt --run-id --force` and `POST /api/lane/interrupt` with `"force": true` classify process signaling as `hard_interrupt_fallback` | Use only when soft interrupt is unavailable, timed out, or impossible because the process or app-server boundary cannot be reached. Preserve retained worktree evidence and runtime classification. | | Steer active lane | Planned CLI/API control; bottom-layer method must stay broad | Decodex does not currently send `turn/steer` from its app-server client | Pass operator-supplied steer text through the CLI/API when available. 
Do not narrow the protocol to a fixed set of task-content categories. Apply policy, audit, privacy, and lifecycle guardrails above the protocol. | | Retained resume/retry | Supported through runtime lifecycle | `decodex run `, retry scheduling, retained worktree recovery, and `thread/resume` for same-thread app-server continuation | Resume only when retained worktree, issue, branch, PR, and runtime evidence still prove the same lane. Treat ambiguous lineage as manual attention. | | Manual attention | Supported terminal control path | `decodex:needs-attention`, `issue_comment(kind = "manual_attention")`, and `issue_terminal_finalize(path = "manual_attention")` | Stop automation when policy requires a human decision. Explain the blocker through structured public fields and keep private evidence local. | @@ -102,11 +102,30 @@ Soft interrupt is the preferred active-turn stop path. A compliant soft interrup - leaves tracker state, retry policy, and retained-worktree classification to the Decodex runtime +The supported operator commands are: + +- `decodex lane inspect [--run-id ] [--json]` +- `decodex lane interrupt --run-id [--json] [--reason ]` +- `decodex lane interrupt --run-id --force [--json] [--reason ]` + +The local HTTP API mirrors those semantics: + +- `GET /api/lane/inspect?projectId=&issue=[&runId=]` +- `POST /api/lane/interrupt` with JSON fields `projectId`, `issue`, `runId`, + optional `reason`, and optional `force` + +When only one project is enabled on the local listener, the HTTP API can infer +`projectId`; otherwise the request must include it. CLI/API responses include the +lane identity, app-server thread and turn ids when known, process liveness summary, +soft-interrupt classification, hard-fallback classification when used, and next action. +They do not include private payload bodies. + Hard interrupt is a fallback, not the normal operator control. 
A hard interrupt may -signal the recorded child process only after Decodex proves the process identity still -matches the current run attempt. The runtime must preserve evidence, mark the attempt -with an interruption status, clear or retain ownership according to the runtime -contract, and avoid pretending the agent completed a terminal path. +signal the recorded child process only after explicit `--force` or `"force": true` +operator intent. The fallback emits `hard_interrupt_fallback`, preserves local +evidence, marks an active attempt as `interrupted` when a recorded child is signaled, +clears or retains ownership according to the runtime contract, and avoids pretending +the agent completed a terminal path. ## Steer @@ -185,11 +204,11 @@ Linear unless a schema-controlled public projection explicitly allows it. ## Implementation Status For This Rollout This document specifies capabilities that the CLI/API should expose first. Current code -already supports inspect, CLI project enable/disable, Linear scan requests, retained -resume/retry lifecycle paths, and manual-attention finalization. Current code does not -expose dashboard lane-mutation controls, does not yet implement Decodex CLI/API -controls that send `turn/interrupt` or `turn/steer`, and does not expose raw -`thread/inject_items` as an operator feature. +supports lane inspect, CLI project enable/disable, Linear scan requests, soft +interrupt, explicit hard-interrupt fallback, retained resume/retry lifecycle paths, and +manual-attention finalization. Current code does not expose dashboard lane-mutation +controls, does not yet implement Decodex CLI/API controls that send `turn/steer`, and +does not expose raw `thread/inject_items` as an operator feature. 
When implementation work adds the missing CLI/API controls, update this spec, [`app-server.md`](./app-server.md), the operator reference, and the Decodex plugin diff --git a/plugins/decodex/skills/automation/SKILL.md b/plugins/decodex/skills/automation/SKILL.md index 461daac..cd8b4fa 100644 --- a/plugins/decodex/skills/automation/SKILL.md +++ b/plugins/decodex/skills/automation/SKILL.md @@ -114,18 +114,21 @@ Read `docs/spec/lane-control.md` before using or explaining operator controls. Rules for agents: -- Inspect first with `decodex status`, `decodex status --json`, `decodex diagnose - --json`, `decodex evidence `, or the dashboard snapshot. Confirm project id, - issue id, branch, run id, attempt, thread/turn evidence, process liveness, tracker - state, and PR lineage before mutating anything. +- Inspect first with `decodex lane inspect `, `decodex status`, + `decodex status --json`, `decodex diagnose --json`, `decodex evidence `, or + the dashboard snapshot. Confirm project id, issue id, branch, run id, attempt, + thread/turn evidence, process liveness, tracker state, and PR lineage before mutating + anything. - Use project dispatch pause/resume only for future intake. `decodex project disable ` pauses new dispatch; `decodex project enable ` resumes it. Neither command kills active lanes. - Request Linear refresh with `POST /api/linear-scan` when a newly queued or relabeled issue should be observed before the next 5-minute poll. -- Prefer soft interrupt through the CLI/API `turn/interrupt` control when it exists and - the active turn can be targeted. Use hard process interruption only as a fallback when - soft interrupt is unavailable, timed out, or impossible. +- Prefer `decodex lane interrupt --run-id ` or + `POST /api/lane/interrupt` for soft `turn/interrupt` when the active turn can be + targeted. 
Use hard process interruption only with `--force` or `"force": true` and + only as `hard_interrupt_fallback` after soft interrupt is unavailable, timed out, or + impossible. - Use steer only through the CLI/API lane-control surface and only when the operator supplies the steer text. Bottom-layer steer support is broad; policy, audit, privacy, workflow, recovery, and skills provide the guardrails. diff --git a/plugins/decodex/skills/manual-cli/SKILL.md b/plugins/decodex/skills/manual-cli/SKILL.md index 20c1add..dbd4675 100644 --- a/plugins/decodex/skills/manual-cli/SKILL.md +++ b/plugins/decodex/skills/manual-cli/SKILL.md @@ -62,17 +62,20 @@ Before starting a live run, read the `automation` skill and the registered proje CLI/API lane controls: -- Inspect first with `decodex status`, `decodex status --json`, `decodex diagnose - --json`, or `decodex evidence `. +- Inspect first with `decodex lane inspect `, `decodex status`, + `decodex status --json`, `decodex diagnose --json`, or + `decodex evidence `. - Use `decodex project disable ` to pause future dispatch for a registered project, and `decodex project enable ` to resume it. - Use `POST /api/linear-scan` to request an intake/status refresh before the next scheduled Linear poll. - Use `decodex run ` only for deliberate one-issue automation or retained retry/resume when the lane remains eligible under the registered workflow. -- When future CLI/API controls expose soft interrupt or steer, prefer soft interrupt - over hard process interruption and use steer only with explicit operator-supplied - text. +- Use `decodex lane interrupt --run-id ` for soft active-turn + interruption. Add `--force` only when explicit operator intent allows hard + process-kill fallback after soft interrupt is unavailable or fails. +- Steer remains future CLI/API work; use it only through the lane-control surface once + implemented and only with explicit operator-supplied text. 
- Do not use active-lane UI controls, direct runtime DB edits, raw `thread/inject_items`, or tracker-state mutations as substitutes for the lane-control contract.