diff --git a/README.md b/README.md index 44023ead..7e992655 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,7 @@ cargo run -p decodex --bin decodex -- project list cargo run -p decodex --bin decodex -- status cargo run -p decodex --bin decodex -- diagnose --json cargo run -p decodex --bin decodex -- maintenance prune --dry-run +cargo run -p decodex --bin decodex -- lane steer --run-id --expected-turn-id --message cargo run -p decodex --bin decodex -- radar refresh-upstream-queue cargo run -p decodex --bin decodex -- radar refresh-release-delta cargo run -p decodex --bin decodex -- radar validate diff --git a/apps/decodex/src/agent/app_server.rs b/apps/decodex/src/agent/app_server.rs index 13945ead..fc28c7cc 100644 --- a/apps/decodex/src/agent/app_server.rs +++ b/apps/decodex/src/agent/app_server.rs @@ -31,7 +31,8 @@ use self::protocol::{ ProbeDynamicToolHandler, RunOutcome, RuntimeConfigSummary, SkillsListParams, SkillsListResponse, ThreadArchiveRequest, ThreadResumeRequest, ThreadSessionResponse, ThreadStartRequest, ThreadStatusChangedNotification, ToolRequestUserInputResponse, - TurnCompletedNotification, TurnError, TurnInterruptRequest, TurnStartRequest, UserInput, + TurnCompletedNotification, TurnError, TurnInterruptRequest, TurnStartRequest, TurnSteerRequest, + UserInput, }; use crate::{ agent::{ @@ -49,13 +50,17 @@ use crate::{ }, prelude::eyre, run_control::{ - self, LaneControlInterruptRequest, LaneControlInterruptResponse, PendingLaneControlRequest, + self, LaneControlInterruptRequest, LaneControlInterruptResponse, LaneControlSteerRequest, + LaneControlSteerResponse, LaneControlSteerResponseStatus, PendingLaneControlRequest, + PendingLaneControlSteerRequest, }, state::{ self, CodexAccountActivitySummary, CodexAccountMarker, EffectiveRuntimeMarker, - RUN_CONTROL_CHANNEL_DIR, RUN_CONTROL_CHANNEL_STATUS_COMPLETED, - RUN_CONTROL_CHANNEL_STATUS_FAILED, RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE, - RUN_OPERATION_AGENT_RUN, RUN_OPERATION_APP_SERVER_PREFLIGHT, RunControlChannel, StateStore, + RUN_CONTROL_ACTION_COMPLETED, RUN_CONTROL_ACTION_FAILED, RUN_CONTROL_CHANNEL_DIR, + RUN_CONTROL_CHANNEL_STATUS_COMPLETED, RUN_CONTROL_CHANNEL_STATUS_FAILED, + RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE, RUN_OPERATION_AGENT_RUN, + RUN_OPERATION_APP_SERVER_PREFLIGHT, RunControlActionOutcomeRequest, RunControlChannel, + StateStore, }, }; @@ -1366,6 +1371,9 @@ fn protocol_activity_detail(event_type: &str, payload_value: Option<&Value>) -> string_at_paths(value, &[&["params", "status", "type"], &["status", "type"]]) }); } + if event_type == "turn/steer" { + return protocol_steer_detail(payload_value); + } if event_type.starts_with("turn/") { return protocol_turn_status_from_value(event_type, payload_value) .or_else(|| Some(String::from("running"))); @@ -1432,6 +1440,18 @@ fn protocol_activity_detail(event_type: &str, payload_value: Option<&Value>) -> None } +fn protocol_steer_detail(payload_value: Option<&Value>) -> Option { + let value = payload_value?; + let outcome = + string_at_paths(value, &[&["outcome"]]).unwrap_or_else(|| String::from("unknown")); + let expected_turn_id = string_at_paths(value, &[&["expectedTurnId"], &["expected_turn_id"]]) + .unwrap_or_else(|| String::from("unknown")); + let response_turn_id = string_at_paths(value, &[&["responseTurnId"], &["response_turn_id"]]) + .unwrap_or_else(|| String::from("none")); + + Some(format!("{outcome}: expected={expected_turn_id}, response={response_turn_id}")) +} + fn warning_or_deprecation_detail(payload_value: Option<&Value>) -> Option { payload_value.and_then(|value| { string_at_paths( @@ -3169,13 +3189,19 @@ fn execute_turn_loop( flush_pending_messages(client, recorder, Some(thread_id))?; - let final_output = - wait_for_turn_completion(client, recorder, request, thread_id, &turn_id)?.final_output; + let outcome = wait_for_turn_completion(client, recorder, request, thread_id, &turn_id)?; + let final_turn_id = outcome.turn_id; + let final_output = outcome.final_output; if let Some(continuation_pending) = resolve_turn_completion(request, turn_count, &final_output)? { - return Ok(TurnLoopResult { turn_id, turn_count, final_output, continuation_pending }); + return Ok(TurnLoopResult { + turn_id: final_turn_id, + turn_count, + final_output, + continuation_pending, + }); } next_input = @@ -3346,6 +3372,18 @@ fn build_turn_start_request(thread_id: &str, user_input: &str) -> TurnStartReque } } +fn build_turn_steer_request( + thread_id: &str, + expected_turn_id: &str, + message: &str, +) -> TurnSteerRequest { + TurnSteerRequest { + thread_id: thread_id.to_owned(), + expected_turn_id: expected_turn_id.to_owned(), + input: vec![UserInput::Text { text: message.to_owned() }], + } +} + fn login_account_params(account: &CodexAccountLogin) -> LoginAccountParams { LoginAccountParams::ChatgptAuthTokens { access_token: account.access_token().to_owned(), @@ -3399,18 +3437,24 @@ fn wait_for_turn_completion( ) -> crate::prelude::Result { let control_enabled = request.activity_marker_path.is_some(); let mut last_activity_at = Instant::now(); + let mut target_turn_id = target_turn_id.to_owned(); let mut final_output = String::new(); let mut latest_turn_failure: Option = None; loop { - if control_enabled { - handle_pending_turn_control_requests( + if control_enabled + && let Some(response_turn_id) = handle_pending_turn_control_requests( client, recorder, request, target_thread_id, - target_turn_id, - )?; + &target_turn_id, + )? { + recorder.state_store.update_run_turn(recorder.run_id, &response_turn_id)?; + recorder.set_turn_id(&response_turn_id)?; + + target_turn_id = response_turn_id; + last_activity_at = Instant::now(); } let idle_timeout = protocol_activity_idle_timeout( @@ -3422,7 +3466,7 @@ fn wait_for_turn_completion( last_activity_at, idle_timeout, target_thread_id, - target_turn_id, + &target_turn_id, latest_turn_failure.as_ref(), control_enabled, )? @@ -3447,7 +3491,7 @@ fn wait_for_turn_completion( if let Some(outcome) = handle_turn_execution_notification( notification, target_thread_id, - target_turn_id, + &target_turn_id, &mut final_output, &mut latest_turn_failure, )? { @@ -3459,7 +3503,7 @@ fn wait_for_turn_completion( recorder, server_request, target_thread_id, - target_turn_id, + &target_turn_id, request.dynamic_tool_handler, request.codex_account_provider, )?, @@ -3467,7 +3511,7 @@ fn wait_for_turn_completion( JsonRpcMessage::Error(error) => { latest_turn_failure = Some(turn_failure_from_json_rpc_error_response( target_thread_id, - target_turn_id, + &target_turn_id, error, )); }, @@ -3504,12 +3548,20 @@ fn handle_turn_execution_notification( } }, "item/agentMessage/delta" => { + if !notification_targets_turn(notification, target_turn_id) { + return Ok(None); + } + let payload: AgentMessageDeltaNotification = serde_json::from_value(notification.params.clone())?; final_output.push_str(&payload.delta); }, "item/completed" => { + if !notification_targets_turn(notification, target_turn_id) { + return Ok(None); + } + let payload: ItemCompletedNotification = serde_json::from_value(notification.params.clone())?; @@ -3527,7 +3579,10 @@ fn handle_turn_execution_notification( return Ok(None); } if payload.turn.status == "completed" { - return Ok(Some(RunOutcome { final_output: mem::take(final_output) })); + return Ok(Some(RunOutcome { + final_output: mem::take(final_output), + turn_id: target_turn_id.to_owned(), + })); } if let Some(error) = payload.turn.error.as_ref() { @@ -3554,6 +3609,10 @@ fn handle_turn_execution_notification( Ok(None) } +fn notification_targets_turn(notification: &JsonRpcNotification, target_turn_id: &str) -> bool { + turn_id_from_value(¬ification.params).is_none_or(|turn_id| turn_id == target_turn_id) +} + fn next_turn_wire_message( client: &mut AppServerClient, last_activity_at: Instant, @@ -3587,9 +3646,9 @@ fn handle_pending_turn_control_requests( request: &AppServerRunRequest<'_>, target_thread_id: &str, target_turn_id: &str, -) -> crate::prelude::Result<()> { +) -> crate::prelude::Result> { let Some(worktree_path) = request.activity_marker_path.as_deref() else { - return Ok(()); + return Ok(None); }; for pending in run_control::pending_interrupt_requests(worktree_path, &request.run_id)? { @@ -3603,8 +3662,21 @@ fn handle_pending_turn_control_requests( target_turn_id, )?; } + for pending in run_control::pending_steer_requests(worktree_path, &request.run_id)? { + if let Some(response_turn_id) = handle_pending_turn_steer_request( + client, + recorder, + request, + worktree_path, + pending, + target_thread_id, + target_turn_id, + )? { + return Ok(Some(response_turn_id)); + } + } - Ok(()) + Ok(None) } fn handle_pending_turn_interrupt_request( @@ -3677,6 +3749,84 @@ fn handle_pending_turn_interrupt_request( Ok(()) } +fn handle_pending_turn_steer_request( + client: &mut AppServerClient, + recorder: &mut RunRecorder<'_>, + run_request: &AppServerRunRequest<'_>, + worktree_path: &Path, + pending: PendingLaneControlSteerRequest, + target_thread_id: &str, + target_turn_id: &str, +) -> crate::prelude::Result> { + record_lane_steer_request(recorder, &pending.request)?; + + if let Some((error_class, message)) = lane_steer_request_rejection( + run_request, + &pending.request, + target_thread_id, + target_turn_id, + ) { + let response = LaneControlSteerResponse::rejected( + &pending.request, + target_turn_id, + error_class, + message, + ); + + record_lane_steer_response(recorder, &response, Some(pending.request.audit_record_id))?; + + run_control::write_steer_response(worktree_path, &response)?; + run_control::remove_steer_request(&pending.path)?; + + return Ok(None); + } + + let result = client.steer_turn_with_handler( + build_turn_steer_request( + &pending.request.thread_id, + &pending.request.expected_turn_id, + &pending.request.message, + ), + |connection, wire_message, server_request| { + handle_server_request_while_waiting( + connection, + recorder, + wire_message, + server_request, + RequestDispatchContext::new( + RequestWaitPhase::TurnExecution, + run_request.dynamic_tool_handler, + run_request.codex_account_provider, + Some(target_thread_id), + None, + ), + ) + }, + ); + let response = match result { + Ok(value) => + LaneControlSteerResponse::delivered(&pending.request, target_turn_id, &value.turn_id), + Err(error) => { + let error_class = steer_error_class(&error); + + LaneControlSteerResponse::failed( + &pending.request, + target_turn_id, + error_class, + format!("turn/steer failed with {error_class}."), + ) + }, + }; + let response_turn_id = response.response_turn_id.clone(); + + record_lane_steer_response(recorder, &response, Some(pending.request.audit_record_id))?; + + run_control::write_steer_response(worktree_path, &response)?; + run_control::remove_steer_request(&pending.path)?; + + Ok(response_turn_id) +} + fn record_lane_interrupt_request( recorder: &mut RunRecorder<'_>, request: &LaneControlInterruptRequest, @@ -3740,6 +3890,111 @@ fn record_lane_interrupt_response( Ok(()) } +fn record_lane_steer_request( + recorder: &mut RunRecorder<'_>, + request: &LaneControlSteerRequest, +) -> crate::prelude::Result<()> { + recorder.record( + "lane_control/steer/request", + &serde_json::json!({ + "requestId": request.request_id, + "auditRecordId": request.audit_record_id, + "projectId": request.project_id, + "issueId": request.issue_id, + "runId": request.run_id, + "attemptNumber": request.attempt_number, + "threadId": request.thread_id, + "expectedTurnId": request.expected_turn_id, + "source": request.source, + "messageByteCount": request.message_byte_count, + "messageLineCount": request.message_line_count, + }) + .to_string(), + ) +} + +fn record_lane_steer_response( + recorder: &mut RunRecorder<'_>, + response: &LaneControlSteerResponse, + parent_record_id: Option, +) -> crate::prelude::Result<()> { + let outcome = match &response.status { + LaneControlSteerResponseStatus::Delivered => RUN_CONTROL_ACTION_COMPLETED, + LaneControlSteerResponseStatus::Failed | LaneControlSteerResponseStatus::Rejected => + RUN_CONTROL_ACTION_FAILED, + }; + let metadata = serde_json::json!({ + "requestId": response.request_id, + "outcome": outcome, + "reason": response.classification, + "failureClass": response.error_class, + "expectedTurnId": response.expected_turn_id, + "currentTurnId": response.current_turn_id, + "responseTurnId": response.response_turn_id, + }); + + recorder.record("turn/steer", &metadata.to_string())?; + recorder.record( + "lane_control/steer/response", + &serde_json::json!({ + "requestId": response.request_id, + "projectId": response.project_id, + "issueId": response.issue_id, + "runId": response.run_id, + "attemptNumber": response.attempt_number, + "threadId": response.thread_id, + "expectedTurnId": response.expected_turn_id, + "currentTurnId": response.current_turn_id, + "responseTurnId": response.response_turn_id, + "status": &response.status, + "classification": response.classification, + "method": response.method, + "errorClass": response.error_class, + }) + .to_string(), + )?; + recorder.state_store.record_run_control_action_delivery_outcome( + RunControlActionOutcomeRequest { + project_id: &response.project_id, + issue_id: &response.issue_id, + run_id: &response.run_id, + attempt_number: response.attempt_number, + thread_id: Some(&response.thread_id), + turn_id: Some(&response.expected_turn_id), + current_thread_id: Some(&response.thread_id), + current_turn_id: response.current_turn_id.as_deref(), + source: "app_server_child", + action: "steer", + outcome, + reason: &response.classification, + parent_record_id, + timeout_ms: None, + metadata: Some(&metadata), + channel: None, + }, + )?; + recorder.state_store.append_private_execution_event( + &response.project_id, + &response.issue_id, + &response.run_id, + response.attempt_number, + "lane_control/steer", + serde_json::json!({ + "requestId": response.request_id, + "status": &response.status, + "classification": response.classification, + "method": response.method, + "errorClass": response.error_class, + "expectedTurnId": response.expected_turn_id, + "currentTurnId": response.current_turn_id, + "responseTurnId": response.response_turn_id, + "message": response.message, + }), + )?; + + Ok(()) +} + fn lane_interrupt_request_rejection( run_request: &AppServerRunRequest<'_>, request: &LaneControlInterruptRequest, @@ -3804,6 +4059,70 @@ fn lane_interrupt_request_rejection( None } +fn lane_steer_request_rejection( + run_request: &AppServerRunRequest<'_>, + request: &LaneControlSteerRequest, + target_thread_id: &str, + target_turn_id: &str, +) -> Option<(&'static str, String)> { + if request.project_id != run_request.project_id { + return Some(( + "project_mismatch", + format!( + "Control request targeted project `{}`, but this run belongs to `{}`.", + request.project_id, run_request.project_id + ), + )); + } + if request.issue_id != run_request.issue_id { + return Some(( + "issue_mismatch", + format!( + "Control request targeted issue `{}`, but this run belongs to `{}`.", + request.issue_id, run_request.issue_id + ), + )); + } + if request.run_id != run_request.run_id { + return Some(( + "run_mismatch", + format!( + "Control request targeted run `{}`, but this run is `{}`.", + request.run_id, run_request.run_id + ), + )); + } + if request.attempt_number != run_request.attempt_number { + return Some(( + "attempt_mismatch", + format!( + "Control request targeted attempt `{}`, but this run is attempt `{}`.", + request.attempt_number, run_request.attempt_number + ), + )); + } + if request.thread_id != target_thread_id { + return Some(( + "thread_mismatch", + format!( + "Control request targeted thread `{}`, but the active thread is `{target_thread_id}`.", + request.thread_id + ), + )); + } + if request.expected_turn_id != target_turn_id { + return Some(( + "stale_expected_turn_id", + format!( + "Control request expected turn `{}`, but the active turn is `{target_turn_id}`.", + request.expected_turn_id + ), + )); + } + + None +} + fn soft_interrupt_error_class(error: &Report) -> &'static str { if is_app_server_output_timeout(error) { return "soft_interrupt_timed_out"; @@ -3818,6 +4137,25 @@ fn soft_interrupt_error_class(error: &Report) -> &'static str { } } +fn steer_error_class(error: &Report) -> &'static str { + if is_app_server_output_timeout(error) { + return "app_server_turn_steer_timed_out"; + } + + let error_text = error.to_string().to_ascii_lowercase(); + + if error_text.contains("activeturnnotsteerable") + || error_text.contains("active turn not steerable") + { + return "active_turn_not_steerable"; + } + if error_text.contains("-32601") || error_text.contains("method not found") { + return "app_server_turn_steer_unsupported"; + } + + "app_server_turn_steer_failed" +} + fn is_app_server_output_timeout(error: &Report) -> bool { error.downcast_ref::().is_some() } @@ -4158,9 +4496,6 @@ fn dispatch_dynamic_tool_call( request: &JsonRpcRequest, context: RequestDispatchContext<'_>, ) -> crate::prelude::Result<()> { - let target_turn_id = context.target_turn_id.ok_or_else(|| { - eyre::eyre!("app_server_protocol_failure: turn execution request missing turn context") - })?; let target_thread_id = context.target_thread_id.ok_or_else(|| { eyre::eyre!("app_server_protocol_failure: turn execution request missing thread context") })?; @@ -4168,7 +4503,7 @@ fn dispatch_dynamic_tool_call( context.dynamic_tool_handler, request, target_thread_id, - target_turn_id, + context.target_turn_id, ); respond_to_dynamic_tool_call_dispatch(connection, recorder, request, dispatch) @@ -4391,7 +4726,7 @@ fn handle_dynamic_tool_call( dynamic_tool_handler: Option<&dyn DynamicToolHandler>, request: &JsonRpcRequest, target_thread_id: &str, - target_turn_id: &str, + target_turn_id: Option<&str>, ) -> DynamicToolCallDispatch { let payload = match validated_dynamic_tool_call_payload(request, target_thread_id, target_turn_id) { @@ -4456,7 +4791,7 @@ fn handle_dynamic_tool_call( fn validated_dynamic_tool_call_payload( request: &JsonRpcRequest, target_thread_id: &str, - target_turn_id: &str, + target_turn_id: Option<&str>, ) -> std::result::Result> { let payload = serde_json::from_value::(request.params.clone()).map_err( |error| { @@ -4507,7 +4842,10 @@ fn validated_dynamic_tool_call_payload( ), ))); } - if payload.turn_id != target_turn_id { + + if let Some(target_turn_id) = target_turn_id + && payload.turn_id != target_turn_id + { return Err(Box::new(DynamicToolCallDispatch::protocol_failure( Some(payload.tool), payload.namespace, diff --git a/apps/decodex/src/agent/app_server/protocol.rs b/apps/decodex/src/agent/app_server/protocol.rs index 3ce2bb78..fbc38133 100644 --- a/apps/decodex/src/agent/app_server/protocol.rs +++ b/apps/decodex/src/agent/app_server/protocol.rs @@ -196,6 +196,21 @@ impl AppServerClient { self.connection.request_with_handler("turn/interrupt", ¶ms, REQUEST_TIMEOUT, handler) } + pub(super) fn steer_turn_with_handler( + &mut self, + params: TurnSteerRequest, + handler: H, + ) -> crate::prelude::Result + where + H: FnMut( + &mut JsonRpcConnection, + &WireMessage, + &JsonRpcRequest, + ) -> crate::prelude::Result<()>, + { + self.connection.request_with_handler("turn/steer", ¶ms, REQUEST_TIMEOUT, handler) + } + pub(super) fn command_exec( &mut self, params: &CommandExecParams, @@ -282,6 +297,7 @@ impl AppServerClient { #[derive(Default)] pub(super) struct RunOutcome { pub(super) final_output: String, + pub(super) turn_id: String, } #[derive(Debug, Serialize)] @@ -448,6 +464,20 @@ pub(super) struct TurnInterruptRequest { pub(super) turn_id: String, } +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub(super) struct TurnSteerRequest { + pub(super) thread_id: String, + pub(super) expected_turn_id: String, + pub(super) input: Vec, +} + +#[derive(Debug, Deserialize)] +pub(super) struct TurnSteerResponse { + #[serde(rename = "turnId")] + pub(super) turn_id: String, +} + #[derive(Debug, Eq, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub(super) struct CommandExecParams { diff --git a/apps/decodex/src/agent/app_server/tests.rs b/apps/decodex/src/agent/app_server/tests.rs index 75a63996..054ecad9 100644 --- a/apps/decodex/src/agent/app_server/tests.rs +++ b/apps/decodex/src/agent/app_server/tests.rs @@ -33,6 +33,9 @@ use crate::{ }, }, prelude::{Result, eyre}, + run_control::{ + self, LaneControlSteerRequest, LaneControlSteerRequestInput, LaneControlSteerResponse, + }, state::{self, ProtocolActivitySummary, StateStore}, test_support::TestEnvVarGuard, }; @@ -365,6 +368,18 @@ fn turn_start_request_uses_default_runtime_settings() { )); } +#[test] +fn turn_steer_request_uses_expected_turn_precondition_and_text() { + let request = super::build_turn_steer_request("thread-1", "turn-1", "change direction"); + + assert_eq!(request.thread_id, "thread-1"); + assert_eq!(request.expected_turn_id, "turn-1"); + assert!(matches!( + request.input.as_slice(), + [UserInput::Text{ text }] if text == "change direction" + )); +} + #[test] fn thread_start_and_resume_requests_inherit_runtime_config() { fn assert_runtime_config(value: &Value) { @@ -1328,7 +1343,11 @@ fn structured_error_notification_becomes_turn_failure() { fn json_rpc_error_response_becomes_recoverable_turn_failure() { let error = JsonRpcError { id: serde_json::json!(7), - error: JsonRpcErrorPayload { code: -32_000, message: String::from("late response") }, + error: JsonRpcErrorPayload { + code: -32_000, + message: String::from("late response"), + data: None, + }, }; let failure = super::turn_failure_from_json_rpc_error_response("thread-1", "turn-1", &error); let failure_message = failure.to_string(); @@ -1339,6 +1358,75 @@ fn json_rpc_error_response_becomes_recoverable_turn_failure() { assert!(failure_message.contains("late response")); } +#[test] +fn steer_delivery_error_classifies_active_turn_not_steerable_distinctly() { + let error = eyre::eyre!( + "`turn/steer` failed with -32000: turn is not steerable data: {{\"type\":\"activeTurnNotSteerable\"}}" + ); + let failure_class = super::steer_error_class(&error); + + assert_eq!(failure_class, "active_turn_not_steerable"); +} + +#[test] +fn steer_delivery_error_classifies_missing_method_as_unsupported() { + for message in [ + "`turn/steer` failed with -32601: method not found", + "`turn/steer` failed with -32601: Method not found", + ] { + let error = eyre::eyre!("{message}"); + let failure_class = super::steer_error_class(&error); + + assert_eq!(failure_class, "app_server_turn_steer_unsupported"); + } +} + +#[test] +fn steer_response_wait_ignores_temp_file_until_atomic_response_exists() -> Result<()> { + let temp_dir = TempDir::new()?; + let request = LaneControlSteerRequest::new(LaneControlSteerRequestInput { + audit_record_id: 7, + project_id: "decodex", + issue_id: "XY-704", + run_id: "run-1", + attempt_number: 1, + thread_id: "thread-1", + expected_turn_id: "turn-1", + source: "test", + message: "change direction", + }); + let run_dir = temp_dir.path().join(".decodex-run-control").join("run-1"); + + fs::create_dir_all(&run_dir)?; + fs::write(run_dir.join(format!("{}.steer-response.json.tmp", request.request_id)), b"{")?; + + assert!( + run_control::wait_for_steer_response( + temp_dir.path(), + "run-1", + &request.request_id, + Duration::from_millis(1), + )? + .is_none() + ); + + let response = LaneControlSteerResponse::delivered(&request, "turn-1", "turn-2"); + + run_control::write_steer_response(temp_dir.path(), &response)?; + + assert_eq!( + run_control::wait_for_steer_response( + temp_dir.path(), + "run-1", + &request.request_id, + Duration::from_millis(100), + )?, + Some(response) + ); + + Ok(()) +} + #[test] fn thread_resume_fallback_only_allows_missing_thread_errors() { assert!(super::thread_resume_error_allows_fallback(&eyre::eyre!("thread not found"))); @@ -1393,7 +1481,7 @@ fn dynamic_tool_call_enforces_declared_namespace() { params, }; let dispatch = - super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", "turn-1"); + super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", Some("turn-1")); assert_eq!(dispatch.response.success, expected_success, "{case_name}"); assert_eq!( @@ -1436,7 +1524,8 @@ fn dynamic_tool_call_rejects_invalid_response_shape() { "turnId": "turn-1" }), }; - let dispatch = super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", "turn-1"); + let dispatch = + super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", Some("turn-1")); assert!(!dispatch.response.success); assert!(matches!( @@ -1464,7 +1553,8 @@ fn dynamic_tool_call_records_tool_failures_without_terminal_protocol_failure() { "turnId": "turn-1" }), }; - let dispatch = super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", "turn-1"); + let dispatch = + super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", Some("turn-1")); assert!(!dispatch.response.success); assert!(dispatch.terminal_failure.is_none()); @@ -1476,6 +1566,93 @@ fn dynamic_tool_call_records_tool_failures_without_terminal_protocol_failure() { assert_eq!(diagnostic.message, "tool rejected the request"); } +#[test] +fn dynamic_tool_call_can_validate_thread_without_fixed_turn_during_steer_rpc() { + let handler = NamespacedDynamicToolHandler { seen_namespace: RefCell::new(None) }; + let request = JsonRpcRequest { + id: serde_json::json!(1), + method: String::from("item/tool/call"), + params: serde_json::json!({ + "arguments": {}, + "callId": "call-1", + "namespace": "tracker", + "threadId": "thread-1", + "tool": "tracker_tool", + "turnId": "turn-after-steer" + }), + }; + let dispatch = super::handle_dynamic_tool_call(Some(&handler), &request, "thread-1", None); + + assert!(dispatch.response.success); + assert!(dispatch.terminal_failure.is_none()); + assert_eq!(*handler.seen_namespace.borrow(), Some(String::from("tracker"))); +} + +#[test] +fn turn_notification_ignores_agent_output_for_non_target_turn() { + let old_completed = JsonRpcNotification { + method: String::from("item/completed"), + params: serde_json::json!({ + "threadId": "thread-1", + "turnId": "turn-old", + "item": {"type": "agentMessage", "text": "OLD"} + }), + }; + let old_delta = JsonRpcNotification { + method: String::from("item/agentMessage/delta"), + params: serde_json::json!({ + "threadId": "thread-1", + "turnId": "turn-old", + "delta": " OLD_DELTA" + }), + }; + let target_completed = JsonRpcNotification { + method: String::from("item/completed"), + params: serde_json::json!({ + "threadId": "thread-1", + "turnId": "turn-new", + "item": {"type": "agentMessage", "text": "NEW"} + }), + }; + let mut final_output = String::from("CURRENT"); + let mut latest_turn_failure: Option = None; + + assert!( + super::handle_turn_execution_notification( + &old_completed, + "thread-1", + "turn-new", + &mut final_output, + &mut latest_turn_failure + ) + .expect("old completed notification should parse") + .is_none() + ); + assert!( + super::handle_turn_execution_notification( + &old_delta, + "thread-1", + "turn-new", + &mut final_output, + &mut latest_turn_failure + ) + .expect("old delta notification should parse") + .is_none() + ); + assert_eq!(final_output, "CURRENT"); + + super::handle_turn_execution_notification( + &target_completed, + "thread-1", + "turn-new", + &mut final_output, + &mut latest_turn_failure, + ) + .expect("target completed notification should parse"); + + assert_eq!(final_output, "NEW"); +} + #[test] fn dynamic_tool_call_unavailable_outside_turn_execution_is_protocol_diagnostic() { let dispatch = super::dynamic_tool_call_unavailable_for_phase(RequestWaitPhase::TurnStart); diff --git a/apps/decodex/src/agent/json_rpc.rs b/apps/decodex/src/agent/json_rpc.rs index 5a869b96..a654d64d 100644 --- a/apps/decodex/src/agent/json_rpc.rs +++ b/apps/decodex/src/agent/json_rpc.rs @@ -372,10 +372,17 @@ impl JsonRpcConnection { return Ok(serde_json::from_value(response.result.clone())?); }, JsonRpcMessage::Error(error) if error.id == expected_id => { + let data = error + .error + .data + .as_ref() + .map_or_else(String::new, |data| format!(" data: {data}")); + return Err(eyre::eyre!( - "`{method}` failed with {}: {}", + "`{method}` failed with {}: {}{}", error.error.code, - error.error.message + error.error.message, + data )); }, JsonRpcMessage::Request(request) => handle_request(self, &wire_message, request)?, @@ -599,6 +606,8 @@ pub(crate) struct JsonRpcError { pub(crate) struct JsonRpcErrorPayload { pub(crate) code: i64, pub(crate) message: String, + + pub(crate) data: Option, } #[derive(Clone, Debug)] diff --git a/apps/decodex/src/cli.rs b/apps/decodex/src/cli.rs index b9a416a3..8bf66b0f 100644 --- a/apps/decodex/src/cli.rs +++ b/apps/decodex/src/cli.rs @@ -2,6 +2,7 @@ use std::{ fs, io::{self, Read as _}, path::{Path, PathBuf}, + time::Duration, }; use clap::{ @@ -20,8 +21,9 @@ use crate::{ maintenance::{self, MaintenanceMode, MaintenancePruneRequest, MaintenanceScope}, manual::{self, ManualCommitRequest, ManualLandRequest}, orchestrator::{ - self, DiagnoseRequest, EvidenceRequest, IssueDispatchMode, LaneInspectRequest, - LaneInterruptRequest, RunOnceRequest, ServeRequest, + self, DEFAULT_STEER_RESULT_WAIT_TIMEOUT, DiagnoseRequest, EvidenceRequest, + IssueDispatchMode, LaneInspectRequest, LaneInterruptRequest, LaneSteerReport, + LaneSteerRequest, RunOnceRequest, ServeRequest, }, prelude::{Result, eyre}, radar::{ @@ -447,6 +449,7 @@ impl LaneCommand { source: "cli", }) .map(|_report| ()), + LaneSubcommand::Steer(args) => args.run(self.project_config.as_path()), } } } @@ -483,6 +486,57 @@ struct LaneInterruptCommand { json: bool, } +#[derive(Debug, Args)] +struct LaneSteerCommand { + /// Issue identifier or local issue id for the active lane. + #[arg(value_name = "ISSUE")] + issue: String, + /// Run id that must own the active turn. + #[arg(long, value_name = "RUN_ID")] + run_id: String, + /// Current active app-server turn id precondition. + #[arg(long, value_name = "TURN_ID")] + expected_turn_id: String, + /// Operator-supplied steer text to send to the active turn. + #[arg(long, value_name = "TEXT")] + message: String, + /// Emit structured JSON instead of human-readable text. + #[arg(long)] + json: bool, + /// How long to wait for the active attempt to report delivery. + #[arg(long, value_name = "MILLISECONDS", default_value_t = default_lane_steer_wait_timeout_ms())] + wait_timeout_ms: u64, +} +impl LaneSteerCommand { + fn run(&self, config_path: Option<&Path>) -> Result<()> { + let report = orchestrator::steer_lane(LaneSteerRequest { + config_path, + project_id: None, + issue: &self.issue, + run_id: &self.run_id, + expected_turn_id: &self.expected_turn_id, + message: &self.message, + source: "cli", + wait_timeout: Duration::from_millis(self.wait_timeout_ms), + })?; + + if self.json { + println!("{}", serde_json::to_string_pretty(&report)?); + } else { + print!("{}", render_lane_steer_report(&report)); + } + if lane_steer_report_is_failure(&report) { + eyre::bail!( + "lane steer {}: {}", + report.outcome, + report.failure_class.as_deref().unwrap_or(&report.reason) + ); + } + + Ok(()) + } +} + #[derive(Debug, Args)] struct StatusCommand { #[command(flatten)] @@ -1256,7 +1310,7 @@ enum Command { Serve(ServeCommand), /// Manage the local Decodex project registry. Project(ProjectCommand), - /// Inspect or interrupt a local lane. + /// Inspect or influence a local lane. Lane(LaneCommand), /// Inspect the current local runtime state for one configured project. Status(StatusCommand), @@ -1319,6 +1373,8 @@ enum LaneSubcommand { Inspect(LaneInspectCommand), /// Soft-interrupt an active app-server turn, with optional hard fallback. Interrupt(LaneInterruptCommand), + /// Send operator-supplied text to an active steerable turn. + Steer(LaneSteerCommand), } #[derive(Debug, Subcommand)] @@ -1383,6 +1439,30 @@ enum RadarBundleSubcommand { Validate(RadarBundleValidateCommand), } +fn default_lane_steer_wait_timeout_ms() -> u64 { + u64::try_from(DEFAULT_STEER_RESULT_WAIT_TIMEOUT.as_millis()).unwrap_or(10_000) +} + +fn lane_steer_report_is_failure(report: &LaneSteerReport) -> bool { + matches!(report.outcome.as_str(), "rejected" | "failed" | "timed_out" | "fallback") +} + +fn render_lane_steer_report(report: &LaneSteerReport) -> String { + format!( + "lane steer {}: issue={} run_id={} attempt={} expected_turn_id={} current_turn_id={} response_turn_id={} failure_class={} audit_record_id={} delivery_status={}\n", + report.outcome, + report.issue_identifier.as_deref().unwrap_or(&report.issue_id), + report.run_id, + report.attempt_number, + report.expected_turn_id, + report.current_turn_id.as_deref().unwrap_or("none"), + report.response_turn_id.as_deref().unwrap_or("none"), + report.failure_class.as_deref().unwrap_or("none"), + report.audit_record_id, + report.delivery_status + ) +} + fn read_attempt_request(request: &str) -> Result { let raw = if request == "-" { let mut raw = String::new(); @@ -1416,8 +1496,8 @@ mod tests { use crate::cli::{ AccountCommand, AccountSubcommand, AccountUseCommand, AttemptCommand, Cli, Command, CommitCommand, DiagnoseCommand, EvidenceCommand, LandCommand, LaneCommand, - LaneInspectCommand, LaneInterruptCommand, LaneSubcommand, ProbeCommand, ProjectCommand, - ProjectConfigArgs, ProjectSubcommand, RadarBackfillReleaseRangeCommand, + LaneInspectCommand, LaneInterruptCommand, LaneSteerCommand, LaneSubcommand, ProbeCommand, + ProjectCommand, ProjectConfigArgs, ProjectSubcommand, RadarBackfillReleaseRangeCommand, RadarBundleBuildCommand, RadarBundleCommand, RadarBundleSubcommand, RadarBundleValidateCommand, RadarCommand, RadarLedgerCommand, RadarLedgerIngestExistingCommand, RadarLedgerSubcommand, RadarLedgerSummaryCommand, @@ -2041,6 +2121,44 @@ mod tests { )); } + #[test] + fn parses_lane_steer_with_expected_turn_precondition() { + let cli = Cli::parse_from([ + "decodex", + "lane", + "--config", + "./project.toml", + "steer", + "XY-704", + "--run-id", + "run-1", + "--expected-turn-id", + "turn-1", + "--message", + "adjust the current implementation", + "--json", + ]); + + assert!(matches!( + cli.command, + Command::Lane(LaneCommand { + project_config: ProjectConfigArgs { config: Some(config) }, + command: LaneSubcommand::Steer(LaneSteerCommand { + issue, + run_id, + expected_turn_id, + message, + json: true, + .. + }) + }) if config == Path::new("./project.toml") + && issue == "XY-704" + && run_id == "run-1" + && expected_turn_id == "turn-1" + && message == "adjust the current implementation" + )); + } + #[test] fn parses_diagnose_with_json_limit_and_project_config() { let cli = Cli::parse_from([ diff --git a/apps/decodex/src/orchestrator.rs b/apps/decodex/src/orchestrator.rs index c11f8cd7..dd20a6e4 100644 --- a/apps/decodex/src/orchestrator.rs +++ b/apps/decodex/src/orchestrator.rs @@ -1,7 +1,8 @@ mod lane_control; pub(crate) use lane_control::{ - LaneInspectRequest, LaneInterruptRequest, interrupt_lane, print_lane_inspect, + DEFAULT_STEER_RESULT_WAIT_TIMEOUT, LaneInspectRequest, LaneInterruptRequest, interrupt_lane, + print_lane_inspect, steer_lane, }; #[cfg(unix)] use std::os::fd::AsRawFd; @@ -91,7 +92,9 @@ const OPERATOR_APP_SNAPSHOT_ENDPOINT_PATH: &str = "/api/operator-snapshot"; const OPERATOR_LINEAR_SCAN_ENDPOINT_PATH: &str = "/api/linear-scan"; const OPERATOR_LANE_INSPECT_ENDPOINT_PATH: &str = "/api/lane/inspect"; const OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH: &str = "/api/lane/interrupt"; -const OPERATOR_STATE_MAX_REQUEST_BYTES: usize = 8_192; +const OPERATOR_LANE_STEER_ENDPOINT_PATH: &str = "/api/lane/steer"; +const OPERATOR_LANE_STEER_ALIAS_ENDPOINT_PATH: &str = "/api/lane-steer"; +const OPERATOR_STATE_MAX_REQUEST_BYTES: usize = 256 * 1_024; const OPERATOR_DASHBOARD_WS_CLIENT_MESSAGE_MAX_BYTES: usize = 64 * 1_024; const OPERATOR_STATE_HEADER_TERMINATOR: &[u8] = b"\r\n\r\n"; const OPERATOR_DASHBOARD_WS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20); diff --git a/apps/decodex/src/orchestrator/lane_control.rs b/apps/decodex/src/orchestrator/lane_control.rs index 6e23a611..386117a5 100644 --- a/apps/decodex/src/orchestrator/lane_control.rs +++ b/apps/decodex/src/orchestrator/lane_control.rs @@ -12,20 +12,25 @@ use serde::Serialize; use crate::{ config::ServiceConfig, orchestrator::{ - self, ChildRunRef, DEFAULT_STATUS_RUN_LIMIT, OperatorRunStatus, OperatorStatusSnapshot, + self, ChildRunRef, DEFAULT_STATUS_RUN_LIMIT, LaneSteerReport, LaneSteerRequest, + OperatorRunStatus, OperatorStatusSnapshot, }, prelude::{Result, eyre}, run_control::{ self, LaneControlInterruptRequest, LaneControlInterruptRequestInput, - LaneControlInterruptResponse, LaneControlResponseStatus, + LaneControlInterruptResponse, LaneControlResponseStatus, LaneControlSteerRequest, + LaneControlSteerRequestInput, LaneControlSteerResponse, LaneControlSteerResponseStatus, }, runtime, state::{ - RUN_CONTROL_ACTION_COMPLETED, RUN_CONTROL_ACTION_FAILED, RUN_CONTROL_ACTION_FALLBACK, - RUN_CONTROL_ACTION_TIMED_OUT, RunControlActionReceipt, RunControlActionRequest, StateStore, + RUN_CONTROL_ACTION_ACCEPTED, RUN_CONTROL_ACTION_COMPLETED, RUN_CONTROL_ACTION_FAILED, + RUN_CONTROL_ACTION_FALLBACK, RUN_CONTROL_ACTION_TIMED_OUT, RunControlActionReceipt, + RunControlActionRequest, StateStore, }, }; +pub(crate) const DEFAULT_STEER_RESULT_WAIT_TIMEOUT: Duration = Duration::from_secs(10); + #[cfg(not(test))] const LANE_INTERRUPT_RESPONSE_WAIT: Duration = Duration::from_secs(3); #[cfg(test)] @@ -280,6 +285,20 @@ pub(crate) fn interrupt_lane(request: LaneInterruptRequest<'_>) -> Result) -> Result { + validate_lane_steer_request(&request)?; + + let state_store = runtime::open_runtime_store()?; + let config = load_lane_control_project_for_optional_id( + request.config_path, + request.project_id, + &state_store, + )?; + let report = steer_lane_with_state(&state_store, &config, &request)?; + + Ok(report) +} + pub(super) fn build_lane_inspect_report( state_store: &StateStore, project: &ServiceConfig, @@ -356,6 +375,23 @@ pub(super) fn interrupt_lane_with_state( }) } +pub(super) fn steer_lane_with_state( + state_store: &StateStore, + project: &ServiceConfig, + request: &LaneSteerRequest<'_>, +) -> Result { + validate_lane_steer_request(request)?; + + let snapshot = orchestrator::build_operator_status_snapshot( + project, + state_store, + DEFAULT_STATUS_RUN_LIMIT, + )?; + let run = select_interrupt_lane_run(&snapshot, request.issue, request.run_id)?; + + attempt_lane_steer(state_store, project, &run, request) +} + fn soft_interrupt_allows_hard_fallback(soft: &LaneSoftInterruptReport) -> bool { matches!(soft.status.as_str(), "pending" | "failed" | "unavailable") && soft.error_class.as_deref() != Some("lane_not_active") @@ -376,6 +412,43 @@ fn load_lane_control_project( ServiceConfig::from_path(&config_path) } +fn load_lane_control_project_for_optional_id( + config_path: Option<&Path>, + project_id: Option<&str>, + state_store: &StateStore, +) -> Result { + let Some(project_id) = project_id.map(str::trim).filter(|id| !id.is_empty()) else { + return load_lane_control_project(config_path, state_store); + }; + let config_path = if let Some(config_path) = config_path { + ServiceConfig::resolve_project_config_path(config_path)? + } else { + state_store + .list_projects()? + .into_iter() + .find(|registration| registration.service_id() == project_id) + .map(|registration| registration.config_path().to_path_buf()) + .ok_or_else(|| { + eyre::eyre!( + "Decodex project `{project_id}` is not registered. Pass --config or run `decodex project add`." + ) + })? + }; + + runtime::register_project_config(state_store, &config_path, true)?; + + let project = ServiceConfig::from_path(&config_path)?; + + if project.service_id() != project_id { + eyre::bail!( + "Lane steer project `{project_id}` did not match config service id `{}`.", + project.service_id() + ); + } + + Ok(project) +} + fn select_interrupt_lane_run( snapshot: &OperatorStatusSnapshot, issue: &str, @@ -437,6 +510,255 @@ fn soft_interrupt_available_for_run(run: &OperatorRunStatus) -> bool { && run.control_capability.as_ref().is_some_and(|capability| capability.status == "active") } +fn attempt_lane_steer( + state_store: &StateStore, + project: &ServiceConfig, + run: &OperatorRunStatus, + request: &LaneSteerRequest<'_>, +) -> Result { + let message_byte_count = request.message.len(); + let message_line_count = lane_steer_message_line_count(request.message); + let metadata = serde_json::json!({ + "expectedTurnId": request.expected_turn_id, + "messageByteCount": message_byte_count, + "messageLineCount": message_line_count, + }); + let receipt = state_store.resolve_run_control_action(RunControlActionRequest { + project_id: project.service_id(), + issue_id: &run.issue_id, + run_id: &run.run_id, + attempt_number: run.attempt_number, + thread_id: run.thread_id.as_deref(), + turn_id: Some(request.expected_turn_id), + source: request.source, + action: "steer", + timeout_ms: Some(i64::try_from(request.wait_timeout.as_millis()).unwrap_or(i64::MAX)), + metadata: Some(&metadata), + })?; + + if receipt.outcome() != RUN_CONTROL_ACTION_ACCEPTED { + return Ok(lane_steer_report_from_rejected_receipt( + request.issue, + run, + &receipt, + request.expected_turn_id, + message_byte_count, + message_line_count, + )); + } + + let Some(worktree_path) = absolute_lane_worktree_path(project, state_store, run)? else { + eyre::bail!("Lane steer was accepted without an active lane worktree."); + }; + let Some(thread_id) = run.thread_id.as_deref() else { + eyre::bail!("Lane steer was accepted before the active app-server thread id was known."); + }; + let control_request = LaneControlSteerRequest::new(LaneControlSteerRequestInput { + audit_record_id: receipt.audit_record_id(), + project_id: project.service_id(), + issue_id: &run.issue_id, + run_id: &run.run_id, + attempt_number: run.attempt_number, + thread_id, + expected_turn_id: request.expected_turn_id, + source: request.source, + message: request.message, + }); + let request_path = run_control::write_steer_request(&worktree_path, &control_request)?; + + state_store.append_private_execution_event( + project.service_id(), + &run.issue_id, + &run.run_id, + run.attempt_number, + "lane_control/steer/requested", + serde_json::json!({ + "requestId": control_request.request_id, + "source": request.source, + "method": "turn/steer", + "expectedTurnId": request.expected_turn_id, + "messageByteCount": control_request.message_byte_count, + "messageLineCount": control_request.message_line_count, + }), + )?; + + match run_control::wait_for_steer_response( + &worktree_path, + &run.run_id, + &control_request.request_id, + request.wait_timeout, + )? { + Some(response) => { + let outcome = match &response.status { + LaneControlSteerResponseStatus::Delivered => RUN_CONTROL_ACTION_COMPLETED, + LaneControlSteerResponseStatus::Failed + | LaneControlSteerResponseStatus::Rejected => RUN_CONTROL_ACTION_FAILED, + }; + + state_store.record_run_control_action_outcome( + &receipt, + outcome, + &response.classification, + )?; + + Ok(lane_steer_report_from_response( + request.issue, + run, + &receipt, + &control_request, + &request_path, + response, + )) + }, + None => { + state_store.record_run_control_action_outcome( + &receipt, + RUN_CONTROL_ACTION_TIMED_OUT, + "steer_response_pending", + )?; + + Ok(lane_steer_report_pending( + request.issue, + run, + &receipt, + &control_request, + &request_path, + )) + }, + } +} + +fn lane_steer_report_from_rejected_receipt( + issue: &str, + run: &OperatorRunStatus, + receipt: &RunControlActionReceipt, + expected_turn_id: &str, + message_byte_count: usize, + message_line_count: usize, +) -> LaneSteerReport { + LaneSteerReport { + project_id: receipt.project_id().to_owned(), + issue_id: receipt.issue_id().to_owned(), + issue_identifier: run.issue_identifier.clone().or_else(|| Some(issue.to_owned())), + run_id: receipt.run_id().to_owned(), + attempt_number: receipt.attempt_number(), + thread_id: receipt.current_thread_id().map(str::to_owned), + expected_turn_id: expected_turn_id.to_owned(), + current_turn_id: receipt.current_turn_id().map(str::to_owned), + response_turn_id: None, + audit_record_id: receipt.audit_record_id(), + request_id: String::new(), + request_path: None, + outcome: receipt.outcome().to_owned(), + reason: receipt.reason().to_owned(), + failure_class: lane_steer_failure_class_for_reason(receipt.reason()).map(str::to_owned), + delivery_status: String::from("rejected"), + message_byte_count, + message_line_count, + } +} + +fn lane_steer_report_from_response( + issue: &str, + run: &OperatorRunStatus, + receipt: &RunControlActionReceipt, + request: &LaneControlSteerRequest, + request_path: &Path, + response: LaneControlSteerResponse, +) -> LaneSteerReport { + let outcome = match &response.status { + LaneControlSteerResponseStatus::Delivered => RUN_CONTROL_ACTION_COMPLETED, + LaneControlSteerResponseStatus::Failed | LaneControlSteerResponseStatus::Rejected => + RUN_CONTROL_ACTION_FAILED, + }; + + LaneSteerReport { + project_id: receipt.project_id().to_owned(), + issue_id: receipt.issue_id().to_owned(), + issue_identifier: run.issue_identifier.clone().or_else(|| Some(issue.to_owned())), + run_id: receipt.run_id().to_owned(), + attempt_number: receipt.attempt_number(), + thread_id: Some(response.thread_id.clone()), + expected_turn_id: response.expected_turn_id.clone(), + current_turn_id: response.current_turn_id.clone(), + response_turn_id: response.response_turn_id.clone(), + audit_record_id: receipt.audit_record_id(), + request_id: response.request_id.clone(), + request_path: Some(request_path.display().to_string()), + outcome: outcome.to_owned(), + reason: response.classification.clone(), + failure_class: response.error_class.clone(), + delivery_status: String::from("resolved"), + message_byte_count: request.message_byte_count, + message_line_count: request.message_line_count, + } +} + +fn lane_steer_report_pending( + issue: &str, + run: &OperatorRunStatus, + receipt: &RunControlActionReceipt, + request: &LaneControlSteerRequest, + request_path: &Path, +) -> LaneSteerReport { + LaneSteerReport { + project_id: receipt.project_id().to_owned(), + issue_id: receipt.issue_id().to_owned(), + issue_identifier: run.issue_identifier.clone().or_else(|| Some(issue.to_owned())), + run_id: receipt.run_id().to_owned(), + attempt_number: receipt.attempt_number(), + thread_id: Some(request.thread_id.clone()), + expected_turn_id: request.expected_turn_id.clone(), + current_turn_id: run.turn_id.clone(), + response_turn_id: None, + audit_record_id: receipt.audit_record_id(), + request_id: request.request_id.clone(), + request_path: Some(request_path.display().to_string()), + outcome: RUN_CONTROL_ACTION_ACCEPTED.to_owned(), + reason: String::from("queued_wait_timeout"), + failure_class: None, + delivery_status: String::from("queued"), + message_byte_count: request.message_byte_count, + message_line_count: request.message_line_count, + } +} + +fn validate_lane_steer_request(request: &LaneSteerRequest<'_>) -> Result<()> { + if request.issue.trim().is_empty() { + eyre::bail!("Lane steer issue must not be empty."); + } + if request.run_id.trim().is_empty() { + eyre::bail!("Lane steer run id must not be empty."); + } + if request.expected_turn_id.trim().is_empty() { + eyre::bail!("Lane steer expected turn id must not be empty."); + } + if request.message.trim().is_empty() { + eyre::bail!("Lane steer message must not be empty."); + } + if request.source.trim().is_empty() { + eyre::bail!("Lane steer source must not be empty."); + } + + Ok(()) +} + +fn lane_steer_failure_class_for_reason(reason: &str) -> Option<&'static str> { + match reason { + "turn_mismatch" | "stale_expected_turn_id" => Some("stale_expected_turn_id"), + "active_turn_not_steerable" => Some("active_turn_not_steerable"), + "app_server_turn_steer_timed_out" => Some("app_server_turn_steer_timed_out"), + "app_server_turn_steer_unsupported" => Some("app_server_turn_steer_unsupported"), + "app_server_turn_steer_failed" => Some("app_server_turn_steer_failed"), + "active_run_control_channel_resolved" | "queued_wait_timeout" => None, + _ => Some("run_control_action_failed"), + } +} + +fn lane_steer_message_line_count(message: &str) -> usize { + message.lines().count().max(usize::from(!message.is_empty())) +} + fn attempt_soft_lane_interrupt( state_store: &StateStore, project: &ServiceConfig, @@ -483,6 +805,7 @@ fn attempt_soft_lane_interrupt( timeout_ms: Some( i64::try_from(LANE_INTERRUPT_RESPONSE_WAIT.as_millis()).unwrap_or(i64::MAX), ), + metadata: None, })?; if receipt.outcome() != "accepted" { @@ -665,6 +988,7 @@ fn record_hard_interrupt_control_fallback( source: "hard_interrupt_fallback", action: "interrupt", timeout_ms: None, + metadata: None, })?; state_store.record_run_control_action_outcome( diff --git a/apps/decodex/src/orchestrator/operator_http.rs b/apps/decodex/src/orchestrator/operator_http.rs index e3c0d013..ee29ff51 100644 --- a/apps/decodex/src/orchestrator/operator_http.rs +++ b/apps/decodex/src/orchestrator/operator_http.rs @@ -1,9 +1,7 @@ -use base64::engine::general_purpose::STANDARD; -use base64::Engine as _; +use base64::{Engine as _, engine::general_purpose::STANDARD}; use sha1::{Digest as _, Sha1}; -use crate::accounts; -use crate::accounts::AccountUseRequest; +use crate::accounts::{self, AccountUseRequest}; const OPERATOR_DASHBOARD_HTML: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/src/orchestrator/operator_dashboard.html")); @@ -11,10 +9,8 @@ const OPERATOR_DASHBOARD_ICON_PNG: &[u8] = include_bytes!(concat!(env!("CARGO_MANIFEST_DIR"), "/src/orchestrator/assets/icon.png")); const OPERATOR_DASHBOARD_LOGO_ICO: &[u8] = include_bytes!(concat!(env!("CARGO_MANIFEST_DIR"), "/src/orchestrator/assets/logo.ico")); -const OPERATOR_DASHBOARD_LOGO_TOUCH_PNG: &[u8] = include_bytes!(concat!( - env!("CARGO_MANIFEST_DIR"), - "/src/orchestrator/assets/logo-touch.png" -)); +const OPERATOR_DASHBOARD_LOGO_TOUCH_PNG: &[u8] = + include_bytes!(concat!(env!("CARGO_MANIFEST_DIR"), "/src/orchestrator/assets/logo-touch.png")); const OPERATOR_HTTP_READ_TIMEOUT: Duration = Duration::from_millis(250); #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -29,6 +25,7 @@ enum OperatorRequestRoute { LinearScan, LaneInspect, LaneInterrupt, + LaneSteer, AccountList { force_refresh: bool }, AccountSelect, AccountClear, @@ -64,7 +61,9 @@ impl DashboardEventHub { fn broadcast(&self, event_type: &'static str, payload: Value) { let Ok(mut clients) = self.clients.lock() else { - tracing::warn!("Skipped dashboard event broadcast because the client list lock is poisoned."); + tracing::warn!( + "Skipped dashboard event broadcast because the client list lock is poisoned." + ); return; }; @@ -145,6 +144,22 @@ struct OperatorLaneInterruptHttpRequest { reason: Option, } +#[derive(Deserialize)] +struct OperatorLaneSteerHttpRequest { + #[serde(alias = "projectId")] + project_id: Option, + issue: Option, + #[serde(alias = "issueId")] + issue_id: Option, + #[serde(alias = "runId")] + run_id: String, + #[serde(alias = "expectedTurnId")] + expected_turn_id: String, + message: String, + #[serde(alias = "waitTimeoutMs")] + wait_timeout_ms: Option, +} + struct DashboardControlAck<'a> { request_id: Option<&'a str>, action: &'a str, @@ -255,6 +270,13 @@ fn handle_operator_state_endpoint_connection( return Ok(()); } + if route == OperatorRequestRoute::LaneSteer { + let response = build_operator_lane_steer_http_response(state_store, &request); + + stream.write_all(&response)?; + + return Ok(()); + } if route == OperatorRequestRoute::AppSnapshot { let response = build_operator_app_snapshot_http_response(snapshot); @@ -309,7 +331,9 @@ fn snapshot_json_with_live_account_control(snapshot_json: &[u8]) -> Vec { } } -fn build_operator_app_snapshot_http_response(snapshot: &Arc>) -> Vec { +fn build_operator_app_snapshot_http_response( + snapshot: &Arc>, +) -> Vec { let snapshot = match snapshot.lock() { Ok(snapshot) => snapshot, Err(error) => { @@ -376,10 +400,12 @@ fn handle_operator_dashboard_websocket_connection( write_current_dashboard_run_activity_event(&mut stream, state_store, &session.subscription); loop { - for frame in read_dashboard_websocket_client_frames(&mut stream, &mut client_frame_buffer)? { + for frame in read_dashboard_websocket_client_frames(&mut stream, &mut client_frame_buffer)? + { match frame { DashboardClientFrame::Text(payload) => { - let response = handle_dashboard_client_message(&mut session, state_store, &payload); + let response = + handle_dashboard_client_message(&mut session, state_store, &payload); write_dashboard_websocket_event(&mut stream, "controlAck", &response)?; @@ -406,8 +432,7 @@ fn handle_operator_dashboard_websocket_connection( match events.recv_timeout(Duration::from_millis(100)) { Ok(event) => { - if let Some(event) = - dashboard_event_for_subscription(&event, &session.subscription) + if let Some(event) = dashboard_event_for_subscription(&event, &session.subscription) { write_dashboard_websocket_event(&mut stream, event.event_type, &event.payload)?; } @@ -460,7 +485,9 @@ fn run_operator_run_activity_websocket_broadcasts( } } -fn build_operator_run_activity_event(state_store: &StateStore) -> Result { +fn build_operator_run_activity_event( + state_store: &StateStore, +) -> Result { let now_unix_epoch = OffsetDateTime::now_utc().unix_timestamp(); let account_control = global_codex_account_control_status(); let mut accounts = Vec::new(); @@ -596,11 +623,7 @@ fn write_current_dashboard_run_activity_event( } fn dashboard_run_activity_event_has_active_runs(event: &DashboardBroadcastEvent) -> bool { - event - .payload - .get("activeRuns") - .and_then(Value::as_array) - .is_some_and(|runs| !runs.is_empty()) + event.payload.get("activeRuns").and_then(Value::as_array).is_some_and(|runs| !runs.is_empty()) } fn write_dashboard_websocket_event( @@ -673,9 +696,7 @@ fn read_dashboard_websocket_client_frames( frames.push(frame); } }, - Err(error) - if matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => - { + Err(error) if matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => { break; }, Err(error) if error.kind() == ErrorKind::Interrupted => continue, @@ -1038,18 +1059,11 @@ fn dashboard_subscription_payload(subscription: &DashboardClientSubscription) -> } fn dashboard_required_account_selector(message: &DashboardClientMessage) -> Option<&str> { - message - .account_selector - .as_deref() - .map(str::trim) - .filter(|value| !value.is_empty()) + message.account_selector.as_deref().map(str::trim).filter(|value| !value.is_empty()) } fn dashboard_clean_scope_value(value: Option<&str>) -> Option { - value - .map(str::trim) - .filter(|value| !value.is_empty()) - .map(str::to_owned) + value.map(str::trim).filter(|value| !value.is_empty()).map(str::to_owned) } fn dashboard_event_for_subscription( @@ -1060,17 +1074,12 @@ fn dashboard_event_for_subscription( return Some(event.clone()); } - let active_runs = event - .payload - .get("activeRuns") - .and_then(Value::as_array) - .map(|runs| { - runs - .iter() - .filter(|run| dashboard_run_matches_subscription(run, subscription)) - .cloned() - .collect::>() - })?; + let active_runs = event.payload.get("activeRuns").and_then(Value::as_array).map(|runs| { + runs.iter() + .filter(|run| dashboard_run_matches_subscription(run, subscription)) + .cloned() + .collect::>() + })?; let mut payload = event.payload.clone(); payload["activeRuns"] = Value::Array(active_runs); @@ -1081,7 +1090,9 @@ fn dashboard_event_for_subscription( } fn dashboard_subscription_is_empty(subscription: &DashboardClientSubscription) -> bool { - subscription.project_id.is_none() && subscription.issue_id.is_none() && subscription.run_id.is_none() + subscription.project_id.is_none() + && subscription.issue_id.is_none() + && subscription.run_id.is_none() } fn dashboard_run_matches_subscription( @@ -1160,23 +1171,15 @@ fn websocket_accept_key(key: &str) -> String { } fn operator_http_header_value<'a>(request: &'a str, header_name: &str) -> Option<&'a str> { - request - .lines() - .skip(1) - .take_while(|line| !line.trim().is_empty()) - .find_map(|line| { - let (name, value) = line.split_once(':')?; + request.lines().skip(1).take_while(|line| !line.trim().is_empty()).find_map(|line| { + let (name, value) = line.split_once(':')?; - name.trim() - .eq_ignore_ascii_case(header_name) - .then(|| value.trim()) - }) + name.trim().eq_ignore_ascii_case(header_name).then(|| value.trim()) + }) } fn operator_http_header_contains_token(value: &str, token: &str) -> bool { - value - .split(',') - .any(|candidate| candidate.trim().eq_ignore_ascii_case(token)) + value.split(',').any(|candidate| candidate.trim().eq_ignore_ascii_case(token)) } fn read_operator_state_request_headers(stream: &mut TcpStream) -> Result> { @@ -1292,23 +1295,20 @@ fn operator_account_http_response_body( .map_err(Into::into), OperatorRequestRoute::AccountSelect => { let selector = operator_account_request_selector(request)?; - let response = accounts::hydrate_account_list_usage( - accounts::account_select(&selector)?, - ); + let response = + accounts::hydrate_account_list_usage(accounts::account_select(&selector)?); serde_json::to_vec(&response).map_err(Into::into) }, OperatorRequestRoute::AccountClear => { - let response = - accounts::hydrate_account_list_usage(accounts::account_clear()?); + let response = accounts::hydrate_account_list_usage(accounts::account_clear()?); serde_json::to_vec(&response).map_err(Into::into) }, OperatorRequestRoute::AccountLogout => { let selector = operator_account_request_selector(request)?; - let response = accounts::hydrate_account_list_usage( - accounts::account_logout(&selector)?, - ); + let response = + accounts::hydrate_account_list_usage(accounts::account_logout(&selector)?); serde_json::to_vec(&response).map_err(Into::into) }, @@ -1319,9 +1319,9 @@ fn operator_account_http_response_body( .as_deref() .filter(|path| !path.trim().is_empty()) .ok_or_else(|| eyre::eyre!("Account import requires auth_json_path."))?; - let response = accounts::hydrate_account_list_usage( - accounts::account_import(Path::new(auth_json_path))?, - ); + let response = accounts::hydrate_account_list_usage(accounts::account_import( + Path::new(auth_json_path), + )?); serde_json::to_vec(&response).map_err(Into::into) }, @@ -1444,10 +1444,7 @@ fn operator_linear_scan_request_project_id(request: &[u8]) -> Result Vec { +fn build_operator_lane_inspect_http_response(state_store: &StateStore, request: &[u8]) -> Vec { match operator_lane_inspect_http_response_body(state_store, request) { Ok(body) => http_response_bytes("200 OK", "application/json", &body), Err(error) => operator_lane_error_http_response(error), @@ -1514,6 +1511,69 @@ fn operator_lane_interrupt_http_response_body( Ok((status_line, serde_json::to_vec(&report)?)) } +fn build_operator_lane_steer_http_response(state_store: &StateStore, request: &[u8]) -> Vec { + match operator_lane_steer_http_response_body(state_store, request) { + Ok((status_line, body)) => http_response_bytes(status_line, "application/json", &body), + Err(error) => operator_lane_error_http_response(error), + } +} + +fn operator_lane_steer_http_response_body( + state_store: &StateStore, + request: &[u8], +) -> Result<(&'static str, Vec)> { + let body = operator_http_request_body(request)?; + let request: OperatorLaneSteerHttpRequest = serde_json::from_slice(body) + .map_err(|error| eyre::eyre!("Lane steer request body was not valid JSON: {error}"))?; + let issue = request + .issue + .as_deref() + .or(request.issue_id.as_deref()) + .map(str::trim) + .filter(|issue| !issue.is_empty()) + .ok_or_else(|| eyre::eyre!("Lane steer request requires issue or issueId."))?; + + if request.run_id.trim().is_empty() { + eyre::bail!("Lane steer request runId must not be blank."); + } + if request.expected_turn_id.trim().is_empty() { + eyre::bail!("Lane steer request expectedTurnId must not be blank."); + } + if request.message.trim().is_empty() { + eyre::bail!("Lane steer request message must not be blank."); + } + + let project = operator_lane_http_project(state_store, request.project_id.as_deref())?; + let wait_timeout = request + .wait_timeout_ms + .map(Duration::from_millis) + .unwrap_or(DEFAULT_STEER_RESULT_WAIT_TIMEOUT); + let steer_request = LaneSteerRequest { + config_path: None, + project_id: None, + issue, + run_id: request.run_id.trim(), + expected_turn_id: request.expected_turn_id.trim(), + message: &request.message, + source: "http", + wait_timeout, + }; + let report = lane_control::steer_lane_with_state(state_store, &project, &steer_request)?; + let status_line = if lane_steer_report_is_rejected_or_failed(&report) { + "409 Conflict" + } else if report.delivery_status == "queued" { + "202 Accepted" + } else { + "200 OK" + }; + + Ok((status_line, serde_json::to_vec(&report)?)) +} + +fn lane_steer_report_is_rejected_or_failed(report: &LaneSteerReport) -> bool { + matches!(report.outcome.as_str(), "rejected" | "failed" | "timed_out" | "fallback") +} + fn operator_lane_http_project( state_store: &StateStore, project_id: Option<&str>, @@ -1619,39 +1679,45 @@ fn percent_decode_operator_query_value(value: &str) -> Result { fn build_operator_state_http_response_for_route(route: OperatorRequestRoute) -> Vec { match route { - OperatorRequestRoute::Dashboard => { - http_response_bytes("200 OK", "text/html; charset=utf-8", OPERATOR_DASHBOARD_HTML.as_bytes()) - }, - OperatorRequestRoute::DashboardIconPng => { - http_response_bytes("200 OK", "image/png", OPERATOR_DASHBOARD_ICON_PNG) - }, - OperatorRequestRoute::DashboardLogoIco => { - http_response_bytes("200 OK", "image/x-icon", OPERATOR_DASHBOARD_LOGO_ICO) - }, - OperatorRequestRoute::DashboardLogoTouchPng => { - http_response_bytes("200 OK", "image/png", OPERATOR_DASHBOARD_LOGO_TOUCH_PNG) - }, + OperatorRequestRoute::Dashboard => http_response_bytes( + "200 OK", + "text/html; charset=utf-8", + OPERATOR_DASHBOARD_HTML.as_bytes(), + ), + OperatorRequestRoute::DashboardIconPng => + http_response_bytes("200 OK", "image/png", OPERATOR_DASHBOARD_ICON_PNG), + OperatorRequestRoute::DashboardLogoIco => + http_response_bytes("200 OK", "image/x-icon", OPERATOR_DASHBOARD_LOGO_ICO), + OperatorRequestRoute::DashboardLogoTouchPng => + http_response_bytes("200 OK", "image/png", OPERATOR_DASHBOARD_LOGO_TOUCH_PNG), OperatorRequestRoute::DashboardWs => websocket_upgrade_required_response(), - OperatorRequestRoute::AppSnapshot => { - http_response_bytes("200 OK", "application/json", b"{}") - }, - OperatorRequestRoute::LinearScan => { - http_response_bytes("405 Method Not Allowed", "text/plain; charset=utf-8", b"method not allowed") - }, - OperatorRequestRoute::LaneInspect | OperatorRequestRoute::LaneInterrupt => { - http_response_bytes("405 Method Not Allowed", "text/plain; charset=utf-8", b"method not allowed") - }, - OperatorRequestRoute::Live => { - http_response_bytes("200 OK", "text/plain; charset=utf-8", b"ok") - }, + OperatorRequestRoute::AppSnapshot => + http_response_bytes("200 OK", "application/json", b"{}"), + OperatorRequestRoute::LinearScan => http_response_bytes( + "405 Method Not Allowed", + "text/plain; charset=utf-8", + b"method not allowed", + ), + OperatorRequestRoute::LaneInspect + | OperatorRequestRoute::LaneInterrupt + | OperatorRequestRoute::LaneSteer => http_response_bytes( + "405 Method Not Allowed", + "text/plain; charset=utf-8", + b"method not allowed", + ), + OperatorRequestRoute::Live => + http_response_bytes("200 OK", "text/plain; charset=utf-8", b"ok"), OperatorRequestRoute::AccountList { .. } | OperatorRequestRoute::AccountSelect | OperatorRequestRoute::AccountClear | OperatorRequestRoute::AccountLogout | OperatorRequestRoute::AccountImport | OperatorRequestRoute::AccountUse - | OperatorRequestRoute::AccountRerollName => - http_response_bytes("405 Method Not Allowed", "text/plain; charset=utf-8", b"method not allowed"), + | OperatorRequestRoute::AccountRerollName => http_response_bytes( + "405 Method Not Allowed", + "text/plain; charset=utf-8", + b"method not allowed", + ), } } @@ -1682,9 +1748,8 @@ fn parse_operator_state_request_route( b"missing path", )); }; - let path_without_query = path - .split_once('?') - .map_or(path, |(path_without_query, _)| path_without_query); + let path_without_query = + path.split_once('?').map_or(path, |(path_without_query, _)| path_without_query); let query = path.split_once('?').map(|(_, query)| query).unwrap_or_default(); let normalized_path = path_without_query .split_once('#') @@ -1697,35 +1762,42 @@ fn parse_operator_state_request_route( ("GET", "/assets/logo.ico") => Ok(OperatorRequestRoute::DashboardLogoIco), ("GET", "/assets/logo-touch.png") => Ok(OperatorRequestRoute::DashboardLogoTouchPng), ("GET", OPERATOR_DASHBOARD_WS_ENDPOINT_PATH) => Ok(OperatorRequestRoute::DashboardWs), - ("GET", OPERATOR_LIVE_ENDPOINT_PATH) => Ok(OperatorRequestRoute::Live), - ("GET", OPERATOR_APP_SNAPSHOT_ENDPOINT_PATH) => Ok(OperatorRequestRoute::AppSnapshot), - ("GET", OPERATOR_ACCOUNTS_ENDPOINT_PATH) => Ok(OperatorRequestRoute::AccountList { - force_refresh: operator_query_has_flag(query, "refresh"), - }), - ("POST", OPERATOR_LINEAR_SCAN_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LinearScan), - ("GET", OPERATOR_LANE_INSPECT_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LaneInspect), - ("POST", OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LaneInterrupt), - ("POST", "/api/accounts/select") => Ok(OperatorRequestRoute::AccountSelect), + ("GET", OPERATOR_LIVE_ENDPOINT_PATH) => Ok(OperatorRequestRoute::Live), + ("GET", OPERATOR_APP_SNAPSHOT_ENDPOINT_PATH) => Ok(OperatorRequestRoute::AppSnapshot), + ("GET", OPERATOR_ACCOUNTS_ENDPOINT_PATH) => Ok(OperatorRequestRoute::AccountList { + force_refresh: operator_query_has_flag(query, "refresh"), + }), + ("POST", OPERATOR_LINEAR_SCAN_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LinearScan), + ("GET", OPERATOR_LANE_INSPECT_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LaneInspect), + ("POST", OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH) => Ok(OperatorRequestRoute::LaneInterrupt), + ("POST", OPERATOR_LANE_STEER_ENDPOINT_PATH | OPERATOR_LANE_STEER_ALIAS_ENDPOINT_PATH) => + Ok(OperatorRequestRoute::LaneSteer), + ("POST", "/api/accounts/select") => Ok(OperatorRequestRoute::AccountSelect), ("POST", "/api/accounts/clear") => Ok(OperatorRequestRoute::AccountClear), ("POST", "/api/accounts/logout") => Ok(OperatorRequestRoute::AccountLogout), ("POST", "/api/accounts/import") => Ok(OperatorRequestRoute::AccountImport), ("POST", "/api/accounts/use") => Ok(OperatorRequestRoute::AccountUse), ("POST", "/api/accounts/reroll-name") => Ok(OperatorRequestRoute::AccountRerollName), - (_, OPERATOR_DASHBOARD_ENDPOINT_PATH + ( + _, + OPERATOR_DASHBOARD_ENDPOINT_PATH | OPERATOR_DASHBOARD_ALIAS_ENDPOINT_PATH | OPERATOR_DASHBOARD_WS_ENDPOINT_PATH - | OPERATOR_LIVE_ENDPOINT_PATH - | OPERATOR_APP_SNAPSHOT_ENDPOINT_PATH - | OPERATOR_LINEAR_SCAN_ENDPOINT_PATH - | OPERATOR_LANE_INSPECT_ENDPOINT_PATH - | OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH - | OPERATOR_ACCOUNTS_ENDPOINT_PATH + | OPERATOR_LIVE_ENDPOINT_PATH + | OPERATOR_APP_SNAPSHOT_ENDPOINT_PATH + | OPERATOR_LINEAR_SCAN_ENDPOINT_PATH + | OPERATOR_LANE_INSPECT_ENDPOINT_PATH + | OPERATOR_LANE_INTERRUPT_ENDPOINT_PATH + | OPERATOR_LANE_STEER_ENDPOINT_PATH + | OPERATOR_LANE_STEER_ALIAS_ENDPOINT_PATH + | OPERATOR_ACCOUNTS_ENDPOINT_PATH | "/api/accounts/select" | "/api/accounts/clear" | "/api/accounts/logout" | "/api/accounts/import" | "/api/accounts/use" - | "/api/accounts/reroll-name") => Err(http_response_bytes( + | "/api/accounts/reroll-name", + ) => Err(http_response_bytes( "405 Method Not Allowed", "text/plain; charset=utf-8", b"method not allowed", @@ -1752,10 +1824,8 @@ fn http_response_bytes_with_headers( extra_headers: &[(&str, String)], body: &[u8], ) -> Vec { - let mut response = format!( - "HTTP/1.1 {status_line}\r\nContent-Type: {content_type}\r\n" - ) - .into_bytes(); + let mut response = + format!("HTTP/1.1 {status_line}\r\nContent-Type: {content_type}\r\n").into_bytes(); for (header, value) in extra_headers { response.extend_from_slice(format!("{header}: {value}\r\n").as_bytes()); diff --git a/apps/decodex/src/orchestrator/tests/operator/status/http.rs b/apps/decodex/src/orchestrator/tests/operator/status/http.rs index ae0622f8..89bda121 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/http.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/http.rs @@ -1640,6 +1640,195 @@ fn operator_lane_interrupt_api_force_does_not_hard_fallback_after_control_reject })); } +#[test] +fn operator_lane_steer_api_rejects_stale_expected_turn_id() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let registration = ProjectRegistration::from_config( + config.service_id(), + &service_config_path(config.repo_root()), + &config, + true, + "test-fingerprint", + ); + let issue = sample_issue("In Progress", &[]); + let worktree_path = config.worktree_root().join(&issue.identifier); + let body = br#"{"projectId":"pubfi","issue":"PUB-101","runId":"pub-101-attempt-1","expectedTurnId":"turn-old","message":"please adjust priority"}"#; + let request = format!( + "POST {} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + orchestrator::OPERATOR_LANE_STEER_ENDPOINT_PATH, + body.len(), + String::from_utf8_lossy(body) + ); + + fs::create_dir_all(&worktree_path).expect("worktree should exist"); + + state_store.upsert_project(®istration).expect("project should register"); + state_store + .record_run_attempt("pub-101-attempt-1", &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .update_run_thread("pub-101-attempt-1", "thread-1") + .expect("thread should record"); + state_store + .update_run_turn("pub-101-attempt-1", "turn-1") + .expect("turn should record"); + state_store + .upsert_lease(config.service_id(), &issue.id, "pub-101-attempt-1", "In Progress") + .expect("lease should record"); + state_store + .upsert_worktree( + config.service_id(), + &issue.id, + "x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + + let channel_path = + worktree_path.join(RUN_CONTROL_CHANNEL_DIR).join("pub-101-attempt-1.channel"); + + fs::create_dir_all(channel_path.parent().expect("channel path should have parent")) + .expect("run-control channel dir should exist"); + fs::write(&channel_path, "ready\n").expect("control channel should write"); + + state_store + .publish_run_control_channel_for_active_attempt( + "pub-101-attempt-1", + 1, + &channel_path, + RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE, + ) + .expect("control channel should publish"); + + let response = String::from_utf8(orchestrator::build_operator_lane_steer_http_response( + &state_store, + request.as_bytes(), + )) + .expect("lane steer response should be utf-8"); + let body = response + .split_once("\r\n\r\n") + .map(|(_, body)| body) + .expect("lane steer response should include body"); + let data: Value = serde_json::from_str(body).expect("lane steer response should be json"); + let events = state_store + .list_private_execution_events(config.service_id(), &issue.id, "pub-101-attempt-1", 1) + .expect("private control audit should read"); + + assert!(response.starts_with("HTTP/1.1 409 Conflict\r\n")); + assert_eq!(data["outcome"], "rejected"); + assert_eq!(data["reason"], "turn_mismatch"); + assert_eq!(data["failureClass"], "stale_expected_turn_id"); + assert_eq!(data["expectedTurnId"], "turn-old"); + assert_eq!(data["currentTurnId"], "turn-1"); + assert!(events.iter().any(|event| { + event.event_type() == "control_action" + && event.payload()["action"] == "steer" + && event.payload()["failure_class"] == "stale_expected_turn_id" + })); +} + +#[test] +fn operator_lane_steer_endpoint_accepts_large_operator_message_body() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let state_store = Arc::new(StateStore::open_in_memory().expect("state store should open")); + let registration = ProjectRegistration::from_config( + config.service_id(), + &service_config_path(config.repo_root()), + &config, + true, + "test-fingerprint", + ); + let issue = sample_issue("In Progress", &[]); + let worktree_path = config.worktree_root().join(&issue.identifier); + let channel_path = + worktree_path.join(RUN_CONTROL_CHANNEL_DIR).join("pub-101-attempt-1.channel"); + + fs::create_dir_all(channel_path.parent().expect("channel path should have parent")) + .expect("run-control channel dir should exist"); + fs::write(&channel_path, "ready\n").expect("control channel should write"); + + state_store.upsert_project(®istration).expect("project should register"); + state_store + .record_run_attempt("pub-101-attempt-1", &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .update_run_thread("pub-101-attempt-1", "thread-1") + .expect("thread should record"); + state_store + .update_run_turn("pub-101-attempt-1", "turn-1") + .expect("turn should record"); + state_store + .upsert_lease(config.service_id(), &issue.id, "pub-101-attempt-1", "In Progress") + .expect("lease should record"); + state_store + .upsert_worktree( + config.service_id(), + &issue.id, + "x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + state_store + .publish_run_control_channel_for_active_attempt( + "pub-101-attempt-1", + 1, + &channel_path, + RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE, + ) + .expect("control channel should publish"); + + let body = serde_json::json!({ + "projectId": "pubfi", + "issue": "PUB-101", + "runId": "pub-101-attempt-1", + "expectedTurnId": "turn-old", + "message": "x".repeat(12 * 1_024), + }) + .to_string(); + let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind"); + let address = listener.local_addr().expect("listener address should resolve"); + let snapshot = Arc::new(Mutex::new(PublishedOperatorSnapshot::default())); + let server_snapshot = Arc::clone(&snapshot); + let server_state_store = Arc::clone(&state_store); + let server = thread::spawn(move || { + let (stream, _) = listener.accept().expect("listener should accept a connection"); + let dashboard_events = DashboardEventHub::default(); + + orchestrator::handle_operator_state_endpoint_connection( + stream, + &server_snapshot, + &dashboard_events, + &OperatorControlRequests::default(), + &server_state_store, + ) + .expect("handler should accept broad steer request body"); + }); + let request = format!( + "POST {} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + orchestrator::OPERATOR_LANE_STEER_ENDPOINT_PATH, + body.len(), + body + ); + let mut client = TcpStream::connect(address).expect("client should connect"); + let mut response = String::new(); + + client.write_all(request.as_bytes()).expect("client should write request"); + client.shutdown(Shutdown::Write).expect("client should close request body"); + client.read_to_string(&mut response).expect("client should read response"); + server.join().expect("server thread should complete"); + + let body = response + .split_once("\r\n\r\n") + .map(|(_, body)| body) + .expect("lane steer response should include body"); + let data: Value = serde_json::from_str(body).expect("lane steer response should be json"); + + assert!(response.starts_with("HTTP/1.1 409 Conflict\r\n")); + assert_eq!(data["failureClass"], "stale_expected_turn_id"); + assert_eq!(data["messageByteCount"], 12 * 1_024); +} + #[test] fn operator_state_endpoint_serves_account_api_snapshot() { let temp_dir = TempDir::new().expect("temp dir should exist"); diff --git a/apps/decodex/src/orchestrator/types.rs b/apps/decodex/src/orchestrator/types.rs index 0109c099..c9d6178d 100644 --- a/apps/decodex/src/orchestrator/types.rs +++ b/apps/decodex/src/orchestrator/types.rs @@ -51,6 +51,42 @@ pub(crate) struct EvidenceRequest<'a> { pub(crate) include_payload: bool, } +/// Active lane steer request. +pub(crate) struct LaneSteerRequest<'a> { + pub(crate) config_path: Option<&'a Path>, + pub(crate) project_id: Option<&'a str>, + pub(crate) issue: &'a str, + pub(crate) run_id: &'a str, + pub(crate) expected_turn_id: &'a str, + pub(crate) message: &'a str, + pub(crate) source: &'a str, + pub(crate) wait_timeout: Duration, +} + +/// Active lane steer result without raw operator message content. +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct LaneSteerReport { + pub(crate) project_id: String, + pub(crate) issue_id: String, + pub(crate) issue_identifier: Option, + pub(crate) run_id: String, + pub(crate) attempt_number: i64, + pub(crate) thread_id: Option, + pub(crate) expected_turn_id: String, + pub(crate) current_turn_id: Option, + pub(crate) response_turn_id: Option, + pub(crate) audit_record_id: i64, + pub(crate) request_id: String, + pub(crate) request_path: Option, + pub(crate) outcome: String, + pub(crate) reason: String, + pub(crate) failure_class: Option, + pub(crate) delivery_status: String, + pub(crate) message_byte_count: usize, + pub(crate) message_line_count: usize, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct RunSummary { project_id: String, diff --git a/apps/decodex/src/run_control.rs b/apps/decodex/src/run_control.rs index ee50b7be..ec404fb3 100644 --- a/apps/decodex/src/run_control.rs +++ b/apps/decodex/src/run_control.rs @@ -15,8 +15,12 @@ use crate::prelude::{self, eyre}; const RUN_CONTROL_DIR: &str = ".decodex-run-control"; const REQUEST_SUFFIX: &str = ".request.json"; const RESPONSE_SUFFIX: &str = ".response.json"; +const STEER_REQUEST_SUFFIX: &str = ".steer-request.json"; +const STEER_RESPONSE_SUFFIX: &str = ".steer-response.json"; const SCHEMA_INTERRUPT_REQUEST: &str = "decodex/run-control/interrupt-request/1"; const SCHEMA_INTERRUPT_RESPONSE: &str = "decodex/run-control/interrupt-response/1"; +const SCHEMA_STEER_REQUEST: &str = "decodex/run-control/steer-request/1"; +const SCHEMA_STEER_RESPONSE: &str = "decodex/run-control/steer-response/1"; const POLL_INTERVAL: Duration = Duration::from_millis(100); #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] @@ -69,6 +73,63 @@ pub(crate) struct PendingLaneControlRequest { pub(crate) request: LaneControlInterruptRequest, } +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct LaneControlSteerRequest { + pub(crate) schema: String, + pub(crate) request_id: String, + pub(crate) audit_record_id: i64, + pub(crate) project_id: String, + pub(crate) issue_id: String, + pub(crate) run_id: String, + pub(crate) attempt_number: i64, + pub(crate) thread_id: String, + pub(crate) expected_turn_id: String, + pub(crate) source: String, + pub(crate) message: String, + pub(crate) message_byte_count: usize, + pub(crate) message_line_count: usize, + pub(crate) created_at_unix_epoch: i64, +} +impl LaneControlSteerRequest { + pub(crate) fn new(input: LaneControlSteerRequestInput<'_>) -> Self { + Self { + schema: String::from(SCHEMA_STEER_REQUEST), + request_id: fresh_request_id(input.run_id), + audit_record_id: input.audit_record_id, + project_id: input.project_id.to_owned(), + issue_id: input.issue_id.to_owned(), + run_id: input.run_id.to_owned(), + attempt_number: input.attempt_number, + thread_id: input.thread_id.to_owned(), + expected_turn_id: input.expected_turn_id.to_owned(), + source: input.source.to_owned(), + message: input.message.to_owned(), + message_byte_count: input.message.len(), + message_line_count: message_line_count(input.message), + created_at_unix_epoch: OffsetDateTime::now_utc().unix_timestamp(), + } + } +} + +pub(crate) struct LaneControlSteerRequestInput<'a> { + pub(crate) audit_record_id: i64, + pub(crate) project_id: &'a str, + pub(crate) issue_id: &'a str, + pub(crate) run_id: &'a str, + pub(crate) attempt_number: i64, + pub(crate) thread_id: &'a str, + pub(crate) expected_turn_id: &'a str, + pub(crate) source: &'a str, + pub(crate) message: &'a str, +} + +#[derive(Clone, Debug)] +pub(crate) struct PendingLaneControlSteerRequest { + pub(crate) path: PathBuf, + pub(crate) request: LaneControlSteerRequest, +} + #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] #[serde(rename_all = "snake_case")] pub(crate) enum LaneControlResponseStatus { @@ -77,6 +138,14 @@ pub(crate) enum LaneControlResponseStatus { Rejected, } +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub(crate) enum LaneControlSteerResponseStatus { + Delivered, + Failed, + Rejected, +} + #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub(crate) struct LaneControlInterruptResponse { @@ -169,6 +238,107 @@ impl LaneControlInterruptResponse { } } +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct LaneControlSteerResponse { + pub(crate) schema: String, + pub(crate) request_id: String, + pub(crate) project_id: String, + pub(crate) issue_id: String, + pub(crate) run_id: String, + pub(crate) attempt_number: i64, + pub(crate) thread_id: String, + pub(crate) expected_turn_id: String, + pub(crate) current_turn_id: Option, + pub(crate) response_turn_id: Option, + pub(crate) status: LaneControlSteerResponseStatus, + pub(crate) classification: String, + pub(crate) method: String, + pub(crate) message: String, + pub(crate) error_class: Option, + pub(crate) recorded_at_unix_epoch: i64, +} +impl LaneControlSteerResponse { + pub(crate) fn delivered( + request: &LaneControlSteerRequest, + current_turn_id: &str, + response_turn_id: &str, + ) -> Self { + Self::from_request( + request, + LaneControlSteerResponseStatus::Delivered, + "turn_steer_delivered", + "turn/steer accepted by app-server.", + None, + Some(current_turn_id.to_owned()), + Some(response_turn_id.to_owned()), + ) + } + + pub(crate) fn failed( + request: &LaneControlSteerRequest, + current_turn_id: &str, + error_class: &str, + message: String, + ) -> Self { + Self::from_request( + request, + LaneControlSteerResponseStatus::Failed, + error_class, + message, + Some(error_class.to_owned()), + Some(current_turn_id.to_owned()), + None, + ) + } + + pub(crate) fn rejected( + request: &LaneControlSteerRequest, + current_turn_id: &str, + error_class: &str, + message: String, + ) -> Self { + Self::from_request( + request, + LaneControlSteerResponseStatus::Rejected, + "control_request_rejected", + message, + Some(error_class.to_owned()), + Some(current_turn_id.to_owned()), + None, + ) + } + + fn from_request( + request: &LaneControlSteerRequest, + status: LaneControlSteerResponseStatus, + classification: &str, + message: impl Into, + error_class: Option, + current_turn_id: Option, + response_turn_id: Option, + ) -> Self { + Self { + schema: String::from(SCHEMA_STEER_RESPONSE), + request_id: request.request_id.clone(), + project_id: request.project_id.clone(), + issue_id: request.issue_id.clone(), + run_id: request.run_id.clone(), + attempt_number: request.attempt_number, + thread_id: request.thread_id.clone(), + expected_turn_id: request.expected_turn_id.clone(), + current_turn_id, + response_turn_id, + status, + classification: classification.to_owned(), + method: String::from("turn/steer"), + message: message.into(), + error_class, + recorded_at_unix_epoch: OffsetDateTime::now_utc().unix_timestamp(), + } + } +} + pub(crate) fn write_interrupt_request( worktree_path: &Path, request: &LaneControlInterruptRequest, @@ -191,6 +361,28 @@ pub(crate) fn write_interrupt_response( Ok(path) } +pub(crate) fn write_steer_request( + worktree_path: &Path, + request: &LaneControlSteerRequest, +) -> prelude::Result { + let path = steer_request_path(worktree_path, &request.run_id, &request.request_id); + + write_json_file_atomically(&path, request)?; + + Ok(path) +} + +pub(crate) fn write_steer_response( + worktree_path: &Path, + response: &LaneControlSteerResponse, +) -> prelude::Result { + let path = steer_response_path(worktree_path, &response.run_id, &response.request_id); + + write_json_file_atomically(&path, response)?; + + Ok(path) +} + pub(crate) fn remove_interrupt_request(path: &Path) -> prelude::Result<()> { match fs::remove_file(path) { Ok(()) => Ok(()), @@ -199,6 +391,14 @@ pub(crate) fn remove_interrupt_request(path: &Path) -> prelude::Result<()> { } } +pub(crate) fn remove_steer_request(path: &Path) -> prelude::Result<()> { + match fs::remove_file(path) { + Ok(()) => Ok(()), + Err(error) if error.kind() == ErrorKind::NotFound => Ok(()), + Err(error) => Err(error.into()), + } +} + pub(crate) fn pending_interrupt_requests( worktree_path: &Path, run_id: &str, @@ -224,6 +424,31 @@ pub(crate) fn pending_interrupt_requests( Ok(requests) } +pub(crate) fn pending_steer_requests( + worktree_path: &Path, + run_id: &str, +) -> prelude::Result> { + let dir = run_control_run_dir(worktree_path, run_id); + let Ok(entries) = fs::read_dir(&dir) else { + return Ok(Vec::new()); + }; + let mut requests = entries + .filter_map(std::result::Result::ok) + .map(|entry| entry.path()) + .filter(|path| file_name_ends_with(path, STEER_REQUEST_SUFFIX)) + .map(read_pending_steer_request) + .collect::>>()?; + + requests.sort_by(|left, right| { + left.request + .created_at_unix_epoch + .cmp(&right.request.created_at_unix_epoch) + .then_with(|| left.request.request_id.cmp(&right.request.request_id)) + }); + + Ok(requests) +} + pub(crate) fn wait_for_interrupt_response( worktree_path: &Path, run_id: &str, @@ -245,6 +470,27 @@ pub(crate) fn wait_for_interrupt_response( } } +pub(crate) fn wait_for_steer_response( + worktree_path: &Path, + run_id: &str, + request_id: &str, + timeout: Duration, +) -> prelude::Result> { + let started_at = Instant::now(); + + loop { + if let Some(response) = read_steer_response(worktree_path, run_id, request_id)? { + return Ok(Some(response)); + } + + if started_at.elapsed() >= timeout { + return Ok(None); + } + + thread::sleep(POLL_INTERVAL); + } +} + pub(crate) fn read_interrupt_response( worktree_path: &Path, run_id: &str, @@ -259,6 +505,20 @@ pub(crate) fn read_interrupt_response( } } +pub(crate) fn read_steer_response( + worktree_path: &Path, + run_id: &str, + request_id: &str, +) -> prelude::Result> { + let path = steer_response_path(worktree_path, run_id, request_id); + + match fs::read_to_string(path) { + Ok(raw) => serde_json::from_str(&raw).map(Some).map_err(Into::into), + Err(error) if error.kind() == ErrorKind::NotFound => Ok(None), + Err(error) => Err(error.into()), + } +} + pub(crate) fn protocol_response_summary(value: &Value) -> String { match value { Value::Null => String::from("null"), @@ -291,6 +551,21 @@ fn read_pending_interrupt_request(path: PathBuf) -> prelude::Result prelude::Result { + let raw = fs::read_to_string(&path)?; + let request: LaneControlSteerRequest = serde_json::from_str(&raw)?; + + if request.schema != SCHEMA_STEER_REQUEST { + eyre::bail!( + "Unsupported lane-control steer request schema `{}` in `{}`.", + request.schema, + path.display() + ); + } + + Ok(PendingLaneControlSteerRequest { path, request }) +} + fn interrupt_request_path(worktree_path: &Path, run_id: &str, request_id: &str) -> PathBuf { run_control_run_dir(worktree_path, run_id).join(format!( "{}{}", @@ -307,6 +582,22 @@ fn interrupt_response_path(worktree_path: &Path, run_id: &str, request_id: &str) )) } +fn steer_request_path(worktree_path: &Path, run_id: &str, request_id: &str) -> PathBuf { + run_control_run_dir(worktree_path, run_id).join(format!( + "{}{}", + sanitize_path_component(request_id), + STEER_REQUEST_SUFFIX + )) +} + +fn steer_response_path(worktree_path: &Path, run_id: &str, request_id: &str) -> PathBuf { + run_control_run_dir(worktree_path, run_id).join(format!( + "{}{}", + sanitize_path_component(request_id), + STEER_RESPONSE_SUFFIX + )) +} + fn run_control_run_dir(worktree_path: &Path, run_id: &str) -> PathBuf { worktree_path.join(RUN_CONTROL_DIR).join(sanitize_path_component(run_id)) } @@ -338,6 +629,10 @@ fn file_name_ends_with(path: &Path, suffix: &str) -> bool { path.file_name().and_then(|name| name.to_str()).is_some_and(|name| name.ends_with(suffix)) } +fn message_line_count(message: &str) -> usize { + message.lines().count().max(usize::from(!message.is_empty())) +} + fn sanitize_path_component(value: &str) -> String { let sanitized = value .chars() diff --git a/apps/decodex/src/state/models.rs b/apps/decodex/src/state/models.rs index f148a110..a54a4d16 100644 --- a/apps/decodex/src/state/models.rs +++ b/apps/decodex/src/state/models.rs @@ -141,11 +141,14 @@ pub struct RunControlActionReceipt { attempt_number: i64, thread_id: Option, turn_id: Option, + current_thread_id: Option, + current_turn_id: Option, source: String, action: String, outcome: String, reason: String, audit_record_id: i64, + metadata: Option, channel: Option, } impl RunControlActionReceipt { @@ -179,6 +182,16 @@ impl RunControlActionReceipt { self.turn_id.as_deref() } + /// Current thread identifier observed while resolving the request. + pub fn current_thread_id(&self) -> Option<&str> { + self.current_thread_id.as_deref() + } + + /// Current turn identifier observed while resolving the request. + pub fn current_turn_id(&self) -> Option<&str> { + self.current_turn_id.as_deref() + } + /// Local source that requested the action. pub fn source(&self) -> &str { &self.source @@ -204,6 +217,11 @@ impl RunControlActionReceipt { self.audit_record_id } + /// Optional compact action metadata captured with the audit event. + pub fn metadata(&self) -> Option<&Value> { + self.metadata.as_ref() + } + /// Control channel selected for an accepted request. pub fn channel(&self) -> Option<&RunControlChannel> { self.channel.as_ref() @@ -496,6 +514,46 @@ pub(crate) struct RunControlActionRequest<'a> { pub(crate) action: &'a str, /// Optional caller timeout budget in milliseconds. pub(crate) timeout_ms: Option, + /// Optional compact, non-secret action metadata to include in audit evidence. + pub(crate) metadata: Option<&'a Value>, +} + +/// Follow-up outcome for a run-control action handled after initial resolution. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +#[cfg_attr(not(test), allow(dead_code))] +pub(crate) struct RunControlActionOutcomeRequest<'a> { + /// Project identifier used for local audit scoping. + pub(crate) project_id: &'a str, + /// Issue identifier used for local audit scoping. + pub(crate) issue_id: &'a str, + /// Run identifier used for local audit scoping. + pub(crate) run_id: &'a str, + /// Attempt number used for local audit scoping. + pub(crate) attempt_number: i64, + /// Requested app-server thread identifier, when known. + pub(crate) thread_id: Option<&'a str>, + /// Requested expected app-server turn identifier, when known. + pub(crate) turn_id: Option<&'a str>, + /// Current app-server thread identifier observed while handling the request. + pub(crate) current_thread_id: Option<&'a str>, + /// Current app-server turn identifier observed while handling the request. + pub(crate) current_turn_id: Option<&'a str>, + /// Local source that requested the action. + pub(crate) source: &'a str, + /// Requested control action. + pub(crate) action: &'a str, + /// Follow-up outcome. + pub(crate) outcome: &'a str, + /// Normalized outcome reason. + pub(crate) reason: &'a str, + /// Parent request-resolution audit record id, when known. + pub(crate) parent_record_id: Option, + /// Optional caller timeout budget in milliseconds. + pub(crate) timeout_ms: Option, + /// Optional compact, non-secret action metadata to include in audit evidence. + pub(crate) metadata: Option<&'a Value>, + /// Control channel that carried the request, when known. + pub(crate) channel: Option<&'a RunControlChannel>, } /// Registered repo target managed by the local Decodex control plane. diff --git a/apps/decodex/src/state/store.rs b/apps/decodex/src/state/store.rs index 3d11ee5b..4c6f92f0 100644 --- a/apps/decodex/src/state/store.rs +++ b/apps/decodex/src/state/store.rs @@ -848,11 +848,14 @@ impl StateStore { attempt_number: resolution.audit_target.attempt_number, thread_id: resolution.audit_target.thread_id, turn_id: resolution.audit_target.turn_id, + current_thread_id: resolution.audit_target.current_thread_id, + current_turn_id: resolution.audit_target.current_turn_id, source: resolution.audit_target.source, action: resolution.audit_target.action, outcome: resolution.outcome, reason: resolution.reason, audit_record_id: event.record_id(), + metadata: resolution.audit_target.metadata, channel: resolution.channel, }) } @@ -875,9 +878,12 @@ impl StateStore { attempt_number: receipt.attempt_number, thread_id: receipt.thread_id.clone(), turn_id: receipt.turn_id.clone(), + current_thread_id: receipt.current_thread_id.clone(), + current_turn_id: receipt.current_turn_id.clone(), source: receipt.source.clone(), action: receipt.action.clone(), timeout_ms: None, + metadata: receipt.metadata.clone(), channel: receipt.channel.clone(), }; @@ -889,6 +895,39 @@ impl StateStore { ) } + /// Append a follow-up audit outcome for a control action handled from a channel request. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn record_run_control_action_delivery_outcome( + &self, + request: RunControlActionOutcomeRequest<'_>, + ) -> Result { + validate_run_control_action_outcome(request.outcome)?; + validate_required_run_control_field("reason", request.reason)?; + + let target = RunControlAuditTarget { + project_id: request.project_id.to_owned(), + issue_id: request.issue_id.to_owned(), + run_id: request.run_id.to_owned(), + attempt_number: request.attempt_number, + thread_id: request.thread_id.map(str::to_owned), + turn_id: request.turn_id.map(str::to_owned), + current_thread_id: request.current_thread_id.map(str::to_owned), + current_turn_id: request.current_turn_id.map(str::to_owned), + source: request.source.to_owned(), + action: request.action.to_owned(), + timeout_ms: request.timeout_ms, + metadata: request.metadata.cloned(), + channel: request.channel.cloned(), + }; + + self.append_run_control_audit_event( + &target, + request.outcome, + request.reason, + request.parent_record_id, + ) + } + #[cfg_attr(not(test), allow(dead_code))] fn append_run_control_audit_event( &self, @@ -900,12 +939,14 @@ impl StateStore { validate_run_control_action_outcome(outcome)?; let channel = target.channel.as_ref(); + let failure_class = run_control_action_failure_class(&target.action, outcome, reason); let payload = serde_json::json!({ "schema": "decodex.run_control_action/v1", "action": target.action, "source": target.source, "outcome": outcome, "reason": reason, + "failure_class": failure_class, "parent_record_id": parent_record_id, "requested": { "project_id": target.project_id, @@ -916,6 +957,11 @@ impl StateStore { "turn_id": target.turn_id, "timeout_ms": target.timeout_ms, }, + "observed": { + "thread_id": target.current_thread_id, + "turn_id": target.current_turn_id, + }, + "metadata": target.metadata, "channel": channel.map(|channel| serde_json::json!({ "transport": channel.transport(), "channel_path": channel.channel_path().display().to_string(), @@ -1950,6 +1996,9 @@ struct RunControlAuditTarget { source: String, action: String, timeout_ms: Option, + current_thread_id: Option, + current_turn_id: Option, + metadata: Option, channel: Option, } @@ -2050,9 +2099,12 @@ fn resolve_run_control_action_locked( attempt_number: attempt.attempt_number, thread_id: request.thread_id.map(str::to_owned), turn_id: request.turn_id.map(str::to_owned), + current_thread_id: attempt.thread_id.clone(), + current_turn_id: attempt.turn_id.clone(), source: request.source.to_owned(), action: request.action.to_owned(), timeout_ms: request.timeout_ms, + metadata: request.metadata.cloned(), channel: None, }; @@ -2138,9 +2190,12 @@ fn rejected_run_control_resolution( attempt_number: request.attempt_number, thread_id: request.thread_id.map(str::to_owned), turn_id: request.turn_id.map(str::to_owned), + current_thread_id: None, + current_turn_id: None, source: request.source.to_owned(), action: request.action.to_owned(), timeout_ms: request.timeout_ms, + metadata: request.metadata.cloned(), channel: None, }), outcome: RUN_CONTROL_ACTION_REJECTED.to_owned(), @@ -2227,6 +2282,33 @@ fn validate_run_control_action_outcome(outcome: &str) -> Result<()> { Ok(()) } +fn run_control_action_failure_class( + action: &str, + outcome: &str, + reason: &str, +) -> Option<&'static str> { + if !matches!( + outcome, + RUN_CONTROL_ACTION_REJECTED + | RUN_CONTROL_ACTION_FAILED + | RUN_CONTROL_ACTION_TIMED_OUT + | RUN_CONTROL_ACTION_FALLBACK + ) { + return None; + } + if action == "steer" && reason == "turn_mismatch" { + return Some("stale_expected_turn_id"); + } + if action == "steer" && reason == "active_turn_not_steerable" { + return Some("active_turn_not_steerable"); + } + if action == "steer" && reason == "app_server_turn_steer_unsupported" { + return Some("app_server_turn_steer_unsupported"); + } + + Some("run_control_action_failed") +} + fn retarget_review_orchestration_issue( records: &mut HashMap, previous_issue_id: &str, diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index 450b41e7..80610a4d 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -2573,6 +2573,7 @@ fn run_control_accepts_active_attempt_and_persists_audit() { source: "test_hook", action: "noop", timeout_ms: Some(500), + metadata: None, }) .expect("control request should resolve"); @@ -2641,8 +2642,9 @@ fn run_control_rejects_stale_turn_and_run_mismatch() { thread_id: Some("thread-1"), turn_id: Some("turn-old"), source: "test_hook", - action: "noop", + action: "steer", timeout_ms: None, + metadata: None, }) .expect("stale turn should be audited"); let stale_run = store @@ -2656,13 +2658,29 @@ fn run_control_rejects_stale_turn_and_run_mismatch() { source: "test_hook", action: "noop", timeout_ms: None, + metadata: None, }) .expect("stale run should be audited"); assert_eq!(stale_turn.outcome(), "rejected"); assert_eq!(stale_turn.reason(), "turn_mismatch"); + assert_eq!(stale_turn.current_turn_id(), Some("turn-current")); assert_eq!(stale_run.outcome(), "rejected"); assert_eq!(stale_run.reason(), "run_not_found"); + + let events = store + .list_private_execution_events("pubfi", "issue-1", "run-current", 1) + .expect("private control audit should read"); + let stale_turn_event = events + .iter() + .find(|event| event.record_id() == stale_turn.audit_record_id()) + .expect("stale turn audit event should exist"); + + assert_eq!( + stale_turn_event.payload()["failure_class"].as_str(), + Some("stale_expected_turn_id") + ); + assert_eq!(stale_turn_event.payload()["observed"]["turn_id"].as_str(), Some("turn-current")); } #[test] @@ -2695,6 +2713,7 @@ fn run_control_rejects_missing_channel_file() { source: "test_hook", action: "noop", timeout_ms: None, + metadata: None, }) .expect("missing channel should be audited"); @@ -2731,6 +2750,7 @@ fn run_control_requires_active_run_ownership() { source: "test_hook", action: "noop", timeout_ms: None, + metadata: None, }) .expect("missing lease should be audited"); @@ -2749,6 +2769,7 @@ fn run_control_requires_active_run_ownership() { source: "test_hook", action: "noop", timeout_ms: None, + metadata: None, }) .expect("wrong active run should be audited"); diff --git a/docs/reference/operator-control-plane.md b/docs/reference/operator-control-plane.md index 4066c5b3..555d4e5d 100644 --- a/docs/reference/operator-control-plane.md +++ b/docs/reference/operator-control-plane.md @@ -180,14 +180,22 @@ accept active-lane stop/interrupt controls, project pause/resume controls, manua controls, or active-lane steer controls; use `decodex lane inspect`, `decodex lane interrupt`, or the local `/api/lane/*` endpoints instead. Account-pool selection remains available because it changes the global Codex account selector, not an active lane. +Active-lane steer is available through `decodex lane steer --run-id +--expected-turn-id --message `, canonical `POST /api/lane/steer`, +and legacy alias `POST /api/lane-steer`. These surfaces require the expected active +turn id, audit accepted or rejected state locally, and keep raw steer text out of +public tracker projections. `runActivity.activeRunsComplete` marks whether a payload is the complete active-run list; subscription-filtered payloads set it to `false`, so consumers must not treat a missing run in that payload as ended. Active run rows may include `control_capability` with the active attempt's project, issue, run id, attempt, current thread/turn ids, local transport, channel path, status, -and timestamps. It is local routing metadata for future CLI/API controls, not a -dashboard command surface. +and timestamps. It is local routing metadata for CLI/API controls, not a dashboard +command surface. +After a steer request is handled, active run protocol activity may show a compact +`turn/steer` entry with outcome, failure class, and response turn id. It does not +include the operator message. Snapshot `warnings` remain stable machine-readable tokens. When a warning needs operator action, snapshots may also include `warning_details` entries with the affected `project_id`, `repo_root`, reason, and next action; for example, a stale diff --git a/docs/spec/app-server.md b/docs/spec/app-server.md index 3e474d63..4b240fd8 100644 --- a/docs/spec/app-server.md +++ b/docs/spec/app-server.md @@ -152,8 +152,8 @@ If generated schema or live capability probing shows that `turn/interrupt` or `turn/steer` is unavailable, the CLI/API control must report that control as unsupported for the active lane instead of failing ordinary issue dispatch. The lane-control contract and support matrix live in [`lane-control.md`](./lane-control.md). -Decodex currently implements `turn/interrupt` through the child-owned app-server -connection for active turns; `turn/steer` remains planned. +Decodex currently implements `turn/interrupt` and `turn/steer` through the child-owned +app-server connection for active turns. ## Required request flow @@ -378,11 +378,10 @@ Method: - `thread/inject_items` -Raw item injection is deferred as an operator feature. Decodex should not expose +Raw item injection is deferred as an operator feature. Decodex does not expose `thread/inject_items` through lane-control CLI/API in this rollout because raw item insertion has broader transcript-shaping semantics than the intended operator steer -contract. Use `turn/steer` for active-lane steering once the CLI/API implementation is -available. +contract. Use `turn/steer` for active-lane steering. ## `command/exec` diff --git a/docs/spec/lane-control.md b/docs/spec/lane-control.md index 1e6bcc7c..87d78cb4 100644 --- a/docs/spec/lane-control.md +++ b/docs/spec/lane-control.md @@ -40,11 +40,11 @@ agent-facing skills must guide responsible use. | Run-control channel foundation | Supported foundation | Active attempts publish a local `.decodex-run-control/*` channel record, runtime SQLite `run_control_channels`, operator status `control_capability`, and private `control_action` audit events | Route lane-control mutations through the active attempt's project, issue, run id, attempt, thread id, current turn id, active lease, and local channel metadata. Invalid or stale requests fail closed and remain local audit evidence. | | Soft interrupt | Supported CLI/API control | `decodex lane interrupt --run-id ` and `POST /api/lane/interrupt` write a run-control request that the active app-server child delivers with `turn/interrupt` | Prefer soft interrupt before hard interruption when the active turn id is known and the app-server capability is present. Soft interrupt requests a graceful turn stop and records the protocol outcome when app-server returns one. | | Hard interrupt fallback | Explicit fallback only | `decodex lane interrupt --run-id --force` and `POST /api/lane/interrupt` with `"force": true` classify process signaling as `hard_interrupt_fallback` | Use only when soft interrupt is unavailable, timed out, or impossible because the process or app-server boundary cannot be reached. Preserve retained worktree evidence and runtime classification. | -| Steer active lane | Planned CLI/API control; bottom-layer method must stay broad | Decodex does not currently send `turn/steer` from its app-server client | Pass operator-supplied steer text through the CLI/API when available. Do not narrow the protocol to a fixed set of task-content categories. Apply policy, audit, privacy, and lifecycle guardrails above the protocol. | +| Steer active lane | Supported CLI/API control; bottom-layer method stays broad | `decodex lane steer --run-id --expected-turn-id --message `, canonical `POST /api/lane/steer`, legacy alias `POST /api/lane-steer`, local run-control steer request/response files, app-server `turn/steer`, private `control_action` audit events, and protocol activity `turn/steer` summaries | Pass operator-supplied steer text through CLI/API to the current active turn. Require `expectedTurnId`; stale turn ids fail closed. Do not narrow the protocol to a fixed set of task-content categories. Apply policy, audit, privacy, and lifecycle guardrails above the protocol. | | Retained resume/retry | Supported through runtime lifecycle | `decodex run `, retry scheduling, retained worktree recovery, and `thread/resume` for same-thread app-server continuation | Resume only when retained worktree, issue, branch, PR, and runtime evidence still prove the same lane. Treat ambiguous lineage as manual attention. | | Manual attention | Supported terminal control path | `decodex:needs-attention`, `issue_comment(kind = "manual_attention")`, and `issue_terminal_finalize(path = "manual_attention")` | Stop automation when policy requires a human decision. Explain the blocker through structured public fields and keep private evidence local. | | Task replacement | Deferred lifecycle work | No supported active-lane replacement command | Do not use steer or raw injection to replace the task silently. Treat replacement as explicit lifecycle work: pause/stop if needed, update or requeue the issue, or create a new issue/lane. | -| Raw thread item injection | Unsupported as an operator feature | No Decodex operator path for `thread/inject_items` | Do not expose raw `thread/inject_items` to operators in this rollout. Use `turn/steer` for operator steer once implemented. | +| Raw thread item injection | Unsupported as an operator feature | No Decodex operator path for `thread/inject_items` | Do not expose raw `thread/inject_items` to operators in this rollout. Use `turn/steer` for operator steer. | | Active-lane UI authoring controls | Deferred | Existing dashboard views and low-level handlers are not the CLI/API-first lane-control contract | Do not add dashboard steer, retry, or task-replacement controls in this rollout. Ship CLI/API first, then promote UI controls only after audit and policy behavior is settled. | ## Inspect-First Rule @@ -134,6 +134,21 @@ bottom-layer protocol shape must not decide which task topics are acceptable. It carry the operator's steer text broadly, subject only to generic transport and schema requirements. +The supported operator commands are: + +- `decodex lane inspect [--run-id ] [--json]` +- `decodex lane steer --run-id --expected-turn-id --message [--json]` + +The local HTTP API mirrors those semantics: + +- `POST /api/lane/steer` with JSON fields `projectId`, `issue` or `issueId`, + `runId`, `expectedTurnId`, `message`, and optional `waitTimeoutMs` +- `POST /api/lane-steer` remains a compatibility alias for the same request + +The `expectedTurnId` precondition is mandatory. If the current active turn no longer +matches the supplied turn id, the request fails closed with `stale_expected_turn_id` +and remains local audit evidence instead of being delivered to app-server. + Higher layers own guardrails: - CLI/API must require explicit operator-supplied steer text and a target lane identity. @@ -203,13 +218,8 @@ Linear unless a schema-controlled public projection explicitly allows it. ## Implementation Status For This Rollout -This document specifies capabilities that the CLI/API should expose first. Current code -supports lane inspect, CLI project enable/disable, Linear scan requests, soft -interrupt, explicit hard-interrupt fallback, retained resume/retry lifecycle paths, and -manual-attention finalization. Current code does not expose dashboard lane-mutation -controls, does not yet implement Decodex CLI/API controls that send `turn/steer`, and -does not expose raw `thread/inject_items` as an operator feature. - -When implementation work adds the missing CLI/API controls, update this spec, -[`app-server.md`](./app-server.md), the operator reference, and the Decodex plugin -skills in the same lane. +Current code supports lane inspect, CLI project enable/disable, Linear scan requests, +soft interrupt, explicit hard-interrupt fallback, active-lane steer, retained +resume/retry lifecycle paths, and manual-attention finalization. Current code does not +expose dashboard lane-mutation controls and does not expose raw `thread/inject_items` +as an operator feature. diff --git a/plugins/decodex/skills/automation/SKILL.md b/plugins/decodex/skills/automation/SKILL.md index cd8b4fa6..1af79a45 100644 --- a/plugins/decodex/skills/automation/SKILL.md +++ b/plugins/decodex/skills/automation/SKILL.md @@ -130,8 +130,11 @@ Rules for agents: only as `hard_interrupt_fallback` after soft interrupt is unavailable, timed out, or impossible. - Use steer only through the CLI/API lane-control surface and only when the operator - supplies the steer text. Bottom-layer steer support is broad; policy, audit, privacy, - workflow, recovery, and skills provide the guardrails. + supplies the steer text. The CLI form is `decodex lane steer --run-id + --expected-turn-id --message `; API callers use + canonical `POST /api/lane/steer` or legacy alias `POST /api/lane-steer`. + Bottom-layer steer support is broad; policy, audit, privacy, workflow, recovery, + and skills provide the guardrails. - Treat task replacement as explicit lifecycle work, not steer. If the operator wants a different objective or acceptance contract, pause or stop if needed, update/requeue the issue or create a new lane, and preserve audit evidence. diff --git a/plugins/decodex/skills/manual-cli/SKILL.md b/plugins/decodex/skills/manual-cli/SKILL.md index dbd4675a..1570e7a3 100644 --- a/plugins/decodex/skills/manual-cli/SKILL.md +++ b/plugins/decodex/skills/manual-cli/SKILL.md @@ -74,8 +74,12 @@ CLI/API lane controls: - Use `decodex lane interrupt --run-id ` for soft active-turn interruption. Add `--force` only when explicit operator intent allows hard process-kill fallback after soft interrupt is unavailable or fails. -- Steer remains future CLI/API work; use it only through the lane-control surface once - implemented and only with explicit operator-supplied text. +- Use `decodex lane steer --run-id --expected-turn-id + --message ` only with explicit operator-supplied text after inspection proves + the active lane identity. The expected turn id is a fail-closed precondition. + Broad steer text is allowed at the bottom layer; lifecycle ambiguity should route to + explicit recovery or manual attention instead of silently changing the issue + contract. - Do not use active-lane UI controls, direct runtime DB edits, raw `thread/inject_items`, or tracker-state mutations as substitutes for the lane-control contract.