From b98fa3440cbcfe0db844f9c4cddfa151b4aae8e1 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 2 Jun 2026 22:11:22 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"implement lane inspect and interrupt controls","authority":"XY-703"} --- .gitignore | 2 + apps/decodex/src/agent/app_server.rs | 472 +++++++++--- apps/decodex/src/agent/app_server/protocol.rs | 22 + apps/decodex/src/agent/app_server/tests.rs | 3 + apps/decodex/src/cli.rs | 143 +++- apps/decodex/src/lib.rs | 1 + apps/decodex/src/orchestrator.rs | 8 + apps/decodex/src/orchestrator/execution.rs | 1 + apps/decodex/src/orchestrator/lane_control.rs | 727 ++++++++++++++++++ .../decodex/src/orchestrator/operator_http.rs | 207 +++++ .../tests/operator/status/http.rs | 161 ++++ apps/decodex/src/run_control.rs | 351 +++++++++ docs/reference/operator-control-plane.md | 23 +- docs/spec/app-server.md | 27 +- docs/spec/lane-control.md | 43 +- plugins/decodex/skills/automation/SKILL.md | 17 +- plugins/decodex/skills/manual-cli/SKILL.md | 13 +- 17 files changed, 2096 insertions(+), 125 deletions(-) create mode 100644 apps/decodex/src/orchestrator/lane_control.rs create mode 100644 apps/decodex/src/run_control.rs diff --git a/.gitignore b/.gitignore index c692b22..26fad43 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ # AI .codex .decodex +.decodex-run-activity +.decodex-run-control !apps/decodex-app/.codex/ !apps/decodex-app/.codex/environments/ !apps/decodex-app/.codex/environments/environment.toml diff --git a/apps/decodex/src/agent/app_server.rs b/apps/decodex/src/agent/app_server.rs index 0be81a8..95399a8 100644 --- a/apps/decodex/src/agent/app_server.rs +++ b/apps/decodex/src/agent/app_server.rs @@ -8,6 +8,7 @@ use std::{ env, error::Error, fmt::{self, Display, Formatter}, + mem, path::{Path, PathBuf}, time::{Duration, Instant}, }; @@ -30,7 +31,7 @@ use self::protocol::{ ProbeDynamicToolHandler, RunOutcome, RuntimeConfigSummary, SkillsListParams, SkillsListResponse, 
ThreadArchiveRequest, ThreadResumeRequest, ThreadSessionResponse, ThreadStartRequest, ThreadStatusChangedNotification, ToolRequestUserInputResponse, - TurnCompletedNotification, TurnError, TurnStartRequest, UserInput, + TurnCompletedNotification, TurnError, TurnInterruptRequest, TurnStartRequest, UserInput, }; use crate::{ agent::{ @@ -47,6 +48,9 @@ use crate::{ }, }, prelude::eyre, + run_control::{ + self, LaneControlInterruptRequest, LaneControlInterruptResponse, PendingLaneControlRequest, + }, state::{ self, CodexAccountActivitySummary, CodexAccountMarker, EffectiveRuntimeMarker, RUN_OPERATION_AGENT_RUN, RUN_OPERATION_APP_SERVER_PREFLIGHT, StateStore, @@ -58,6 +62,7 @@ pub(crate) const MODEL_EXECUTION_IDLE_TIMEOUT: Duration = Duration::from_secs(30 const PROBE_TIMEOUT: Duration = Duration::from_secs(30); const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); +const RUN_CONTROL_POLL_INTERVAL: Duration = Duration::from_millis(500); const MCP_PREFLIGHT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); const PLUGIN_PREFLIGHT_MAX_ATTEMPTS: u32 = 2; const PROBE_RUN_ID: &str = "protocol-probe-run"; @@ -416,6 +421,7 @@ impl Error for AppServerDynamicToolFailure {} #[derive(Clone)] pub(crate) struct AppServerRunRequest<'a> { + pub(crate) project_id: String, pub(crate) run_id: String, pub(crate) issue_id: String, pub(crate) attempt_number: i64, @@ -1066,6 +1072,7 @@ pub(crate) fn probe_app_server(listen: &str) -> crate::prelude::Result, + request: &AppServerRunRequest<'_>, target_thread_id: &str, target_turn_id: &str, - timeout: Duration, - dynamic_tool_handler: Option<&dyn DynamicToolHandler>, - codex_account_provider: Option<&dyn CodexAccountProvider>, ) -> crate::prelude::Result { + let control_enabled = request.activity_marker_path.is_some(); let mut last_activity_at = Instant::now(); let mut final_output = String::new(); let mut latest_turn_failure: Option = None; loop { - let idle_timeout = - 
protocol_activity_idle_timeout(Some(&recorder.protocol_activity.summary), timeout); - let wire_message = next_turn_wire_message( + if control_enabled { + handle_pending_turn_control_requests( + client, + recorder, + request, + target_thread_id, + target_turn_id, + )?; + } + + let idle_timeout = protocol_activity_idle_timeout( + Some(&recorder.protocol_activity.summary), + request.timeout, + ); + let Some(wire_message) = next_turn_wire_message( client, last_activity_at, idle_timeout, target_thread_id, target_turn_id, latest_turn_failure.as_ref(), - )?; + control_enabled, + )? + else { + continue; + }; if !targets_thread(&wire_message, Some(target_thread_id)) { tracing::debug!(raw = %wire_message.raw, "Ignoring app-server message for another thread."); @@ -3322,84 +3336,25 @@ fn wait_for_turn_completion( apply_protocol_message_side_effects(recorder, &wire_message)?; match &wire_message.message { - JsonRpcMessage::Notification(notification) => match notification.method.as_str() { - "thread/status/changed" => { - let payload: ThreadStatusChangedNotification = - serde_json::from_value(notification.params.clone())?; - - if payload.status.kind == "systemError" && latest_turn_failure.is_none() { - latest_turn_failure = - Some(AppServerTurnFailure::from_system_error(&payload.thread_id)); - } - }, - "error" => { - if let Some((failure, will_retry)) = failure_from_error_notification( - notification, - target_thread_id, - target_turn_id, - )? 
{ - if failure.requires_operator_attention() && will_retry != Some(true) { - return Err(Report::new(failure)); - } - - latest_turn_failure = Some(failure); - } - }, - "item/agentMessage/delta" => { - let payload: AgentMessageDeltaNotification = - serde_json::from_value(notification.params.clone())?; - - final_output.push_str(&payload.delta); - }, - "item/completed" => { - let payload: ItemCompletedNotification = - serde_json::from_value(notification.params.clone())?; - - if payload.item.kind == "agentMessage" - && let Some(text) = payload.item.text - { - final_output = text; - } - }, - "turn/completed" => { - let payload: TurnCompletedNotification = - serde_json::from_value(notification.params.clone())?; - - if payload.turn.id != target_turn_id { - continue; - } - if payload.turn.status == "completed" { - return Ok(RunOutcome { final_output }); - } - - if let Some(error) = payload.turn.error.as_ref() { - return Err(Report::new(turn_failure_from_turn_error( - target_thread_id, - Some(&payload.turn.id), - &payload.turn.status, - error, - ))); - } - if let Some(failure) = latest_turn_failure { - return Err(Report::new(failure)); - } - - eyre::bail!( - "Turn `{}` ended with status `{}` without an explicit error payload.", - payload.turn.id, - payload.turn.status - ); - }, - _ => {}, + JsonRpcMessage::Notification(notification) => { + if let Some(outcome) = handle_turn_execution_notification( + notification, + target_thread_id, + target_turn_id, + &mut final_output, + &mut latest_turn_failure, + )? 
{ + return Ok(outcome); + } }, - JsonRpcMessage::Request(request) => handle_turn_execution_request( + JsonRpcMessage::Request(server_request) => handle_turn_execution_request( client, recorder, - request, + server_request, target_thread_id, target_turn_id, - dynamic_tool_handler, - codex_account_provider, + request.dynamic_tool_handler, + request.codex_account_provider, )?, JsonRpcMessage::Response(_) => ignore_orphan_turn_json_rpc_response(), JsonRpcMessage::Error(error) => { @@ -3413,6 +3368,85 @@ fn wait_for_turn_completion( } } +fn handle_turn_execution_notification( + notification: &JsonRpcNotification, + target_thread_id: &str, + target_turn_id: &str, + final_output: &mut String, + latest_turn_failure: &mut Option, +) -> crate::prelude::Result> { + match notification.method.as_str() { + "thread/status/changed" => { + let payload: ThreadStatusChangedNotification = + serde_json::from_value(notification.params.clone())?; + + if payload.status.kind == "systemError" && latest_turn_failure.is_none() { + *latest_turn_failure = + Some(AppServerTurnFailure::from_system_error(&payload.thread_id)); + } + }, + "error" => { + if let Some((failure, will_retry)) = + failure_from_error_notification(notification, target_thread_id, target_turn_id)? 
+ { + if failure.requires_operator_attention() && will_retry != Some(true) { + return Err(Report::new(failure)); + } + + *latest_turn_failure = Some(failure); + } + }, + "item/agentMessage/delta" => { + let payload: AgentMessageDeltaNotification = + serde_json::from_value(notification.params.clone())?; + + final_output.push_str(&payload.delta); + }, + "item/completed" => { + let payload: ItemCompletedNotification = + serde_json::from_value(notification.params.clone())?; + + if payload.item.kind == "agentMessage" + && let Some(text) = payload.item.text + { + *final_output = text; + } + }, + "turn/completed" => { + let payload: TurnCompletedNotification = + serde_json::from_value(notification.params.clone())?; + + if payload.turn.id != target_turn_id { + return Ok(None); + } + if payload.turn.status == "completed" { + return Ok(Some(RunOutcome { final_output: mem::take(final_output) })); + } + + if let Some(error) = payload.turn.error.as_ref() { + return Err(Report::new(turn_failure_from_turn_error( + target_thread_id, + Some(&payload.turn.id), + &payload.turn.status, + error, + ))); + } + if let Some(failure) = latest_turn_failure.take() { + return Err(Report::new(failure)); + } + + eyre::bail!( + "Turn `{}` ended with status `{}` without an explicit error payload.", + payload.turn.id, + payload.turn.status + ); + }, + _ => {}, + } + + Ok(None) +} + fn next_turn_wire_message( client: &mut AppServerClient, last_activity_at: Instant, @@ -3420,13 +3454,265 @@ fn next_turn_wire_message( target_thread_id: &str, target_turn_id: &str, latest_turn_failure: Option<&AppServerTurnFailure>, -) -> crate::prelude::Result { + control_enabled: bool, +) -> crate::prelude::Result> { let now = Instant::now(); let wait_timeout = remaining_idle_budget(last_activity_at, now, timeout).ok_or_else(|| { turn_wait_timeout_error(target_thread_id, target_turn_id, latest_turn_failure.cloned()) })?; + let recv_timeout = + if control_enabled { wait_timeout.min(RUN_CONTROL_POLL_INTERVAL) } else { 
wait_timeout }; + + match recv_turn_wire_message(client, recv_timeout, latest_turn_failure) { + Ok(wire_message) => Ok(Some(wire_message)), + Err(error) + if control_enabled + && recv_timeout < wait_timeout + && is_app_server_output_timeout(&error) => + Ok(None), + Err(error) => Err(error), + } +} + +fn handle_pending_turn_control_requests( + client: &mut AppServerClient, + recorder: &mut RunRecorder<'_>, + request: &AppServerRunRequest<'_>, + target_thread_id: &str, + target_turn_id: &str, +) -> crate::prelude::Result<()> { + let Some(worktree_path) = request.activity_marker_path.as_deref() else { + return Ok(()); + }; + + for pending in run_control::pending_interrupt_requests(worktree_path, &request.run_id)? { + handle_pending_turn_interrupt_request( + client, + recorder, + request, + worktree_path, + pending, + target_thread_id, + target_turn_id, + )?; + } + + Ok(()) +} + +fn handle_pending_turn_interrupt_request( + client: &mut AppServerClient, + recorder: &mut RunRecorder<'_>, + run_request: &AppServerRunRequest<'_>, + worktree_path: &Path, + pending: PendingLaneControlRequest, + target_thread_id: &str, + target_turn_id: &str, +) -> crate::prelude::Result<()> { + record_lane_interrupt_request(recorder, &pending.request)?; + + if let Some((error_class, message)) = lane_interrupt_request_rejection( + run_request, + &pending.request, + target_thread_id, + target_turn_id, + ) { + let response = + LaneControlInterruptResponse::rejected(&pending.request, error_class, message); + + record_lane_interrupt_response(recorder, &response)?; + + run_control::write_interrupt_response(worktree_path, &response)?; + run_control::remove_interrupt_request(&pending.path)?; + + return Ok(()); + } + + let interrupt = TurnInterruptRequest { + thread_id: pending.request.thread_id.clone(), + turn_id: pending.request.turn_id.clone(), + }; + let result = client.interrupt_turn_with_handler( + interrupt, + |connection, wire_message, server_request| { + handle_server_request_while_waiting( 
+ connection, + recorder, + wire_message, + server_request, + RequestDispatchContext::new( + RequestWaitPhase::TurnExecution, + run_request.dynamic_tool_handler, + run_request.codex_account_provider, + Some(target_thread_id), + Some(target_turn_id), + ), + ) + }, + ); + let response = match result { + Ok(value) => LaneControlInterruptResponse::delivered( + &pending.request, + run_control::protocol_response_summary(&value), + ), + Err(error) => LaneControlInterruptResponse::failed( + &pending.request, + soft_interrupt_error_class(&error), + format!("turn/interrupt failed with {}.", soft_interrupt_error_class(&error)), + ), + }; + + record_lane_interrupt_response(recorder, &response)?; + + run_control::write_interrupt_response(worktree_path, &response)?; + run_control::remove_interrupt_request(&pending.path)?; + + Ok(()) +} + +fn record_lane_interrupt_request( + recorder: &mut RunRecorder<'_>, + request: &LaneControlInterruptRequest, +) -> crate::prelude::Result<()> { + recorder.record( + "lane_control/interrupt/request", + &serde_json::json!({ + "requestId": request.request_id, + "projectId": request.project_id, + "issueId": request.issue_id, + "runId": request.run_id, + "attemptNumber": request.attempt_number, + "threadId": request.thread_id, + "turnId": request.turn_id, + "source": request.source, + "reason": request.reason, + }) + .to_string(), + ) +} + +fn record_lane_interrupt_response( + recorder: &mut RunRecorder<'_>, + response: &LaneControlInterruptResponse, +) -> crate::prelude::Result<()> { + recorder.record( + "lane_control/interrupt/response", + &serde_json::json!({ + "requestId": response.request_id, + "projectId": response.project_id, + "issueId": response.issue_id, + "runId": response.run_id, + "attemptNumber": response.attempt_number, + "threadId": response.thread_id, + "turnId": response.turn_id, + "status": response.status, + "classification": response.classification, + "method": response.method, + "errorClass": response.error_class, + 
"protocolSummary": response.protocol_summary, + }) + .to_string(), + )?; + recorder.state_store.append_private_execution_event( + &response.project_id, + &response.issue_id, + &response.run_id, + response.attempt_number, + "lane_control/interrupt", + serde_json::json!({ + "requestId": response.request_id, + "status": response.status, + "classification": response.classification, + "method": response.method, + "errorClass": response.error_class, + "protocolSummary": response.protocol_summary, + "message": response.message, + }), + )?; - recv_turn_wire_message(client, wait_timeout, latest_turn_failure) + Ok(()) +} + +fn lane_interrupt_request_rejection( + run_request: &AppServerRunRequest<'_>, + request: &LaneControlInterruptRequest, + target_thread_id: &str, + target_turn_id: &str, +) -> Option<(&'static str, String)> { + if request.project_id != run_request.project_id { + return Some(( + "project_mismatch", + format!( + "Control request targeted project `{}`, but this run belongs to `{}`.", + request.project_id, run_request.project_id + ), + )); + } + if request.issue_id != run_request.issue_id { + return Some(( + "issue_mismatch", + format!( + "Control request targeted issue `{}`, but this run belongs to `{}`.", + request.issue_id, run_request.issue_id + ), + )); + } + if request.run_id != run_request.run_id { + return Some(( + "run_mismatch", + format!( + "Control request targeted run `{}`, but this run is `{}`.", + request.run_id, run_request.run_id + ), + )); + } + if request.attempt_number != run_request.attempt_number { + return Some(( + "attempt_mismatch", + format!( + "Control request targeted attempt `{}`, but this run is attempt `{}`.", + request.attempt_number, run_request.attempt_number + ), + )); + } + if request.thread_id != target_thread_id { + return Some(( + "thread_mismatch", + format!( + "Control request targeted thread `{}`, but the active thread is `{target_thread_id}`.", + request.thread_id + ), + )); + } + if request.turn_id != target_turn_id 
{ + return Some(( + "turn_mismatch", + format!( + "Control request targeted turn `{}`, but the active turn is `{target_turn_id}`.", + request.turn_id + ), + )); + } + + None +} + +fn soft_interrupt_error_class(error: &Report) -> &'static str { + if is_app_server_output_timeout(error) { + return "soft_interrupt_timed_out"; + } + + let error_text = error.to_string().to_ascii_lowercase(); + + if error_text.contains("-32601") || error_text.contains("method not found") { + "soft_interrupt_unsupported" + } else { + "soft_interrupt_failed" + } +} + +fn is_app_server_output_timeout(error: &Report) -> bool { + error.downcast_ref::().is_some() } fn handle_turn_execution_request( diff --git a/apps/decodex/src/agent/app_server/protocol.rs b/apps/decodex/src/agent/app_server/protocol.rs index 6b0d79a..3ce2bb7 100644 --- a/apps/decodex/src/agent/app_server/protocol.rs +++ b/apps/decodex/src/agent/app_server/protocol.rs @@ -181,6 +181,21 @@ impl AppServerClient { self.connection.request_with_handler("turn/start", ¶ms, REQUEST_TIMEOUT, handler) } + pub(super) fn interrupt_turn_with_handler( + &mut self, + params: TurnInterruptRequest, + handler: H, + ) -> crate::prelude::Result + where + H: FnMut( + &mut JsonRpcConnection, + &WireMessage, + &JsonRpcRequest, + ) -> crate::prelude::Result<()>, + { + self.connection.request_with_handler("turn/interrupt", ¶ms, REQUEST_TIMEOUT, handler) + } + pub(super) fn command_exec( &mut self, params: &CommandExecParams, @@ -426,6 +441,13 @@ pub(super) struct TurnStartResponse { pub(super) turn: TurnStatusPayload, } +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub(super) struct TurnInterruptRequest { + pub(super) thread_id: String, + pub(super) turn_id: String, +} + #[derive(Debug, Eq, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub(super) struct CommandExecParams { diff --git a/apps/decodex/src/agent/app_server/tests.rs b/apps/decodex/src/agent/app_server/tests.rs index 138d17c..75a6399 100644 --- 
a/apps/decodex/src/agent/app_server/tests.rs +++ b/apps/decodex/src/agent/app_server/tests.rs @@ -427,6 +427,7 @@ fn turn_start_request_omits_execution_policy_overrides() { fn minimal_run_request<'a>() -> super::AppServerRunRequest<'a> { super::AppServerRunRequest { + project_id: String::from("test-project"), run_id: String::from("run-1"), issue_id: String::from("issue-1"), attempt_number: 1, @@ -1866,6 +1867,7 @@ fn live_app_server_resume_round_trip_updates_marker_and_state() { ); let first_result = super::execute_app_server_run( &super::AppServerRunRequest { + project_id: String::from("test-project"), run_id: String::from("live-resume-run"), issue_id: String::from("live-resume-issue"), attempt_number: 1, @@ -1912,6 +1914,7 @@ fn live_app_server_resume_round_trip_updates_marker_and_state() { StateStore::open_in_memory().expect("resumed state store should open"); let second_result = super::execute_app_server_run( &super::AppServerRunRequest { + project_id: String::from("test-project"), run_id: String::from("live-resume-run"), issue_id: String::from("live-resume-issue"), attempt_number: 1, diff --git a/apps/decodex/src/cli.rs b/apps/decodex/src/cli.rs index dee43a0..b9a416a 100644 --- a/apps/decodex/src/cli.rs +++ b/apps/decodex/src/cli.rs @@ -20,7 +20,8 @@ use crate::{ maintenance::{self, MaintenanceMode, MaintenancePruneRequest, MaintenanceScope}, manual::{self, ManualCommitRequest, ManualLandRequest}, orchestrator::{ - self, DiagnoseRequest, EvidenceRequest, IssueDispatchMode, RunOnceRequest, ServeRequest, + self, DiagnoseRequest, EvidenceRequest, IssueDispatchMode, LaneInspectRequest, + LaneInterruptRequest, RunOnceRequest, ServeRequest, }, prelude::{Result, eyre}, radar::{ @@ -62,6 +63,7 @@ impl Cli { Command::Run(args) => args.run(), Command::Serve(args) => args.run(), Command::Project(args) => args.run(), + Command::Lane(args) => args.run(), Command::Status(args) => args.run(), Command::Diagnose(args) => args.run(), Command::Evidence(args) => args.run(), @@ 
-419,6 +421,68 @@ struct ProjectToggleCommand { service_id: String, } +#[derive(Debug, Args)] +struct LaneCommand { + #[command(flatten)] + project_config: ProjectConfigArgs, + #[command(subcommand)] + command: LaneSubcommand, +} +impl LaneCommand { + fn run(&self) -> Result<()> { + match &self.command { + LaneSubcommand::Inspect(args) => orchestrator::print_lane_inspect(LaneInspectRequest { + config_path: self.project_config.as_path(), + issue: &args.issue, + run_id: args.run_id.as_deref(), + json: args.json, + }), + LaneSubcommand::Interrupt(args) => orchestrator::interrupt_lane(LaneInterruptRequest { + config_path: self.project_config.as_path(), + issue: &args.issue, + run_id: &args.run_id, + force: args.force, + reason: args.reason.as_deref(), + json: args.json, + source: "cli", + }) + .map(|_report| ()), + } + } +} + +#[derive(Debug, Args)] +struct LaneInspectCommand { + /// Issue identifier or local issue id to inspect. + #[arg(value_name = "ISSUE")] + issue: String, + /// Restrict inspection to one run id. + #[arg(long, value_name = "RUN_ID")] + run_id: Option, + /// Emit structured JSON instead of human-readable text. + #[arg(long)] + json: bool, +} + +#[derive(Debug, Args)] +struct LaneInterruptCommand { + /// Issue identifier or local issue id to interrupt. + #[arg(value_name = "ISSUE")] + issue: String, + /// Run id for the active app-server turn to interrupt. + #[arg(long, value_name = "RUN_ID")] + run_id: String, + /// Use hard process-kill fallback when soft interrupt is unavailable or fails. + #[arg(long)] + force: bool, + /// Operator-visible reason retained in local private evidence. + #[arg(long, value_name = "TEXT")] + reason: Option, + /// Emit structured JSON instead of human-readable text. + #[arg(long)] + json: bool, +} + #[derive(Debug, Args)] struct StatusCommand { #[command(flatten)] @@ -1192,6 +1256,8 @@ enum Command { Serve(ServeCommand), /// Manage the local Decodex project registry. 
Project(ProjectCommand), + /// Inspect or interrupt a local lane. + Lane(LaneCommand), /// Inspect the current local runtime state for one configured project. Status(StatusCommand), /// Write and print the agent-readable local evidence index. @@ -1247,6 +1313,14 @@ enum ProjectSubcommand { Remove(ProjectToggleCommand), } +#[derive(Debug, Subcommand)] +enum LaneSubcommand { + /// Inspect one local lane by issue identifier or tracker issue id. + Inspect(LaneInspectCommand), + /// Soft-interrupt an active app-server turn, with optional hard fallback. + Interrupt(LaneInterruptCommand), +} + #[derive(Debug, Subcommand)] enum RecoverSubcommand { /// Recover retained review lanes whose handoff marker is missing. @@ -1341,7 +1415,8 @@ mod tests { use crate::cli::{ AccountCommand, AccountSubcommand, AccountUseCommand, AttemptCommand, Cli, Command, - CommitCommand, DiagnoseCommand, EvidenceCommand, LandCommand, ProbeCommand, ProjectCommand, + CommitCommand, DiagnoseCommand, EvidenceCommand, LandCommand, LaneCommand, + LaneInspectCommand, LaneInterruptCommand, LaneSubcommand, ProbeCommand, ProjectCommand, ProjectConfigArgs, ProjectSubcommand, RadarBackfillReleaseRangeCommand, RadarBundleBuildCommand, RadarBundleCommand, RadarBundleSubcommand, RadarBundleValidateCommand, RadarCommand, RadarLedgerCommand, @@ -1902,6 +1977,70 @@ mod tests { )); } + #[test] + fn parses_lane_inspect_with_run_id_and_project_config() { + let cli = Cli::parse_from([ + "decodex", + "lane", + "--config", + "./project.toml", + "inspect", + "XY-703", + "--run-id", + "xy-703-attempt-1", + "--json", + ]); + + assert!(matches!( + cli.command, + Command::Lane(LaneCommand { + project_config: ProjectConfigArgs { config: Some(config) }, + command: LaneSubcommand::Inspect(LaneInspectCommand { + issue, + run_id: Some(run_id), + json: true, + }) + }) if config == Path::new("./project.toml") + && issue == "XY-703" + && run_id == "xy-703-attempt-1" + )); + } + + #[test] + fn 
parses_lane_interrupt_with_force_reason_and_project_config() { + let cli = Cli::parse_from([ + "decodex", + "lane", + "--config", + "./project.toml", + "interrupt", + "XY-703", + "--run-id", + "xy-703-attempt-1", + "--force", + "--reason", + "operator requested", + "--json", + ]); + + assert!(matches!( + cli.command, + Command::Lane(LaneCommand { + project_config: ProjectConfigArgs { config: Some(config) }, + command: LaneSubcommand::Interrupt(LaneInterruptCommand { + issue, + run_id, + force: true, + reason: Some(reason), + json: true, + }) + }) if config == Path::new("./project.toml") + && issue == "XY-703" + && run_id == "xy-703-attempt-1" + && reason == "operator requested" + )); + } + #[test] fn parses_diagnose_with_json_limit_and_project_config() { let cli = Cli::parse_from([ diff --git a/apps/decodex/src/lib.rs b/apps/decodex/src/lib.rs index e5c9378..1be764d 100644 --- a/apps/decodex/src/lib.rs +++ b/apps/decodex/src/lib.rs @@ -23,6 +23,7 @@ mod prelude { } mod radar; mod recovery; +mod run_control; mod runtime; mod tracker; mod worktree; diff --git a/apps/decodex/src/orchestrator.rs b/apps/decodex/src/orchestrator.rs index ea99266..c11f8cd 100644 --- a/apps/decodex/src/orchestrator.rs +++ b/apps/decodex/src/orchestrator.rs @@ -1,3 +1,9 @@ +mod lane_control; + +pub(crate) use lane_control::{ + LaneInspectRequest, LaneInterruptRequest, interrupt_lane, print_lane_inspect, +}; + #[cfg(unix)] use std::os::fd::AsRawFd; use std::{ cmp::Ordering, @@ -83,6 +89,8 @@ const OPERATOR_LIVE_ENDPOINT_PATH: &str = "/livez"; const OPERATOR_ACCOUNTS_ENDPOINT_PATH: &str = "/api/accounts"; const OPERATOR_APP_SNAPSHOT_ENDPOINT_PATH: &str = "/api/operator-snapshot"; const OPERATOR_LINEAR_SCAN_ENDPOINT_PATH: &str = "/api/linear-scan"; +const OPERATOR_LANE_INSPECT_ENDPOINT_PATH: &str = "/api/lane/inspect"; +const OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH: &str = "/api/lane/interrupt"; const OPERATOR_STATE_MAX_REQUEST_BYTES: usize = 8_192; const 
OPERATOR_DASHBOARD_WS_CLIENT_MESSAGE_MAX_BYTES: usize = 64 * 1_024; const OPERATOR_STATE_HEADER_TERMINATOR: &[u8] = b"\r\n\r\n"; diff --git a/apps/decodex/src/orchestrator/execution.rs b/apps/decodex/src/orchestrator/execution.rs index f66e76b..b1ae163 100644 --- a/apps/decodex/src/orchestrator/execution.rs +++ b/apps/decodex/src/orchestrator/execution.rs @@ -549,6 +549,7 @@ where DecodexToolBridge::new(&tracker_tool_bridge, build_decodex_run_context(workflow, issue_run)); let run_result = agent::execute_app_server_run( &AppServerRunRequest { + project_id: project.service_id().to_owned(), run_id: issue_run.run_id.clone(), issue_id: issue_run.issue.id.clone(), attempt_number: issue_run.attempt_number, diff --git a/apps/decodex/src/orchestrator/lane_control.rs b/apps/decodex/src/orchestrator/lane_control.rs new file mode 100644 index 0000000..fc046de --- /dev/null +++ b/apps/decodex/src/orchestrator/lane_control.rs @@ -0,0 +1,727 @@ +use std::{ + collections::HashSet, + io::Error, + path::{Path, PathBuf}, + process, thread, + time::{Duration, Instant}, +}; + +use libc::{ESRCH, SIGKILL, SIGTERM, c_int, pid_t}; +use serde::Serialize; + +use crate::{ + config::ServiceConfig, + orchestrator::{ + self, ChildRunRef, DEFAULT_STATUS_RUN_LIMIT, OperatorRunStatus, OperatorStatusSnapshot, + }, + prelude::{Result, eyre}, + run_control::{ + self, LaneControlInterruptRequest, LaneControlInterruptRequestInput, + LaneControlInterruptResponse, LaneControlResponseStatus, + }, + runtime, + state::StateStore, +}; + +#[cfg(not(test))] +const LANE_INTERRUPT_RESPONSE_WAIT: Duration = Duration::from_secs(3); +#[cfg(test)] +const LANE_INTERRUPT_RESPONSE_WAIT: Duration = Duration::from_millis(20); +const LANE_HARD_INTERRUPT_TERM_WAIT: Duration = Duration::from_secs(2); + +#[derive(Clone, Copy)] +pub(crate) struct LaneInspectRequest<'a> { + pub(crate) config_path: Option<&'a Path>, + pub(crate) issue: &'a str, + pub(crate) run_id: Option<&'a str>, + pub(crate) json: bool, +} + +#[derive(Clone, 
Copy)] +pub(crate) struct LaneInterruptRequest<'a> { + pub(crate) config_path: Option<&'a Path>, + pub(crate) issue: &'a str, + pub(crate) run_id: &'a str, + pub(crate) force: bool, + pub(crate) reason: Option<&'a str>, + pub(crate) json: bool, + pub(crate) source: &'a str, +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct LaneInspectReport { + project_id: String, + issue: String, + run_id: Option, + matched_run_count: usize, + runs: Vec, +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct LaneInterruptReport { + project_id: String, + issue: String, + issue_id: String, + issue_identifier: Option, + run_id: String, + attempt_number: i64, + force: bool, + classification: String, + soft_interrupt: LaneSoftInterruptReport, + hard_interrupt: Option, + next_action: String, +} +impl LaneInterruptReport { + pub(super) fn http_status_line(&self) -> &'static str { + if self.soft_interrupt.status == "pending" && self.hard_interrupt.is_none() { + "202 Accepted" + } else { + "200 OK" + } + } +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct LaneRunInspect { + project_id: String, + issue_id: String, + issue_identifier: Option, + run_id: String, + attempt_number: i64, + status: String, + attempt_status: String, + phase: String, + wait_reason: Option, + current_operation: String, + active_lease: bool, + execution_liveness: String, + thread_id: Option, + turn_id: Option, + thread_status: Option, + process_id: Option, + process_alive: Option, + process_liveness_reason: Option, + last_event_type: Option, + last_event_at: Option, + event_count: i64, + worktree_path: Option, + soft_interrupt_available: bool, + hard_interrupt_available: bool, + hard_interrupt_requires_force: bool, +} +impl LaneRunInspect { + fn from_operator_run(run: &OperatorRunStatus) -> Self { + Self { + project_id: run.project_id.clone(), + issue_id: run.issue_id.clone(), + issue_identifier: 
run.issue_identifier.clone(), + run_id: run.run_id.clone(), + attempt_number: run.attempt_number, + status: run.status.clone(), + attempt_status: run.attempt_status.clone(), + phase: run.phase.clone(), + wait_reason: run.wait_reason.clone(), + current_operation: run.current_operation.clone(), + active_lease: run.active_lease, + execution_liveness: run.execution_liveness.clone(), + thread_id: run.thread_id.clone(), + turn_id: run.turn_id.clone(), + thread_status: run.thread_status.clone(), + process_id: run.process_id, + process_alive: run.process_alive, + process_liveness_reason: run.process_liveness_reason.clone(), + last_event_type: run.last_event_type.clone(), + last_event_at: run.last_event_at.clone(), + event_count: run.event_count, + worktree_path: run.worktree_path.clone(), + soft_interrupt_available: orchestrator::operator_run_counts_as_active(run) + && run.worktree_path.is_some() + && run.thread_id.is_some() + && run.turn_id.is_some(), + hard_interrupt_available: run.process_id.is_some() && run.process_alive != Some(false), + hard_interrupt_requires_force: true, + } + } +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct LaneSoftInterruptReport { + attempted: bool, + available: bool, + status: String, + classification: String, + method: String, + request_id: Option, + message: String, + error_class: Option, + protocol_summary: Option, + response: Option, +} +impl LaneSoftInterruptReport { + fn unavailable(error_class: &str, message: &str) -> Self { + Self { + attempted: false, + available: false, + status: String::from("unavailable"), + classification: String::from("soft_interrupt_unavailable"), + method: String::from("turn/interrupt"), + request_id: None, + message: message.to_owned(), + error_class: Some(error_class.to_owned()), + protocol_summary: None, + response: None, + } + } + + fn from_response(response: LaneControlInterruptResponse) -> Self { + let status = match response.status { + 
LaneControlResponseStatus::SoftDelivered => "delivered", + LaneControlResponseStatus::SoftFailed => "failed", + LaneControlResponseStatus::Rejected => "rejected", + }; + + Self { + attempted: true, + available: true, + status: String::from(status), + classification: response.classification.clone(), + method: response.method.clone(), + request_id: Some(response.request_id.clone()), + message: response.message.clone(), + error_class: response.error_class.clone(), + protocol_summary: response.protocol_summary.clone(), + response: Some(response), + } + } +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct LaneHardInterruptReport { + attempted: bool, + status: String, + classification: String, + signals: Vec, + process_id: Option, + process_alive_after: Option, + message: String, + error_class: Option, +} +impl LaneHardInterruptReport { + fn unavailable(error_class: &str, message: &str) -> Self { + Self { + attempted: false, + status: String::from("unavailable"), + classification: String::from("hard_interrupt_fallback"), + signals: Vec::new(), + process_id: None, + process_alive_after: None, + message: message.to_owned(), + error_class: Some(error_class.to_owned()), + } + } +} + +pub(crate) fn print_lane_inspect(request: LaneInspectRequest<'_>) -> Result<()> { + let state_store = runtime::open_runtime_store()?; + let config = load_lane_control_project(request.config_path, &state_store)?; + let report = build_lane_inspect_report(&state_store, &config, request.issue, request.run_id)?; + + if request.json { + println!("{}", serde_json::to_string_pretty(&report)?); + } else { + print!("{}", render_lane_inspect_report(&report)); + } + + Ok(()) +} + +pub(crate) fn interrupt_lane(request: LaneInterruptRequest<'_>) -> Result { + let state_store = runtime::open_runtime_store()?; + let config = load_lane_control_project(request.config_path, &state_store)?; + let report = interrupt_lane_with_state( + &state_store, + &config, + request.issue, + 
request.run_id, + request.force, + request.reason, + request.source, + )?; + + if request.json { + println!("{}", serde_json::to_string_pretty(&report)?); + } else { + print!("{}", render_lane_interrupt_report(&report)); + } + + Ok(report) +} + +pub(super) fn build_lane_inspect_report( + state_store: &StateStore, + project: &ServiceConfig, + issue: &str, + run_id: Option<&str>, +) -> Result { + let snapshot = orchestrator::build_operator_status_snapshot( + project, + state_store, + DEFAULT_STATUS_RUN_LIMIT, + )?; + let runs = matching_lane_runs(&snapshot, issue, run_id); + + if runs.is_empty() { + eyre::bail!( + "No local lane matched issue `{}`{} in project `{}`.", + issue, + run_id.map(|id| format!(" and run `{id}`")).unwrap_or_default(), + project.service_id() + ); + } + + let runs = runs.iter().map(LaneRunInspect::from_operator_run).collect::>(); + + Ok(LaneInspectReport { + project_id: project.service_id().to_owned(), + issue: issue.to_owned(), + run_id: run_id.map(str::to_owned), + matched_run_count: runs.len(), + runs, + }) +} + +pub(super) fn interrupt_lane_with_state( + state_store: &StateStore, + project: &ServiceConfig, + issue: &str, + run_id: &str, + force: bool, + reason: Option<&str>, + source: &str, +) -> Result { + let snapshot = orchestrator::build_operator_status_snapshot( + project, + state_store, + DEFAULT_STATUS_RUN_LIMIT, + )?; + let run = select_interrupt_lane_run(&snapshot, issue, run_id)?; + let soft_interrupt = + attempt_soft_lane_interrupt(state_store, project, &run, force, reason, source)?; + let hard_interrupt = if force && soft_interrupt.status != "delivered" { + Some(attempt_hard_lane_interrupt(state_store, &run, reason)?) 
+ } else { + None + }; + let classification = hard_interrupt + .as_ref() + .map(|hard| hard.classification.clone()) + .unwrap_or_else(|| soft_interrupt.classification.clone()); + let next_action = lane_interrupt_next_action(&soft_interrupt, hard_interrupt.as_ref(), force); + + Ok(LaneInterruptReport { + project_id: project.service_id().to_owned(), + issue: issue.to_owned(), + issue_id: run.issue_id.clone(), + issue_identifier: run.issue_identifier.clone(), + run_id: run.run_id.clone(), + attempt_number: run.attempt_number, + force, + classification, + soft_interrupt, + hard_interrupt, + next_action, + }) +} + +fn load_lane_control_project( + config_path: Option<&Path>, + state_store: &StateStore, +) -> Result { + let Some(config_path) = orchestrator::resolve_config_path(config_path, state_store)? else { + eyre::bail!( + "No Decodex project config found. Pass --config or register one with `decodex project add `." + ); + }; + + runtime::register_project_config(state_store, &config_path, true)?; + + ServiceConfig::from_path(&config_path) +} + +fn select_interrupt_lane_run( + snapshot: &OperatorStatusSnapshot, + issue: &str, + run_id: &str, +) -> Result { + let runs = matching_lane_runs(snapshot, issue, Some(run_id)); + + if runs.is_empty() { + eyre::bail!( + "No local lane matched issue `{issue}` and run `{run_id}` in project `{}`.", + snapshot.project_id + ); + } + + Ok(runs[0].clone()) +} + +fn matching_lane_runs( + snapshot: &OperatorStatusSnapshot, + issue: &str, + run_id: Option<&str>, +) -> Vec { + let mut seen_run_ids = HashSet::new(); + let mut runs = Vec::new(); + + for run in snapshot.active_runs.iter().chain(snapshot.recent_runs.iter()) { + if !seen_run_ids.insert(run.run_id.clone()) { + continue; + } + if !lane_issue_matches(run, issue) { + continue; + } + if run_id.is_some_and(|expected| expected != run.run_id) { + continue; + } + + runs.push(run.clone()); + } + + runs +} + +fn lane_issue_matches(run: &OperatorRunStatus, issue: &str) -> bool { + let issue 
= issue.trim(); + + run.issue_id == issue + || run.issue_identifier.as_deref() == Some(issue) + || run + .issue_identifier + .as_ref() + .is_some_and(|identifier| identifier.eq_ignore_ascii_case(issue)) +} + +fn attempt_soft_lane_interrupt( + state_store: &StateStore, + project: &ServiceConfig, + run: &OperatorRunStatus, + force: bool, + reason: Option<&str>, + source: &str, +) -> Result { + let Some(worktree_path) = absolute_lane_worktree_path(project, state_store, run)? else { + return Ok(LaneSoftInterruptReport::unavailable( + "worktree_missing", + "Soft interrupt requires the active lane worktree and run-control directory.", + )); + }; + let Some(thread_id) = run.thread_id.as_deref() else { + return Ok(LaneSoftInterruptReport::unavailable( + "thread_id_missing", + "Soft interrupt requires a recorded app-server thread id.", + )); + }; + let Some(turn_id) = run.turn_id.as_deref() else { + return Ok(LaneSoftInterruptReport::unavailable( + "turn_id_missing", + "Soft interrupt requires a recorded active app-server turn id.", + )); + }; + + if !orchestrator::operator_run_counts_as_active(run) { + return Ok(LaneSoftInterruptReport::unavailable( + "lane_not_active", + "Soft interrupt only targets active or live local lane runs.", + )); + } + + let request = LaneControlInterruptRequest::new(LaneControlInterruptRequestInput { + project_id: project.service_id(), + issue_id: &run.issue_id, + run_id: &run.run_id, + attempt_number: run.attempt_number, + thread_id, + turn_id, + source, + reason, + }); + + run_control::write_interrupt_request(&worktree_path, &request)?; + + state_store.append_private_execution_event( + project.service_id(), + &run.issue_id, + &run.run_id, + run.attempt_number, + "lane_control/interrupt/requested", + serde_json::json!({ + "requestId": request.request_id, + "source": source, + "method": "turn/interrupt", + "force": force, + "reason": reason, + }), + )?; + + match run_control::wait_for_interrupt_response( + &worktree_path, + &run.run_id, + 
&request.request_id, + LANE_INTERRUPT_RESPONSE_WAIT, + )? { + Some(response) => Ok(LaneSoftInterruptReport::from_response(response)), + None => Ok(LaneSoftInterruptReport { + attempted: true, + available: true, + status: String::from("pending"), + classification: String::from("soft_interrupt_pending"), + method: String::from("turn/interrupt"), + request_id: Some(request.request_id), + message: String::from( + "Soft interrupt request was written, but the app-server child has not recorded a response yet.", + ), + error_class: Some(String::from("soft_interrupt_response_pending")), + protocol_summary: None, + response: None, + }), + } +} + +fn attempt_hard_lane_interrupt( + state_store: &StateStore, + run: &OperatorRunStatus, + reason: Option<&str>, +) -> Result { + let Some(process_id) = run.process_id else { + return Ok(LaneHardInterruptReport::unavailable( + "process_id_missing", + "Hard interrupt fallback requires a recorded child process id.", + )); + }; + + if process_id == process::id() { + eyre::bail!("Refusing to hard-interrupt the current Decodex process."); + } + + let mut signals = Vec::new(); + let mut sent_any_signal = false; + + if send_lane_signal(process_id, SIGTERM)? { + sent_any_signal = true; + + signals.push(String::from("SIGTERM")); + } + if sent_any_signal + && !wait_for_lane_process_exit(process_id, LANE_HARD_INTERRUPT_TERM_WAIT) + && send_lane_signal(process_id, SIGKILL)? 
+ { + signals.push(String::from("SIGKILL")); + } + + let process_alive_after = Some(orchestrator::process_is_alive(process_id)); + let status = if process_alive_after == Some(false) { + "sent" + } else if sent_any_signal { + "still_alive" + } else { + "process_not_found" + }; + let message = if sent_any_signal { + String::from("Hard interrupt fallback signaled the recorded child process.") + } else { + String::from("Hard interrupt fallback found no signalable child process.") + }; + + state_store.append_private_execution_event( + &run.project_id, + &run.issue_id, + &run.run_id, + run.attempt_number, + "lane_control/interrupt", + serde_json::json!({ + "classification": "hard_interrupt_fallback", + "status": status, + "signals": signals, + "processId": process_id, + "processAliveAfter": process_alive_after, + "reason": reason, + }), + )?; + + if sent_any_signal { + orchestrator::clear_orphaned_daemon_child_state( + state_store, + ChildRunRef { + issue_id: &run.issue_id, + run_id: &run.run_id, + attempt_number: run.attempt_number, + }, + true, + )?; + } + + Ok(LaneHardInterruptReport { + attempted: true, + status: String::from(status), + classification: String::from("hard_interrupt_fallback"), + signals, + process_id: Some(process_id), + process_alive_after, + message, + error_class: None, + }) +} + +fn absolute_lane_worktree_path( + project: &ServiceConfig, + state_store: &StateStore, + run: &OperatorRunStatus, +) -> Result> { + if let Some(mapping) = state_store.worktree_for_issue(&run.issue_id)? 
{ + return Ok(Some(mapping.worktree_path().to_path_buf())); + } + + let Some(worktree_path) = run.worktree_path.as_deref() else { + return Ok(None); + }; + let worktree_path = Path::new(worktree_path); + + Ok(Some(if worktree_path.is_absolute() { + worktree_path.to_path_buf() + } else { + project.repo_root().join(worktree_path) + })) +} + +fn send_lane_signal(process_id: u32, signal: c_int) -> Result { + let process_id = pid_t::try_from(process_id).map_err(|_error| { + eyre::eyre!("Recorded child process id is too large for this platform.") + })?; + + if process_id <= 0 { + eyre::bail!("Recorded child process id must be positive."); + } + + match unsafe { libc::kill(process_id, signal) } { + 0 => Ok(true), + -1 if Error::last_os_error().raw_os_error() == Some(ESRCH) => Ok(false), + -1 => Err(Error::last_os_error().into()), + _ => Ok(false), + } +} + +fn wait_for_lane_process_exit(process_id: u32, timeout: Duration) -> bool { + let started_at = Instant::now(); + + while started_at.elapsed() < timeout { + if !orchestrator::process_is_alive(process_id) { + return true; + } + + thread::sleep(Duration::from_millis(100)); + } + + !orchestrator::process_is_alive(process_id) +} + +fn lane_interrupt_next_action( + soft: &LaneSoftInterruptReport, + hard: Option<&LaneHardInterruptReport>, + force: bool, +) -> String { + if let Some(hard) = hard { + return if hard.status == "unavailable" { + String::from("Hard fallback was unavailable; inspect the lane before retrying.") + } else if hard.status == "sent" || hard.status == "process_not_found" { + String::from( + "Inspect the lane to confirm the lease and dirty-worktree reconciliation state.", + ) + } else { + String::from( + "The fallback signal did not stop the recorded process; inspect the host process before retrying.", + ) + }; + } + + match soft.status.as_str() { + "delivered" => + String::from("Inspect the lane until the app-server turn records completion."), + "pending" => + if force { + String::from("Soft interrupt is 
pending; forced fallback was not attempted.") + } else { + String::from( + "Re-run inspect shortly, or retry interrupt with --force if operator intent is to kill the process.", + ) + }, + "failed" | "rejected" | "unavailable" => String::from( + "Retry with --force only if operator intent is to use hard process-kill fallback.", + ), + _ => String::from("Inspect the lane for the latest run status."), + } +} + +fn render_lane_inspect_report(report: &LaneInspectReport) -> String { + let mut output = format!( + "Lane inspect for {} in project {} ({} run{})\n", + report.issue, + report.project_id, + report.matched_run_count, + if report.matched_run_count == 1 { "" } else { "s" } + ); + + for run in &report.runs { + output.push_str(&format!( + "- {} attempt {}: status={}, phase={}, activeLease={}, liveness={}\n", + run.run_id, + run.attempt_number, + run.status, + run.phase, + run.active_lease, + run.execution_liveness + )); + output.push_str(&format!( + " appServer: thread={}, turn={}, softInterruptAvailable={}\n", + run.thread_id.as_deref().unwrap_or("none"), + run.turn_id.as_deref().unwrap_or("none"), + run.soft_interrupt_available + )); + output.push_str(&format!( + " process: pid={}, alive={}, hardInterruptAvailable={} (requires --force)\n", + run.process_id.map_or_else(|| String::from("none"), |id| id.to_string()), + run.process_alive.map_or_else(|| String::from("unknown"), |alive| alive.to_string()), + run.hard_interrupt_available + )); + } + + output +} + +fn render_lane_interrupt_report(report: &LaneInterruptReport) -> String { + let mut output = format!( + "Lane interrupt {} for run {}: {}\n", + report.classification, report.run_id, report.soft_interrupt.message + ); + + if let Some(hard) = &report.hard_interrupt { + output.push_str(&format!( + "Hard fallback {}: {} ({})\n", + hard.status, + hard.message, + if hard.signals.is_empty() { + String::from("no signals") + } else { + hard.signals.join(",") + } + )); + } + + output.push_str(&format!("Next action: {}\n", 
report.next_action)); + + output +} diff --git a/apps/decodex/src/orchestrator/operator_http.rs b/apps/decodex/src/orchestrator/operator_http.rs index 267eb2e..e3c0d01 100644 --- a/apps/decodex/src/orchestrator/operator_http.rs +++ b/apps/decodex/src/orchestrator/operator_http.rs @@ -27,6 +27,8 @@ enum OperatorRequestRoute { Live, AppSnapshot, LinearScan, + LaneInspect, + LaneInterrupt, AccountList { force_refresh: bool }, AccountSelect, AccountClear, @@ -132,6 +134,17 @@ struct OperatorLinearScanHttpRequest { project_id: Option, } +#[derive(Deserialize)] +struct OperatorLaneInterruptHttpRequest { + #[serde(alias = "projectId")] + project_id: Option, + issue: String, + #[serde(alias = "runId")] + run_id: String, + force: Option, + reason: Option, +} + struct DashboardControlAck<'a> { request_id: Option<&'a str>, action: &'a str, @@ -228,6 +241,20 @@ fn handle_operator_state_endpoint_connection( return Ok(()); } + if route == OperatorRequestRoute::LaneInspect { + let response = build_operator_lane_inspect_http_response(state_store, &request); + + stream.write_all(&response)?; + + return Ok(()); + } + if route == OperatorRequestRoute::LaneInterrupt { + let response = build_operator_lane_interrupt_http_response(state_store, &request); + + stream.write_all(&response)?; + + return Ok(()); + } if route == OperatorRequestRoute::AppSnapshot { let response = build_operator_app_snapshot_http_response(snapshot); @@ -1417,6 +1444,179 @@ fn operator_linear_scan_request_project_id(request: &[u8]) -> Result Vec { + match operator_lane_inspect_http_response_body(state_store, request) { + Ok(body) => http_response_bytes("200 OK", "application/json", &body), + Err(error) => operator_lane_error_http_response(error), + } +} + +fn operator_lane_inspect_http_response_body( + state_store: &StateStore, + request: &[u8], +) -> Result> { + let project_id = operator_http_query_value_alias(request, "projectId", "project_id")?; + let issue = operator_http_query_value(request, "issue")? 
+ .filter(|issue| !issue.trim().is_empty()) + .ok_or_else(|| eyre::eyre!("Lane inspect request requires issue query parameter."))?; + let run_id = operator_http_query_value_alias(request, "runId", "run_id")?; + let project = operator_lane_http_project(state_store, project_id.as_deref())?; + let report = lane_control::build_lane_inspect_report( + state_store, + &project, + issue.trim(), + run_id.as_deref(), + )?; + + serde_json::to_vec(&report).map_err(Into::into) +} + +fn build_operator_lane_interrupt_http_response( + state_store: &StateStore, + request: &[u8], +) -> Vec { + match operator_lane_interrupt_http_response_body(state_store, request) { + Ok((status_line, body)) => http_response_bytes(status_line, "application/json", &body), + Err(error) => operator_lane_error_http_response(error), + } +} + +fn operator_lane_interrupt_http_response_body( + state_store: &StateStore, + request: &[u8], +) -> Result<(&'static str, Vec)> { + let body = operator_http_request_body(request)?; + let request: OperatorLaneInterruptHttpRequest = serde_json::from_slice(body) + .map_err(|error| eyre::eyre!("Lane interrupt request body was not valid JSON: {error}"))?; + + if request.issue.trim().is_empty() { + eyre::bail!("Lane interrupt request issue must not be blank."); + } + if request.run_id.trim().is_empty() { + eyre::bail!("Lane interrupt request runId must not be blank."); + } + + let project = operator_lane_http_project(state_store, request.project_id.as_deref())?; + let report = lane_control::interrupt_lane_with_state( + state_store, + &project, + request.issue.trim(), + request.run_id.trim(), + request.force.unwrap_or(false), + request.reason.as_deref(), + "http", + )?; + let status_line = report.http_status_line(); + + Ok((status_line, serde_json::to_vec(&report)?)) +} + +fn operator_lane_http_project( + state_store: &StateStore, + project_id: Option<&str>, +) -> Result { + let registrations = state_store.list_projects()?; + let registration = match 
project_id.map(str::trim).filter(|id| !id.is_empty()) { + Some(project_id) => registrations + .iter() + .find(|registration| registration.service_id() == project_id) + .ok_or_else(|| eyre::eyre!("Decodex project `{project_id}` is not registered."))?, + None => { + let enabled = registrations + .iter() + .filter(|registration| registration.enabled()) + .collect::>(); + + if enabled.len() == 1 { + enabled[0] + } else { + eyre::bail!( + "Lane API request requires projectId when zero or multiple projects are enabled." + ); + } + }, + }; + + ServiceConfig::from_path(registration.config_path()) +} + +fn operator_lane_error_http_response(error: Report) -> Vec { + let body = serde_json::to_vec(&json!({ "error": error.to_string() })) + .unwrap_or_else(|_| br#"{"error":"lane request failed"}"#.to_vec()); + + http_response_bytes("400 Bad Request", "application/json", &body) +} + +fn operator_http_query_value(request: &[u8], key: &str) -> Result> { + let request = String::from_utf8_lossy(request); + let Some(request_line) = request.lines().next() else { + return Ok(None); + }; + let Some(path) = request_line.split_whitespace().nth(1) else { + return Ok(None); + }; + let Some(query) = path.split_once('?').map(|(_path, query)| query) else { + return Ok(None); + }; + + for part in query.split('&') { + let (name, value) = part.split_once('=').unwrap_or((part, "")); + + if name == key { + return Ok(Some(percent_decode_operator_query_value(value)?)); + } + } + + Ok(None) +} + +fn operator_http_query_value_alias( + request: &[u8], + primary: &str, + secondary: &str, +) -> Result> { + match operator_http_query_value(request, primary)? 
{ + Some(value) => Ok(Some(value)), + None => operator_http_query_value(request, secondary), + } +} + +fn percent_decode_operator_query_value(value: &str) -> Result { + let raw = value.as_bytes(); + let mut bytes = Vec::with_capacity(value.len()); + let mut index = 0; + + while index < raw.len() { + match raw[index] { + b'+' => { + bytes.push(b' '); + + index += 1; + }, + b'%' if index + 2 < raw.len() => { + let hex = std::str::from_utf8(&raw[index + 1..index + 3])?; + let byte = u8::from_str_radix(hex, 16) + .map_err(|error| eyre::eyre!("Invalid percent-encoded query value: {error}"))?; + + bytes.push(byte); + + index += 3; + }, + byte => { + bytes.push(byte); + + index += 1; + }, + } + } + + String::from_utf8(bytes) + .map_err(|error| eyre::eyre!("Query parameter was not valid UTF-8: {error}")) +} + fn build_operator_state_http_response_for_route(route: OperatorRequestRoute) -> Vec { match route { OperatorRequestRoute::Dashboard => { @@ -1438,6 +1638,9 @@ fn build_operator_state_http_response_for_route(route: OperatorRequestRoute) -> OperatorRequestRoute::LinearScan => { http_response_bytes("405 Method Not Allowed", "text/plain; charset=utf-8", b"method not allowed") }, + OperatorRequestRoute::LaneInspect | OperatorRequestRoute::LaneInterrupt => { + http_response_bytes("405 Method Not Allowed", "text/plain; charset=utf-8", b"method not allowed") + }, OperatorRequestRoute::Live => { http_response_bytes("200 OK", "text/plain; charset=utf-8", b"ok") }, @@ -1500,6 +1703,8 @@ fn parse_operator_state_request_route( force_refresh: operator_query_has_flag(query, "refresh"), }), ("POST", OPERATOR_LINEAR_SCAN_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LinearScan), + ("GET", OPERATOR_LANE_INSPECT_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LaneInspect), + ("POST", OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LaneInterrupt), ("POST", "/api/accounts/select") => Ok(OperatorRequestRoute::AccountSelect), ("POST", "/api/accounts/clear") => 
Ok(OperatorRequestRoute::AccountClear), ("POST", "/api/accounts/logout") => Ok(OperatorRequestRoute::AccountLogout), @@ -1512,6 +1717,8 @@ fn parse_operator_state_request_route( | OPERATOR_LIVE_ENDPOINT_PATH | OPERATOR_APP_SNAPSHOT_ENDPOINT_PATH | OPERATOR_LINEAR_SCAN_ENDPOINT_PATH + | OPERATOR_LANE_INSPECT_ENDPOINT_PATH + | OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH | OPERATOR_ACCOUNTS_ENDPOINT_PATH | "/api/accounts/select" | "/api/accounts/clear" diff --git a/apps/decodex/src/orchestrator/tests/operator/status/http.rs b/apps/decodex/src/orchestrator/tests/operator/status/http.rs index 919826f..bed8cb9 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/http.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/http.rs @@ -1389,6 +1389,167 @@ fn operator_state_endpoint_queues_linear_scan_request() { ); } +#[test] +fn operator_lane_inspect_api_returns_lane_identity() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let registration = ProjectRegistration::from_config( + config.service_id(), + &service_config_path(config.repo_root()), + &config, + true, + "test-fingerprint", + ); + let issue = sample_issue("In Progress", &[]); + let worktree_path = config.worktree_root().join(&issue.identifier); + + fs::create_dir_all(&worktree_path).expect("worktree should exist"); + + state_store.upsert_project(®istration).expect("project should register"); + state_store + .record_run_attempt("pub-101-attempt-1", &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .update_run_thread("pub-101-attempt-1", "thread-1") + .expect("thread should record"); + state_store + .update_run_turn("pub-101-attempt-1", "turn-1") + .expect("turn should record"); + state_store + .upsert_lease(config.service_id(), &issue.id, "pub-101-attempt-1", "In Progress") + .expect("lease should record"); + state_store + .upsert_worktree( + config.service_id(), + 
&issue.id, + "x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + + let response = String::from_utf8(orchestrator::build_operator_lane_inspect_http_response( + &state_store, + format!( + "GET {}?projectId=pubfi&issue=PUB-101&runId=pub-101-attempt-1 HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", + orchestrator::OPERATOR_LANE_INSPECT_ENDPOINT_PATH + ) + .as_bytes(), + )) + .expect("lane inspect response should be utf-8"); + let body = response + .split_once("\r\n\r\n") + .map(|(_, body)| body) + .expect("lane inspect response should include body"); + let data: Value = serde_json::from_str(body).expect("lane inspect response should be json"); + + assert!(response.starts_with("HTTP/1.1 200 OK\r\n")); + assert_eq!(data["projectId"], "pubfi"); + assert_eq!(data["issue"], "PUB-101"); + assert_eq!(data["matchedRunCount"], 1); + assert_eq!(data["runs"][0]["runId"], "pub-101-attempt-1"); + assert_eq!(data["runs"][0]["threadId"], "thread-1"); + assert_eq!(data["runs"][0]["turnId"], "turn-1"); +} + +#[test] +fn operator_lane_interrupt_api_rejects_blank_run_id() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let registration = ProjectRegistration::from_config( + config.service_id(), + &service_config_path(config.repo_root()), + &config, + true, + "test-fingerprint", + ); + let body = br#"{"projectId":"pubfi","issue":"PUB-101","runId":""}"#; + let request = format!( + "POST {} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + orchestrator::OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH, + body.len(), + String::from_utf8_lossy(body) + ); + + state_store.upsert_project(®istration).expect("project should register"); + + let response = String::from_utf8(orchestrator::build_operator_lane_interrupt_http_response( + &state_store, + request.as_bytes(), + )) + 
.expect("lane interrupt response should be utf-8"); + let body = response + .split_once("\r\n\r\n") + .map(|(_, body)| body) + .expect("lane interrupt response should include body"); + let data: Value = serde_json::from_str(body).expect("lane interrupt response should be json"); + + assert!(response.starts_with("HTTP/1.1 400 Bad Request\r\n")); + assert!(data["error"].as_str().unwrap_or_default().contains("runId")); +} + +#[test] +fn operator_lane_interrupt_api_force_reports_hard_fallback_after_pending_soft_interrupt() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let registration = ProjectRegistration::from_config( + config.service_id(), + &service_config_path(config.repo_root()), + &config, + true, + "test-fingerprint", + ); + let issue = sample_issue("In Progress", &[]); + let worktree_path = config.worktree_root().join(&issue.identifier); + let body = br#"{"projectId":"pubfi","issue":"PUB-101","runId":"pub-101-attempt-1","force":true}"#; + let request = format!( + "POST {} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + orchestrator::OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH, + body.len(), + String::from_utf8_lossy(body) + ); + + fs::create_dir_all(&worktree_path).expect("worktree should exist"); + + state_store.upsert_project(®istration).expect("project should register"); + state_store + .record_run_attempt("pub-101-attempt-1", &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .update_run_thread("pub-101-attempt-1", "thread-1") + .expect("thread should record"); + state_store + .update_run_turn("pub-101-attempt-1", "turn-1") + .expect("turn should record"); + state_store + .upsert_lease(config.service_id(), &issue.id, "pub-101-attempt-1", "In Progress") + .expect("lease should record"); + state_store + .upsert_worktree( + config.service_id(), + &issue.id, + 
"x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + + let response = String::from_utf8(orchestrator::build_operator_lane_interrupt_http_response( + &state_store, + request.as_bytes(), + )) + .expect("lane interrupt response should be utf-8"); + let body = response + .split_once("\r\n\r\n") + .map(|(_, body)| body) + .expect("lane interrupt response should include body"); + let data: Value = serde_json::from_str(body).expect("lane interrupt response should be json"); + + assert!(response.starts_with("HTTP/1.1 200 OK\r\n")); + assert_eq!(data["classification"], "hard_interrupt_fallback"); + assert_eq!(data["softInterrupt"]["status"], "pending"); + assert_eq!(data["hardInterrupt"]["status"], "unavailable"); + assert!(data["nextAction"].as_str().unwrap_or_default().contains("Hard fallback was unavailable")); +} + #[test] fn operator_state_endpoint_serves_account_api_snapshot() { let temp_dir = TempDir::new().expect("temp dir should exist"); diff --git a/apps/decodex/src/run_control.rs b/apps/decodex/src/run_control.rs new file mode 100644 index 0000000..ee50b7b --- /dev/null +++ b/apps/decodex/src/run_control.rs @@ -0,0 +1,351 @@ +use std::{ + fs, + io::ErrorKind, + path::{Path, PathBuf}, + process, thread, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use time::OffsetDateTime; + +use crate::prelude::{self, eyre}; + +const RUN_CONTROL_DIR: &str = ".decodex-run-control"; +const REQUEST_SUFFIX: &str = ".request.json"; +const RESPONSE_SUFFIX: &str = ".response.json"; +const SCHEMA_INTERRUPT_REQUEST: &str = "decodex/run-control/interrupt-request/1"; +const SCHEMA_INTERRUPT_RESPONSE: &str = "decodex/run-control/interrupt-response/1"; +const POLL_INTERVAL: Duration = Duration::from_millis(100); + +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct LaneControlInterruptRequest { + 
pub(crate) schema: String, + pub(crate) request_id: String, + pub(crate) project_id: String, + pub(crate) issue_id: String, + pub(crate) run_id: String, + pub(crate) attempt_number: i64, + pub(crate) thread_id: String, + pub(crate) turn_id: String, + pub(crate) source: String, + pub(crate) reason: Option, + pub(crate) created_at_unix_epoch: i64, +} +impl LaneControlInterruptRequest { + pub(crate) fn new(input: LaneControlInterruptRequestInput<'_>) -> Self { + Self { + schema: String::from(SCHEMA_INTERRUPT_REQUEST), + request_id: fresh_request_id(input.run_id), + project_id: input.project_id.to_owned(), + issue_id: input.issue_id.to_owned(), + run_id: input.run_id.to_owned(), + attempt_number: input.attempt_number, + thread_id: input.thread_id.to_owned(), + turn_id: input.turn_id.to_owned(), + source: input.source.to_owned(), + reason: input.reason.map(str::to_owned), + created_at_unix_epoch: OffsetDateTime::now_utc().unix_timestamp(), + } + } +} + +pub(crate) struct LaneControlInterruptRequestInput<'a> { + pub(crate) project_id: &'a str, + pub(crate) issue_id: &'a str, + pub(crate) run_id: &'a str, + pub(crate) attempt_number: i64, + pub(crate) thread_id: &'a str, + pub(crate) turn_id: &'a str, + pub(crate) source: &'a str, + pub(crate) reason: Option<&'a str>, +} + +#[derive(Clone, Debug)] +pub(crate) struct PendingLaneControlRequest { + pub(crate) path: PathBuf, + pub(crate) request: LaneControlInterruptRequest, +} + +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub(crate) enum LaneControlResponseStatus { + SoftDelivered, + SoftFailed, + Rejected, +} + +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct LaneControlInterruptResponse { + pub(crate) schema: String, + pub(crate) request_id: String, + pub(crate) project_id: String, + pub(crate) issue_id: String, + pub(crate) run_id: String, + pub(crate) attempt_number: i64, + pub(crate) thread_id: 
String, + pub(crate) turn_id: String, + pub(crate) status: LaneControlResponseStatus, + pub(crate) classification: String, + pub(crate) method: String, + pub(crate) message: String, + pub(crate) error_class: Option, + pub(crate) protocol_summary: Option, + pub(crate) recorded_at_unix_epoch: i64, +} +impl LaneControlInterruptResponse { + pub(crate) fn delivered( + request: &LaneControlInterruptRequest, + protocol_summary: String, + ) -> Self { + Self::from_request( + request, + LaneControlResponseStatus::SoftDelivered, + "graceful_stop_requested", + "turn/interrupt accepted by app-server.", + None, + Some(protocol_summary), + ) + } + + pub(crate) fn failed( + request: &LaneControlInterruptRequest, + error_class: &str, + message: String, + ) -> Self { + Self::from_request( + request, + LaneControlResponseStatus::SoftFailed, + "soft_interrupt_failed", + message, + Some(error_class.to_owned()), + None, + ) + } + + pub(crate) fn rejected( + request: &LaneControlInterruptRequest, + error_class: &str, + message: String, + ) -> Self { + Self::from_request( + request, + LaneControlResponseStatus::Rejected, + "control_request_rejected", + message, + Some(error_class.to_owned()), + None, + ) + } + + fn from_request( + request: &LaneControlInterruptRequest, + status: LaneControlResponseStatus, + classification: &str, + message: impl Into, + error_class: Option, + protocol_summary: Option, + ) -> Self { + Self { + schema: String::from(SCHEMA_INTERRUPT_RESPONSE), + request_id: request.request_id.clone(), + project_id: request.project_id.clone(), + issue_id: request.issue_id.clone(), + run_id: request.run_id.clone(), + attempt_number: request.attempt_number, + thread_id: request.thread_id.clone(), + turn_id: request.turn_id.clone(), + status, + classification: classification.to_owned(), + method: String::from("turn/interrupt"), + message: message.into(), + error_class, + protocol_summary, + recorded_at_unix_epoch: OffsetDateTime::now_utc().unix_timestamp(), + } + } +} + 
+/// Persists `request` under the run-control directory of `worktree_path`.
+///
+/// The file location is derived from the request's `run_id` and `request_id`.
+///
+/// # Errors
+/// Returns an error when serialization or any filesystem step fails.
+pub(crate) fn write_interrupt_request(
+    worktree_path: &Path,
+    request: &LaneControlInterruptRequest,
+) -> prelude::Result<PathBuf> {
+    let path = interrupt_request_path(worktree_path, &request.run_id, &request.request_id);
+
+    write_json_file_atomically(&path, request)?;
+
+    Ok(path)
+}
+
+/// Persists `response` next to the request it answers and returns the written path.
+///
+/// # Errors
+/// Returns an error when serialization or any filesystem step fails.
+pub(crate) fn write_interrupt_response(
+    worktree_path: &Path,
+    response: &LaneControlInterruptResponse,
+) -> prelude::Result<PathBuf> {
+    let path = interrupt_response_path(worktree_path, &response.run_id, &response.request_id);
+
+    write_json_file_atomically(&path, response)?;
+
+    Ok(path)
+}
+
+/// Deletes a request file; a file that is already gone counts as success.
+///
+/// # Errors
+/// Returns any removal error other than `NotFound`.
+pub(crate) fn remove_interrupt_request(path: &Path) -> prelude::Result<()> {
+    match fs::remove_file(path) {
+        Ok(()) => Ok(()),
+        Err(error) if error.kind() == ErrorKind::NotFound => Ok(()),
+        Err(error) => Err(error.into()),
+    }
+}
+
+/// Lists the request files stored for `run_id`, oldest first.
+///
+/// Requests are ordered by `created_at_unix_epoch`, with `request_id` as the tie-breaker.
+/// Any failure to open the run directory (including it not existing yet) yields an empty
+/// list, and directory entries that cannot be read are skipped.
+///
+/// # Errors
+/// Fails for the whole call when any single request file cannot be read, cannot be parsed,
+/// or declares an unsupported schema.
+pub(crate) fn pending_interrupt_requests(
+    worktree_path: &Path,
+    run_id: &str,
+) -> prelude::Result<Vec<PendingLaneControlRequest>> {
+    let dir = run_control_run_dir(worktree_path, run_id);
+    let Ok(entries) = fs::read_dir(&dir) else {
+        return Ok(Vec::new());
+    };
+    let mut requests = entries
+        .filter_map(std::result::Result::ok)
+        .map(|entry| entry.path())
+        .filter(|path| file_name_ends_with(path, REQUEST_SUFFIX))
+        .map(read_pending_interrupt_request)
+        .collect::<prelude::Result<Vec<_>>>()?;
+
+    requests.sort_by(|left, right| {
+        left.request
+            .created_at_unix_epoch
+            .cmp(&right.request.created_at_unix_epoch)
+            .then_with(|| left.request.request_id.cmp(&right.request.request_id))
+    });
+
+    Ok(requests)
+}
+
+/// Blocks the calling thread until a response file for `request_id` exists or `timeout`
+/// has elapsed.
+///
+/// The response is checked at least once, even with a zero timeout, and then re-checked
+/// every `POLL_INTERVAL`. Returns `Ok(None)` when the wait times out.
+///
+/// # Errors
+/// Returns an error when the response file exists but cannot be read or parsed.
+pub(crate) fn wait_for_interrupt_response(
+    worktree_path: &Path,
+    run_id: &str,
+    request_id: &str,
+    timeout: Duration,
+) -> prelude::Result<Option<LaneControlInterruptResponse>> {
+    let started_at = Instant::now();
+
+    loop {
+        if let Some(response) = read_interrupt_response(worktree_path, run_id, request_id)? {
+            return Ok(Some(response));
+        }
+
+        if started_at.elapsed() >= timeout {
+            return Ok(None);
+        }
+
+        thread::sleep(POLL_INTERVAL);
+    }
+}
+
+/// Reads the response recorded for `request_id`, or `Ok(None)` when none exists yet.
+///
+/// # Errors
+/// Returns any read error other than `NotFound`, and any JSON decoding error.
+pub(crate) fn read_interrupt_response(
+    worktree_path: &Path,
+    run_id: &str,
+    request_id: &str,
+) -> prelude::Result<Option<LaneControlInterruptResponse>> {
+    let path = interrupt_response_path(worktree_path, run_id, request_id);
+
+    match fs::read_to_string(path) {
+        Ok(raw) => serde_json::from_str(&raw).map(Some).map_err(Into::into),
+        Err(error) if error.kind() == ErrorKind::NotFound => Ok(None),
+        Err(error) => Err(error.into()),
+    }
+}
+
+/// Describes the shape of a JSON value without reproducing any of its contents.
+///
+/// Scalars are reported by type name, arrays by length, and objects by their sorted,
+/// comma-separated key names.
+pub(crate) fn protocol_response_summary(value: &Value) -> String {
+    match value {
+        Value::Null => String::from("null"),
+        Value::Bool(_) => String::from("boolean"),
+        Value::Number(_) => String::from("number"),
+        Value::String(_) => String::from("string"),
+        Value::Array(values) => format!("array(len={})", values.len()),
+        Value::Object(entries) => {
+            let mut keys = entries.keys().map(String::as_str).collect::<Vec<_>>();
+
+            // Sorted so the summary does not depend on the map's iteration order.
+            keys.sort_unstable();
+
+            format!("object(keys={})", keys.join(","))
+        },
+    }
+}
+
+/// Loads and validates one request file.
+///
+/// # Errors
+/// Fails when the file cannot be read, is not valid JSON for the request type, or carries
+/// a `schema` other than `SCHEMA_INTERRUPT_REQUEST`.
+fn read_pending_interrupt_request(path: PathBuf) -> prelude::Result<PendingLaneControlRequest> {
+    let raw = fs::read_to_string(&path)?;
+    let request: LaneControlInterruptRequest = serde_json::from_str(&raw)?;
+
+    if request.schema != SCHEMA_INTERRUPT_REQUEST {
+        eyre::bail!(
+            "Unsupported lane-control request schema `{}` in `{}`.",
+            request.schema,
+            path.display()
+        );
+    }
+
+    Ok(PendingLaneControlRequest { path, request })
+}
+
+/// Path of the request file for `request_id` inside the run directory.
+fn interrupt_request_path(worktree_path: &Path, run_id: &str, request_id: &str) -> PathBuf {
+    run_control_run_dir(worktree_path, run_id).join(format!(
+        "{}{}",
+        sanitize_path_component(request_id),
+        REQUEST_SUFFIX
+    ))
+}
+
+/// Path of the response file for `request_id` inside the run directory.
+fn interrupt_response_path(worktree_path: &Path, run_id: &str, request_id: &str) -> PathBuf {
+    run_control_run_dir(worktree_path, run_id).join(format!(
+        "{}{}",
+        sanitize_path_component(request_id),
+        RESPONSE_SUFFIX
+    ))
+}
+
+/// Directory that holds every control file of one run: `<worktree>/<RUN_CONTROL_DIR>/<run>`.
+fn run_control_run_dir(worktree_path: &Path, run_id: &str) -> PathBuf {
+    worktree_path.join(RUN_CONTROL_DIR).join(sanitize_path_component(run_id))
+}
+
+/// Serializes `value` as pretty-printed JSON and moves it into place with a rename.
+///
+/// The data is first written to a sibling file whose extension is replaced by `tmp`, then
+/// renamed onto `path`, so a reader polling `path` is not expected to see a half-written
+/// file. This relies on `fs::rename` replacing the target in one step on the host
+/// platform. Missing parent directories are created.
+///
+/// # Errors
+/// Fails when `path` has no parent, when serialization fails, or when any filesystem step
+/// fails.
+fn write_json_file_atomically<T>(path: &Path, value: &T) -> prelude::Result<()>
+where
+    T: Serialize,
+{
+    let parent = path
+        .parent()
+        .ok_or_else(|| eyre::eyre!("Lane-control file `{}` has no parent.", path.display()))?;
+    let temp_path = path.with_extension("tmp");
+    let data = serde_json::to_vec_pretty(value)?;
+
+    fs::create_dir_all(parent)?;
+    fs::write(&temp_path, data)?;
+    fs::rename(&temp_path, path)?;
+
+    Ok(())
+}
+
+/// Generates a request id of the form `<sanitized run id>-<process id>-<nanoseconds>`.
+///
+/// The nanosecond part is the time since the Unix epoch, or zero when the system clock
+/// reports a time before the epoch.
+fn fresh_request_id(run_id: &str) -> String {
+    let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO).as_nanos();
+
+    format!("{}-{}-{now}", sanitize_path_component(run_id), process::id())
+}
+
+/// Returns whether the final component of `path` is valid UTF-8 and ends with `suffix`.
+fn file_name_ends_with(path: &Path, suffix: &str) -> bool {
+    path.file_name().and_then(|name| name.to_str()).is_some_and(|name| name.ends_with(suffix))
+}
+
+/// Reduces an id to a single safe path component.
+///
+/// ASCII letters, digits, `-`, `_`, and `.` are kept; every other character becomes `-`.
+/// An empty result, or one made only of dots, is replaced by `lane-control`.
+fn sanitize_path_component(value: &str) -> String {
+    let sanitized = value
+        .chars()
+        .map(|character| match character {
+            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' | '.' => character,
+            _ => '-',
+        })
+        .collect::<String>();
+
+    // `.` and `..` contain only allowed characters but name the current and parent
+    // directory, so an id such as `..` would resolve outside the run-control tree.
+    // `all` is also true for the empty string, which keeps the previous empty-input fallback.
+    if sanitized.chars().all(|character| character == '.') {
+        String::from("lane-control")
+    } else {
+        sanitized
+    }
+}
diff --git a/docs/reference/operator-control-plane.md b/docs/reference/operator-control-plane.md index f681e4f..65fd78e 100644 --- a/docs/reference/operator-control-plane.md +++ b/docs/reference/operator-control-plane.md @@ -38,8 +38,9 @@ launch it connects to an existing default local listener when one is reachable; not, it starts the bundled `decodex` binary as `decodex serve --listen-address 127.0.0.1:8912`. The app fallback is a normal control-plane server: it loads the enabled project registry, uses the CLI-owned default -cadences, and serves the dashboard, account APIs, `GET /api/operator-snapshot`, and -`POST /api/linear-scan` from the single local listener.
+cadences, and serves the dashboard, account APIs, `GET /api/operator-snapshot`, +`POST /api/linear-scan`, `GET /api/lane/inspect`, and `POST /api/lane/interrupt` from +the single local listener. `decodex serve` has two hardcoded scheduler cadences: @@ -61,6 +62,21 @@ An empty `POST /api/linear-scan` queues a scan for all enabled projects. Request consumed by the next 15-second control-plane tick and still respect any active tracker rate-limit backoff. +Lane inspect and interrupt are local control APIs, not dashboard UI actions: + +```sh +curl -sS 'http://127.0.0.1:8912/api/lane/inspect?projectId=decodex&issue=XY-703' +curl -sS -X POST http://127.0.0.1:8912/api/lane/interrupt \ + -H 'Content-Type: application/json' \ + -d '{"projectId":"decodex","issue":"XY-703","runId":"<run-id>"}' +``` + +`POST /api/lane/interrupt` first writes a soft interrupt request for the active +app-server child to deliver with `turn/interrupt`. Add `"force": true` only when the +operator explicitly wants hard process-kill fallback after soft interrupt is +unavailable or does not return in the local wait window. Hard fallback is reported as +`hard_interrupt_fallback`, not as a graceful stop. + Use `--dev` only for isolated local development: - Developers may use `--dev` to exercise real account APIs, `GET /api/operator-snapshot`, @@ -159,7 +175,8 @@ steer, retry, task replacement, or lifecycle mutations. CLI/API is the first operator-control surface for lane control, governed by [`../spec/lane-control.md`](../spec/lane-control.md). The browser UI does not show or accept active-lane stop/interrupt controls, project pause/resume controls, manual retry -controls, or active-lane steer controls. Account-pool selection remains available +controls, or active-lane steer controls; use `decodex lane inspect`, `decodex lane +interrupt`, or the local `/api/lane/*` endpoints instead. Account-pool selection remains available because it changes the global Codex account selector, not an active lane.
`runActivity.activeRunsComplete` marks whether a payload is the complete active-run list; subscription-filtered diff --git a/docs/spec/app-server.md b/docs/spec/app-server.md index 2e78dcc..3e474d6 100644 --- a/docs/spec/app-server.md +++ b/docs/spec/app-server.md @@ -152,6 +152,8 @@ If generated schema or live capability probing shows that `turn/interrupt` or `turn/steer` is unavailable, the CLI/API control must report that control as unsupported for the active lane instead of failing ordinary issue dispatch. The lane-control contract and support matrix live in [`lane-control.md`](./lane-control.md). +Decodex currently implements `turn/interrupt` through the child-owned app-server +connection for active turns; `turn/steer` remains planned. ## Required request flow @@ -317,6 +319,25 @@ Decodex's intended use is soft active-turn interruption from a CLI/API operator control. It should target the current known thread and turn, request a graceful turn stop through app-server, and leave outcome classification to the Decodex runtime. +Request parameters: + +```json +{ + "threadId": "<thread-id>", + "turnId": "<turn-id>" +} +``` + +Decodex treats the app-server response as protocol evidence rather than a private +payload to expose.
The local control response records a summary such as object keys or +array length, plus a normalized result: + +- `soft_delivered` when app-server accepts the JSON-RPC request +- `soft_failed` when the method is unsupported, times out, or returns another protocol + error +- `rejected` when the child process finds that the requested project, issue, run, + attempt, thread, or turn no longer matches the active turn + `turn/interrupt` must not: - mutate tracker state directly @@ -324,9 +345,9 @@ stop through app-server, and leave outcome classification to the Decodex runtime - clear leases without runtime classification - replace the hard-interrupt process fallback when app-server is unreachable -When implemented, Decodex should prefer `turn/interrupt` before signaling the child -process. A hard interrupt remains only a fallback after soft interrupt is unavailable, -times out, or cannot be routed to the live app-server session. +Decodex prefers `turn/interrupt` before signaling the child process. A hard interrupt +remains only a fallback after explicit operator intent and after soft interrupt is +unavailable, times out, or cannot be routed to the live app-server session. ## `turn/steer` diff --git a/docs/spec/lane-control.md b/docs/spec/lane-control.md index 5a86561..e67a13c 100644 --- a/docs/spec/lane-control.md +++ b/docs/spec/lane-control.md @@ -33,12 +33,12 @@ agent-facing skills must guide responsible use. | Capability | Contract status | Current implementation evidence | Required behavior | | --- | --- | --- | --- | -| Inspect lane state | Supported | `decodex status`, `decodex status --json`, `decodex diagnose --json`, `decodex evidence <issue>`, operator snapshots, and dashboard views | Always inspect before mutating or steering. Inspection must not mutate tracker state, runtime DB rows, worktrees, or app-server turns.
| +| Inspect lane state | Supported | `decodex lane inspect <issue>`, `decodex status`, `decodex status --json`, `decodex diagnose --json`, `decodex evidence <issue>`, `GET /api/lane/inspect`, operator snapshots, and dashboard views | Always inspect before mutating or steering. Inspection must not mutate tracker state, runtime DB rows, worktrees, or app-server turns. | | Project dispatch pause | Supported for future dispatch | `decodex project disable <project-id>` and the runtime project enabled flag | Pause prevents new dispatch for the project. It must not kill or rewrite already active lanes. | | Project dispatch resume | Supported for future dispatch | `decodex project enable <project-id>` and the runtime project enabled flag | Resume re-enables future dispatch after the operator has inspected blockers, capacity, and queue state. | | Linear scan request | Supported | `POST /api/linear-scan` with optional `projectId` | Queue a scan for the next control-plane tick while respecting tracker backoff. This is an intake/status refresh request, not an execution command. | -| Soft interrupt | Planned CLI/API control; bottom-layer method allowed | Decodex does not currently send `turn/interrupt` from its app-server client | Prefer soft interrupt before hard interruption when the active turn id is known and the app-server capability is present. Soft interrupt requests a graceful turn stop and must leave classification to the runtime. | -| Hard interrupt fallback | Emergency fallback only | No dashboard or CLI/API lane-control path exposes hard interrupt in this rollout; runtime recovery can still classify attempts as `interrupted` | Use only when soft interrupt is unavailable, timed out, or impossible because the process or app-server boundary cannot be reached. Preserve retained worktree evidence and runtime classification.
| +| Soft interrupt | Supported CLI/API control | `decodex lane interrupt <issue> --run-id <run-id>` and `POST /api/lane/interrupt` write a run-control request that the active app-server child delivers with `turn/interrupt` | Prefer soft interrupt before hard interruption when the active turn id is known and the app-server capability is present. Soft interrupt requests a graceful turn stop and records the protocol outcome when app-server returns one. | +| Hard interrupt fallback | Explicit fallback only | `decodex lane interrupt <issue> --run-id <run-id> --force` and `POST /api/lane/interrupt` with `"force": true` classify process signaling as `hard_interrupt_fallback` | Use only when soft interrupt is unavailable, timed out, or impossible because the process or app-server boundary cannot be reached. Preserve retained worktree evidence and runtime classification. | | Steer active lane | Planned CLI/API control; bottom-layer method must stay broad | Decodex does not currently send `turn/steer` from its app-server client | Pass operator-supplied steer text through the CLI/API when available. Do not narrow the protocol to a fixed set of task-content categories. Apply policy, audit, privacy, and lifecycle guardrails above the protocol. | | Retained resume/retry | Supported through runtime lifecycle | `decodex run <issue>`, retry scheduling, retained worktree recovery, and `thread/resume` for same-thread app-server continuation | Resume only when retained worktree, issue, branch, PR, and runtime evidence still prove the same lane. Treat ambiguous lineage as manual attention. | | Manual attention | Supported terminal control path | `decodex:needs-attention`, `issue_comment(kind = "manual_attention")`, and `issue_terminal_finalize(path = "manual_attention")` | Stop automation when policy requires a human decision. Explain the blocker through structured public fields and keep private evidence local. | @@ -73,11 +73,30 @@ Soft interrupt is the preferred active-turn stop path.
A compliant soft interrup - leaves tracker state, retry policy, and retained-worktree classification to the Decodex runtime +The supported operator commands are: + +- `decodex lane inspect <issue> [--run-id <run-id>] [--json]` +- `decodex lane interrupt <issue> --run-id <run-id> [--json] [--reason <text>]` +- `decodex lane interrupt <issue> --run-id <run-id> --force [--json] [--reason <text>]` + +The local HTTP API mirrors those semantics: + +- `GET /api/lane/inspect?projectId=<project-id>&issue=<issue>[&runId=<run-id>]` +- `POST /api/lane/interrupt` with JSON fields `projectId`, `issue`, `runId`, + optional `reason`, and optional `force` + +When only one project is enabled on the local listener, the HTTP API can infer +`projectId`; otherwise the request must include it. CLI/API responses include the +lane identity, app-server thread and turn ids when known, process liveness summary, +soft-interrupt classification, hard-fallback classification when used, and next action. +They do not include private payload bodies. + Hard interrupt is a fallback, not the normal operator control. A hard interrupt may -signal the recorded child process only after Decodex proves the process identity still -matches the current run attempt. The runtime must preserve evidence, mark the attempt -with an interruption status, clear or retain ownership according to the runtime -contract, and avoid pretending the agent completed a terminal path. +signal the recorded child process only after explicit `--force` or `"force": true` +operator intent. The fallback emits `hard_interrupt_fallback`, preserves local +evidence, marks an active attempt as `interrupted` when a recorded child is signaled, +clears or retains ownership according to the runtime contract, and avoids pretending +the agent completed a terminal path. ## Steer @@ -150,11 +169,11 @@ Linear unless a schema-controlled public projection explicitly allows it. ## Implementation Status For This Rollout This document specifies capabilities that the CLI/API should expose first.
Current code -already supports inspect, CLI project enable/disable, Linear scan requests, retained -resume/retry lifecycle paths, and manual-attention finalization. Current code does not -expose dashboard lane-mutation controls, does not yet implement Decodex CLI/API -controls that send `turn/interrupt` or `turn/steer`, and does not expose raw -`thread/inject_items` as an operator feature. +supports lane inspect, CLI project enable/disable, Linear scan requests, soft +interrupt, explicit hard-interrupt fallback, retained resume/retry lifecycle paths, and +manual-attention finalization. Current code does not expose dashboard lane-mutation +controls, does not yet implement Decodex CLI/API controls that send `turn/steer`, and +does not expose raw `thread/inject_items` as an operator feature. When implementation work adds the missing CLI/API controls, update this spec, [`app-server.md`](./app-server.md), the operator reference, and the Decodex plugin diff --git a/plugins/decodex/skills/automation/SKILL.md b/plugins/decodex/skills/automation/SKILL.md index 461daac..cd8b4fa 100644 --- a/plugins/decodex/skills/automation/SKILL.md +++ b/plugins/decodex/skills/automation/SKILL.md @@ -114,18 +114,21 @@ Read `docs/spec/lane-control.md` before using or explaining operator controls. Rules for agents: -- Inspect first with `decodex status`, `decodex status --json`, `decodex diagnose - --json`, `decodex evidence <issue>`, or the dashboard snapshot. Confirm project id, - issue id, branch, run id, attempt, thread/turn evidence, process liveness, tracker - state, and PR lineage before mutating anything. +- Inspect first with `decodex lane inspect <issue>`, `decodex status`, + `decodex status --json`, `decodex diagnose --json`, `decodex evidence <issue>`, or + the dashboard snapshot. Confirm project id, issue id, branch, run id, attempt, + thread/turn evidence, process liveness, tracker state, and PR lineage before mutating + anything. - Use project dispatch pause/resume only for future intake.
`decodex project disable <project-id>` pauses new dispatch; `decodex project enable <project-id>` resumes it. Neither command kills active lanes. - Request Linear refresh with `POST /api/linear-scan` when a newly queued or relabeled issue should be observed before the next 5-minute poll. -- Prefer soft interrupt through the CLI/API `turn/interrupt` control when it exists and - the active turn can be targeted. Use hard process interruption only as a fallback when - soft interrupt is unavailable, timed out, or impossible. +- Prefer `decodex lane interrupt <issue> --run-id <run-id>` or + `POST /api/lane/interrupt` for soft `turn/interrupt` when the active turn can be + targeted. Use hard process interruption only with `--force` or `"force": true` and + only as `hard_interrupt_fallback` after soft interrupt is unavailable, timed out, or + impossible. - Use steer only through the CLI/API lane-control surface and only when the operator supplies the steer text. Bottom-layer steer support is broad; policy, audit, privacy, workflow, recovery, and skills provide the guardrails. diff --git a/plugins/decodex/skills/manual-cli/SKILL.md b/plugins/decodex/skills/manual-cli/SKILL.md index 20c1add..dbd4675 100644 --- a/plugins/decodex/skills/manual-cli/SKILL.md +++ b/plugins/decodex/skills/manual-cli/SKILL.md @@ -62,17 +62,20 @@ Before starting a live run, read the `automation` skill and the registered proje CLI/API lane controls: -- Inspect first with `decodex status`, `decodex status --json`, `decodex diagnose - --json`, or `decodex evidence <issue>`. +- Inspect first with `decodex lane inspect <issue>`, `decodex status`, + `decodex status --json`, `decodex diagnose --json`, or + `decodex evidence <issue>`. - Use `decodex project disable <project-id>` to pause future dispatch for a registered project, and `decodex project enable <project-id>` to resume it. - Use `POST /api/linear-scan` to request an intake/status refresh before the next scheduled Linear poll.
- Use `decodex run <issue>` only for deliberate one-issue automation or retained retry/resume when the lane remains eligible under the registered workflow. -- When future CLI/API controls expose soft interrupt or steer, prefer soft interrupt - over hard process interruption and use steer only with explicit operator-supplied - text. +- Use `decodex lane interrupt <issue> --run-id <run-id>` for soft active-turn + interruption. Add `--force` only when explicit operator intent allows hard + process-kill fallback after soft interrupt is unavailable or fails. +- Steer remains future CLI/API work; use it only through the lane-control surface once + implemented and only with explicit operator-supplied text. - Do not use active-lane UI controls, direct runtime DB edits, raw `thread/inject_items`, or tracker-state mutations as substitutes for the lane-control contract.