diff --git a/apps/decodex/src/agent/app_server.rs b/apps/decodex/src/agent/app_server.rs index 0be81a87..0e23b740 100644 --- a/apps/decodex/src/agent/app_server.rs +++ b/apps/decodex/src/agent/app_server.rs @@ -8,6 +8,7 @@ use std::{ env, error::Error, fmt::{self, Display, Formatter}, + fs, path::{Path, PathBuf}, time::{Duration, Instant}, }; @@ -49,7 +50,9 @@ use crate::{ prelude::eyre, state::{ self, CodexAccountActivitySummary, CodexAccountMarker, EffectiveRuntimeMarker, - RUN_OPERATION_AGENT_RUN, RUN_OPERATION_APP_SERVER_PREFLIGHT, StateStore, + RUN_CONTROL_CHANNEL_DIR, RUN_CONTROL_CHANNEL_STATUS_COMPLETED, + RUN_CONTROL_CHANNEL_STATUS_FAILED, RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE, + RUN_OPERATION_AGENT_RUN, RUN_OPERATION_APP_SERVER_PREFLIGHT, RunControlChannel, StateStore, }, }; @@ -1032,19 +1035,42 @@ pub(crate) fn execute_app_server_run( write_activity_marker_best_effort(marker_path, &request.run_id, request.attempt_number); } + let control_channel = publish_run_control_channel_for_request(request, state_store)?; let result = execute_app_server_run_inner(request, state_store); - if result.is_err() { - state_store.record_run_attempt( - &request.run_id, - &request.issue_id, - request.attempt_number, - "failed", - )?; + match &result { + Ok(_result) => + if control_channel.is_some() { + state_store.retire_run_control_channel_for_attempt( + &request.run_id, + request.attempt_number, + RUN_CONTROL_CHANNEL_STATUS_COMPLETED, + )?; + }, + Err(_error) => { + state_store.record_run_attempt( + &request.run_id, + &request.issue_id, + request.attempt_number, + "failed", + )?; - if let Some(marker_path) = request.activity_marker_path.as_ref() { - write_activity_marker_best_effort(marker_path, &request.run_id, request.attempt_number); - } + if control_channel.is_some() { + state_store.retire_run_control_channel_for_attempt( + &request.run_id, + request.attempt_number, + RUN_CONTROL_CHANNEL_STATUS_FAILED, + )?; + } + + if let Some(marker_path) = request.activity_marker_path.as_ref() { + write_activity_marker_best_effort( + marker_path, + &request.run_id, + request.attempt_number, + ); + } + }, } result @@ -1927,6 +1953,88 @@ fn duration_seconds_i64(duration: Duration) -> i64 { i64::try_from(duration.as_secs()).unwrap_or(i64::MAX) } +fn publish_run_control_channel_for_request( + request: &AppServerRunRequest<'_>, + state_store: &StateStore, +) -> crate::prelude::Result> { + let Some(marker_path) = request.activity_marker_path.as_ref() else { + return Ok(None); + }; + let channel_path = + run_control_channel_path(marker_path, &request.run_id, request.attempt_number); + + write_run_control_channel_file(&channel_path, request)?; + + let channel = state_store.publish_run_control_channel_for_active_attempt( + &request.run_id, + request.attempt_number, + &channel_path, + RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE, + )?; + + if let Some(channel) = channel.as_ref() { + state_store.append_private_execution_event( + channel.project_id(), + channel.issue_id(), + channel.run_id(), + channel.attempt_number(), + "control_channel_published", + serde_json::json!({ + "schema": "decodex.run_control_channel/v1", + "transport": channel.transport(), + "channel_path": channel.channel_path().display().to_string(), + "status": channel.status(), + "published_at": channel.published_at(), + }), + )?; + } + + Ok(channel) +} + +fn run_control_channel_path(marker_path: &Path, run_id: &str, attempt_number: i64) -> PathBuf { + marker_path + .join(RUN_CONTROL_CHANNEL_DIR) + .join(format!("{}-{attempt_number}.channel", sanitize_run_control_path_segment(run_id))) +} + +fn sanitize_run_control_path_segment(value: &str) -> String { + let sanitized = value + .chars() + .map(|character| { + if character.is_ascii_alphanumeric() || character == '-' || character == '_' { + character + } else { + '_' + } + }) + .collect::(); + + if sanitized.is_empty() { String::from("run") } else { sanitized } +} + +fn write_run_control_channel_file( + channel_path: &Path, + request: &AppServerRunRequest<'_>, +) -> crate::prelude::Result<()> { + if let Some(parent) = channel_path.parent() { + fs::create_dir_all(parent)?; + } + + fs::write( + channel_path, + format!( + "schema=decodex.run_control_channel/v1\nrun_id={}\nissue_id={}\nattempt_number={}\ntransport={}\n", + request.run_id, + request.issue_id, + request.attempt_number, + state::RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE, + ), + )?; + + Ok(()) +} + fn write_activity_marker_best_effort(marker_path: &Path, run_id: &str, attempt_number: i64) { if let Err(error) = state::write_run_operation_marker( marker_path, diff --git a/apps/decodex/src/orchestrator/status.rs b/apps/decodex/src/orchestrator/status.rs index 7b6215d9..12d6d6ae 100644 --- a/apps/decodex/src/orchestrator/status.rs +++ b/apps/decodex/src/orchestrator/status.rs @@ -4215,14 +4215,7 @@ fn operator_run_status( .filter(|reason| reason != "turn_completed"); } - let account = marker.as_ref().and_then(RunActivityMarker::account).cloned(); - let mut accounts = marker - .as_ref() - .map(|marker| marker.accounts().to_vec()) - .unwrap_or_default(); - - append_primary_account_if_missing(&mut accounts, account.as_ref()); - + let (account, accounts) = operator_run_accounts(marker.as_ref()); let branch_name = run.branch_name().map(str::to_owned); let worktree_path = operator_run_relative_worktree_path(project, &run); let issue_identifier = operator_run_issue_identifier_from_fields( @@ -4231,6 +4224,7 @@ fn operator_run_status( worktree_path.as_deref(), ); let private_evidence = operator_run_private_evidence(project, &run, issue_identifier.as_deref()); + let control_capability = operator_run_control_capability(&run, &app_server_state); let execution_liveness = operator_run_execution_liveness(&status, &timing, &app_server_state, &protocol_summary); @@ -4270,6 +4264,7 @@ fn operator_run_status( last_event_at: protocol_summary.last_event_at, event_count: protocol_summary.event_count, private_evidence, + control_capability, process_id: timing.process_id, process_alive: timing.process_alive, process_liveness_reason: timing.process_liveness_reason, @@ -4290,6 +4285,17 @@ fn operator_run_status( }) } +fn operator_run_accounts( + marker: Option<&RunActivityMarker>, +) -> (Option, Vec) { + let account = marker.and_then(RunActivityMarker::account).cloned(); + let mut accounts = marker.map(|marker| marker.accounts().to_vec()).unwrap_or_default(); + + append_primary_account_if_missing(&mut accounts, account.as_ref()); + + (account, accounts) +} + fn operator_run_relative_worktree_path( project: &ServiceConfig, run: &ProjectRunStatus, @@ -4312,6 +4318,27 @@ fn operator_run_private_evidence( ) } +fn operator_run_control_capability( + run: &ProjectRunStatus, + app_server_state: &OperatorRunAppServerState, +) -> Option { + let channel = run.control_channel()?; + + Some(OperatorRunControlCapability { + project_id: channel.project_id().to_owned(), + issue_id: channel.issue_id().to_owned(), + run_id: channel.run_id().to_owned(), + attempt_number: channel.attempt_number(), + thread_id: app_server_state.thread_id.clone(), + turn_id: app_server_state.turn_id.clone(), + transport: channel.transport().to_owned(), + channel_path: channel.channel_path().display().to_string(), + status: channel.status().to_owned(), + published_at: channel.published_at().to_owned(), + updated_at: channel.updated_at().to_owned(), + }) +} + fn operator_project_display_name(project: &ServiceConfig) -> String { github_repo_slug_from_origin(project.repo_root()) .or_else(|| repo_root_path_display_name(project.repo_root())) @@ -5423,6 +5450,21 @@ fn render_protocol_activity_summary(summary: Option<&ProtocolActivitySummary>) - format!("turn={turn}; waiting={wait}; rate_limit={rate_limit}; recent={recent}") } +fn render_control_capability_summary( + capability: Option<&OperatorRunControlCapability>, +) -> String { + let Some(capability) = capability else { + return String::from("none"); + }; + let thread_id = capability.thread_id.as_deref().unwrap_or("none"); + let turn_id = capability.turn_id.as_deref().unwrap_or("none"); + + format!( + "status={}; transport={}; channel={}; thread_id={thread_id}; turn_id={turn_id}", + capability.status, capability.transport, capability.channel_path + ) +} + fn render_account_summary(summary: Option<&CodexAccountActivitySummary>) -> String { let Some(summary) = summary else { return String::from("none"); @@ -5712,9 +5754,10 @@ fn append_rendered_run(output: &mut String, run: &OperatorRunStatus) { let account = render_account_summary(run.account.as_ref()); let accounts = render_accounts_summary(&run.accounts); let private_evidence = render_private_evidence_reference(run); + let control_capability = render_control_capability_summary(run.control_capability.as_ref()); output.push_str(&format!( - "- run_id: {}\n project_id: {}\n issue_id: {}\n issue_identifier: {}\n title: {}\n attempt: {}\n status: {}\n attempt_status: {}\n phase: {}\n wait_reason: {}\n current_operation: {}\n active_lease: {}\n queue_lease_state: {}\n queue_lease: {}\n execution_liveness: {}\n freshness_at: {}\n freshness_source: {}\n timing: run_idle={} protocol_idle={} last_progress={} protocol_event={} events={}\n account: {}\n accounts: {}\n child_agent_activity: {}\n protocol_activity: {}\n context_pressure: {}\n private_evidence: {}\n thread_id: {}\n turn_id: {}\n thread_status: {}\n thread_active_flags: {}\n interactive_requested: {}\n continuation_pending: {}\n branch: {}\n worktree_path: {}\n updated_at: {}\n last_run_activity_at: {}\n last_protocol_activity_at: {}\n last_progress_at: {}\n idle_for_seconds: {}\n protocol_idle_for_seconds: {}\n suspected_stall: {}\n process_id: {}\n process_alive: {}\n process_liveness_reason: {}\n retry_kind: {}\n next_retry_at: {}\n effective_model: {}\n effective_model_provider: {}\n effective_cwd: {}\n effective_approval_policy: {}\n effective_approvals_reviewer: {}\n effective_sandbox_mode: {}\n protocol_event: {}\n event_count: {}\n", + "- run_id: {}\n project_id: {}\n issue_id: {}\n issue_identifier: {}\n title: {}\n attempt: {}\n status: {}\n attempt_status: {}\n phase: {}\n wait_reason: {}\n current_operation: {}\n active_lease: {}\n queue_lease_state: {}\n queue_lease: {}\n execution_liveness: {}\n freshness_at: {}\n freshness_source: {}\n timing: run_idle={} protocol_idle={} last_progress={} protocol_event={} events={}\n account: {}\n accounts: {}\n child_agent_activity: {}\n protocol_activity: {}\n context_pressure: {}\n private_evidence: {}\n control_capability: {}\n thread_id: {}\n turn_id: {}\n thread_status: {}\n thread_active_flags: {}\n interactive_requested: {}\n continuation_pending: {}\n branch: {}\n worktree_path: {}\n updated_at: {}\n last_run_activity_at: {}\n last_protocol_activity_at: {}\n last_progress_at: {}\n idle_for_seconds: {}\n protocol_idle_for_seconds: {}\n suspected_stall: {}\n process_id: {}\n process_alive: {}\n process_liveness_reason: {}\n retry_kind: {}\n next_retry_at: {}\n effective_model: {}\n effective_model_provider: {}\n effective_cwd: {}\n effective_approval_policy: {}\n effective_approvals_reviewer: {}\n effective_sandbox_mode: {}\n protocol_event: {}\n event_count: {}\n", run.run_id, run.project_id, run.issue_id, @@ -5743,6 +5786,7 @@ fn append_rendered_run(output: &mut String, run: &OperatorRunStatus) { protocol_activity, context_pressure, private_evidence, + control_capability, thread_id, turn_id, thread_status, diff --git a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs index a030945d..969e1c7f 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs @@ -234,9 +234,10 @@ fn operator_status_snapshot_updates_owned_merged_worktree_hygiene_without_global #[test] fn live_operator_status_snapshot_hydrates_active_run_issue_display_metadata() { - let (_temp_dir, config, workflow) = temp_project_layout(); + let (temp_dir, config, workflow) = temp_project_layout(); let state_store = StateStore::open_in_memory().expect("state store should open"); let run_id = "xy-392-attempt-1-1777551056"; + let channel_path = temp_dir.path().join("control.channel"); let mut issue = sample_issue_with_sort_fields( "issue-active", "XY-392", @@ -261,6 +262,14 @@ fn live_operator_status_snapshot_hydrates_active_run_issue_display_metadata() { state_store .upsert_lease(config.service_id(), &issue.id, run_id, "In Progress") .expect("active lease should record"); + state_store.update_run_thread(run_id, "thread-1").expect("thread should record"); + state_store.update_run_turn(run_id, "turn-1").expect("turn should record"); + + std::fs::write(&channel_path, "ready\n").expect("control channel should write"); + + state_store + .publish_run_control_channel_for_active_attempt(run_id, 1, &channel_path, "local_file") + .expect("control channel should publish"); let snapshot = orchestrator::build_live_operator_status_snapshot( &tracker, @@ -301,6 +310,15 @@ fn live_operator_status_snapshot_hydrates_active_run_issue_display_metadata() { snapshot_json["active_runs"][0]["private_evidence"]["read_command"], format!("decodex evidence XY-392 --run-id {run_id} --attempt 1 --json") ); + assert_eq!(snapshot_json["active_runs"][0]["control_capability"]["status"], "active"); + assert_eq!( + snapshot_json["active_runs"][0]["control_capability"]["thread_id"], + "thread-1" + ); + assert_eq!( + snapshot_json["active_runs"][0]["control_capability"]["turn_id"], + "turn-1" + ); } #[test] diff --git a/apps/decodex/src/orchestrator/tests/operator/status/text.rs b/apps/decodex/src/orchestrator/tests/operator/status/text.rs index 1506661e..4d766872 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/text.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/text.rs @@ -59,6 +59,9 @@ fn operator_status_text_renders_human_readable_sections() { assert!(rendered.contains( "context_pressure: input=current_window 105.0k, peak_window 105.0k (same as current), cumulative_input 4.27M; output_tokens=12.0k; largest_output=175.8KiB by view_image; warnings=view_image repeated 3 large outputs; largest 180000 bytes" )); + assert!(rendered.contains( + "control_capability: status=active; transport=local_file; channel=.worktrees/PUB-101/.decodex-run-control/run-1-1.channel; thread_id=thread-1; turn_id=turn-1" + )); assert!(rendered.contains("turn_id: turn-1")); assert!(rendered.contains("thread_status: active")); assert!(rendered.contains("thread_active_flags: waitingOnApproval")); diff --git a/apps/decodex/src/orchestrator/tests/operator/status_support.rs b/apps/decodex/src/orchestrator/tests/operator/status_support.rs index f5613859..78500f49 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status_support.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status_support.rs @@ -3,6 +3,7 @@ use std::{panic, slice}; use orchestrator::{OperatorPostReviewLaneStatus, OperatorQueuedIssueStatus, OperatorWorktreeStatus}; use serde_json::Value; use orchestrator::AgentPrivateEvidenceRef; +use orchestrator::OperatorRunControlCapability; fn successful_linear_execution_history_comments(issue: &TrackerIssue) -> Vec { vec![ @@ -202,6 +203,22 @@ fn operator_status_text_backup_codex_account() -> state::CodexAccountActivitySum } } +fn operator_status_text_control_capability() -> OperatorRunControlCapability { + OperatorRunControlCapability { + project_id: String::from("pubfi"), + issue_id: String::from("issue-1"), + run_id: String::from("run-1"), + attempt_number: 1, + thread_id: Some(String::from("thread-1")), + turn_id: Some(String::from("turn-1")), + transport: String::from("local_file"), + channel_path: String::from(".worktrees/PUB-101/.decodex-run-control/run-1-1.channel"), + status: String::from("active"), + published_at: String::from("2026-03-14 10:00:00"), + updated_at: String::from("2026-03-14 10:00:01"), + } +} + fn operator_status_text_active_run() -> orchestrator::OperatorRunStatus { let account = operator_status_text_codex_account(); let backup_account = operator_status_text_backup_codex_account(); @@ -247,6 +264,7 @@ fn operator_status_text_active_run() -> orchestrator::OperatorRunStatus { "decodex evidence PUB-101 --run-id run-1 --attempt 1 --json", ), }, + control_capability: Some(operator_status_text_control_capability()), process_id: Some(1_234), process_alive: Some(true), process_liveness_reason: Some(String::from("process_alive")), diff --git a/apps/decodex/src/orchestrator/types.rs b/apps/decodex/src/orchestrator/types.rs index 471fb4a4..0109c099 100644 --- a/apps/decodex/src/orchestrator/types.rs +++ b/apps/decodex/src/orchestrator/types.rs @@ -871,6 +871,7 @@ struct OperatorRunStatus { last_event_at: Option, event_count: i64, private_evidence: AgentPrivateEvidenceRef, + control_capability: Option, process_id: Option, process_alive: Option, process_liveness_reason: Option, @@ -890,6 +891,21 @@ struct OperatorRunStatus { worktree_path: Option, } +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +struct OperatorRunControlCapability { + project_id: String, + issue_id: String, + run_id: String, + attempt_number: i64, + thread_id: Option, + turn_id: Option, + transport: String, + channel_path: String, + status: String, + published_at: String, + updated_at: String, +} + #[derive(Clone, Debug, Eq, PartialEq, Serialize)] struct OperatorQueuedIssueStatus { issue_id: String, diff --git a/apps/decodex/src/state.rs b/apps/decodex/src/state.rs index df6ea26a..eea8eef4 100644 --- a/apps/decodex/src/state.rs +++ b/apps/decodex/src/state.rs @@ -41,6 +41,23 @@ pub(crate) const RUN_OPERATION_REPO_GATE: &str = "repo_gate"; pub(crate) const RUN_OPERATION_REVIEW_WRITEBACK: &str = "review_writeback"; pub(crate) const RUN_OPERATION_WAITING_EXTERNAL: &str = "waiting_external"; pub(crate) const RUN_OPERATION_RECONCILIATION: &str = "reconciliation"; +pub(crate) const RUN_CONTROL_CHANNEL_DIR: &str = ".decodex-run-control"; +pub(crate) const RUN_CONTROL_CHANNEL_TRANSPORT_LOCAL_FILE: &str = "local_file"; +pub(crate) const RUN_CONTROL_CHANNEL_STATUS_ACTIVE: &str = "active"; +pub(crate) const RUN_CONTROL_CHANNEL_STATUS_COMPLETED: &str = "completed"; +pub(crate) const RUN_CONTROL_CHANNEL_STATUS_FAILED: &str = "failed"; +#[cfg_attr(not(test), allow(dead_code))] +pub(crate) const RUN_CONTROL_ACTION_ACCEPTED: &str = "accepted"; +#[cfg_attr(not(test), allow(dead_code))] +pub(crate) const RUN_CONTROL_ACTION_REJECTED: &str = "rejected"; +#[cfg_attr(not(test), allow(dead_code))] +pub(crate) const RUN_CONTROL_ACTION_COMPLETED: &str = "completed"; +#[cfg_attr(not(test), allow(dead_code))] +pub(crate) const RUN_CONTROL_ACTION_FAILED: &str = "failed"; +#[cfg_attr(not(test), allow(dead_code))] +pub(crate) const RUN_CONTROL_ACTION_TIMED_OUT: &str = "timed_out"; +#[cfg_attr(not(test), allow(dead_code))] +pub(crate) const RUN_CONTROL_ACTION_FALLBACK: &str = "fallback"; const DISPATCH_SLOT_LOCK_FILE_PREFIX: &str = ".decodex-dispatch-slot"; const ISSUE_CLAIM_LOCK_FILE_PREFIX: &str = ".decodex-issue-claim"; diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index 095ba54e..ad6d1d5e 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -121,6 +121,7 @@ struct StateData { projects: HashMap, leases: HashMap, run_attempts: HashMap, + control_channels: HashMap, events: HashMap>, event_summaries: HashMap, worktrees: HashMap, @@ -138,6 +139,7 @@ impl StateData { self.projects = loaded.projects; self.leases = loaded.leases; self.run_attempts = loaded.run_attempts; + self.control_channels = loaded.control_channels; self.events = loaded.events; self.event_summaries = loaded.event_summaries; self.worktrees = loaded.worktrees; @@ -151,6 +153,7 @@ impl StateData { fn replace_project_run_state(&mut self, loaded: Self) { self.leases = loaded.leases; self.run_attempts = loaded.run_attempts; + self.control_channels = loaded.control_channels; self.event_summaries = loaded.event_summaries; self.worktrees = loaded.worktrees; } @@ -180,6 +183,15 @@ impl StateData { } let event_summary = self.protocol_event_summary(&attempt.run_id); + let control_channel = self + .control_channels + .get(&attempt.run_id) + .filter(|channel| { + channel.project_id == project_id + && channel.issue_id == attempt.issue_id + && channel.attempt_number == attempt.attempt_number + }) + .map(RunControlChannelRecord::as_public); Some(ProjectRunStatus { run_id: attempt.run_id.clone(), @@ -197,6 +209,7 @@ impl StateData { last_event_type: event_summary.last_event_type, last_event_at: event_summary.last_event_at, last_event_at_unix: event_summary.last_event_at_unix, + control_channel, }) } @@ -365,6 +378,7 @@ CREATE TABLE IF NOT EXISTS review_orchestrations ( ); "#, )?; + self.bootstrap_run_control_channels_schema()?; self.bootstrap_connector_backoffs_schema()?; self.bootstrap_private_execution_events_schema()?; self.record_schema_version()?; @@ -372,6 +386,30 @@ CREATE TABLE IF NOT EXISTS review_orchestrations ( Ok(()) } + fn bootstrap_run_control_channels_schema(&self) -> Result<()> { + self.connection.execute_batch( + r#" +CREATE TABLE IF NOT EXISTS run_control_channels ( + run_id TEXT PRIMARY KEY NOT NULL, + project_id TEXT NOT NULL, + issue_id TEXT NOT NULL, + attempt_number INTEGER NOT NULL, + transport TEXT NOT NULL, + channel_path TEXT NOT NULL, + status TEXT NOT NULL, + published_at TEXT NOT NULL, + published_at_unix INTEGER NOT NULL, + updated_at TEXT NOT NULL, + updated_at_unix INTEGER NOT NULL +); +CREATE INDEX IF NOT EXISTS run_control_channels_project_issue_idx +ON run_control_channels (project_id, issue_id, attempt_number); +"#, + )?; + + Ok(()) + } + fn bootstrap_connector_backoffs_schema(&self) -> Result<()> { self.connection.execute_batch( r#" @@ -425,7 +463,7 @@ CREATE TABLE IF NOT EXISTS schema_meta ( value TEXT NOT NULL ); INSERT INTO schema_meta (key, value) -VALUES ('schema_version', '6') +VALUES ('schema_version', '7') ON CONFLICT(key) DO UPDATE SET value = excluded.value; "#, )?; @@ -439,6 +477,7 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; self.load_projects(&mut state)?; self.load_leases(&mut state)?; self.load_run_attempts(&mut state)?; + self.load_run_control_channels(&mut state)?; self.load_protocol_event_summaries(&mut state)?; self.load_worktrees(&mut state)?; self.load_linear_execution_events(&mut state)?; @@ -456,6 +495,7 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; self.load_leases(&mut state)?; self.load_run_attempts(&mut state)?; self.load_worktrees(&mut state)?; + self.load_run_control_channels_for_project(&mut state, project_id)?; self.load_protocol_event_summaries_for_project_runs(&mut state, project_id)?; Ok(state) @@ -475,6 +515,7 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; persist_projects(&transaction, state)?; persist_leases(&transaction, state)?; persist_run_attempts(&transaction, state)?; + persist_run_control_channels(&transaction, state)?; persist_protocol_events(&transaction, state)?; persist_worktrees(&transaction, state)?; persist_linear_execution_events(&transaction, state)?; @@ -496,6 +537,10 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; "DELETE FROM connector_backoffs WHERE project_id = ?1", params![service_id], )?; + transaction.execute( + "DELETE FROM run_control_channels WHERE project_id = ?1", + params![service_id], + )?; transaction.commit()?; Ok(()) @@ -532,6 +577,30 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; Ok(()) } + fn upsert_run_control_channel(&self, channel: &RunControlChannelRecord) -> Result<()> { + self.connection.execute( + "INSERT OR REPLACE INTO run_control_channels ( + run_id, project_id, issue_id, attempt_number, transport, channel_path, status, + published_at, published_at_unix, updated_at, updated_at_unix + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", + params![ + &channel.run_id, + &channel.project_id, + &channel.issue_id, + channel.attempt_number, + &channel.transport, + channel.channel_path.to_string_lossy().as_ref(), + &channel.status, + &channel.published_at, + channel.published_at_unix, + &channel.updated_at, + channel.updated_at_unix, + ], + )?; + + Ok(()) + } + fn upsert_lease_and_remember_run_project(&mut self, lease: &IssueLease) -> Result<()> { let transaction = self.connection.transaction()?; @@ -680,6 +749,10 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; "UPDATE run_attempts SET issue_id = ?2 WHERE issue_id = ?1", params![previous_issue_id, canonical_issue_id], )?; + transaction.execute( + "UPDATE run_control_channels SET issue_id = ?2 WHERE issue_id = ?1", + params![previous_issue_id, canonical_issue_id], + )?; transaction.execute( "UPDATE private_execution_events SET issue_id = ?2 WHERE issue_id = ?1", params![previous_issue_id, canonical_issue_id], @@ -870,6 +943,72 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; Ok(()) } + fn load_run_control_channels(&self, state: &mut StateData) -> Result<()> { + let mut statement = self.connection.prepare( + "SELECT run_id, project_id, issue_id, attempt_number, transport, channel_path, status, \ + published_at, published_at_unix, updated_at, updated_at_unix \ + FROM run_control_channels", + )?; + let rows = statement.query_map([], |row| { + Ok(RunControlChannelRecord { + run_id: row.get(0)?, + project_id: row.get(1)?, + issue_id: row.get(2)?, + attempt_number: row.get(3)?, + transport: row.get(4)?, + channel_path: PathBuf::from(row.get::<_, String>(5)?), + status: row.get(6)?, + published_at: row.get(7)?, + published_at_unix: row.get(8)?, + updated_at: row.get(9)?, + updated_at_unix: row.get(10)?, + }) + })?; + + for row in rows { + let channel = row?; + + state.control_channels.insert(channel.run_id.clone(), channel); + } + + Ok(()) + } + + fn load_run_control_channels_for_project( + &self, + state: &mut StateData, + project_id: &str, + ) -> Result<()> { + let mut statement = self.connection.prepare( + "SELECT run_id, project_id, issue_id, attempt_number, transport, channel_path, status, \ + published_at, published_at_unix, updated_at, updated_at_unix \ + FROM run_control_channels WHERE project_id = ?1", + )?; + let rows = statement.query_map(params![project_id], |row| { + Ok(RunControlChannelRecord { + run_id: row.get(0)?, + project_id: row.get(1)?, + issue_id: row.get(2)?, + attempt_number: row.get(3)?, + transport: row.get(4)?, + channel_path: PathBuf::from(row.get::<_, String>(5)?), + status: row.get(6)?, + published_at: row.get(7)?, + published_at_unix: row.get(8)?, + updated_at: row.get(9)?, + updated_at_unix: row.get(10)?, + }) + })?; + + for row in rows { + let channel = row?; + + state.control_channels.insert(channel.run_id.clone(), channel); + } + + Ok(()) + } + fn retry_budget_attempt_count(&self, issue_id: &str) -> Result { self.connection .query_row( @@ -1363,6 +1502,38 @@ impl RunAttemptRecord { } } +#[derive(Clone, Debug)] +struct RunControlChannelRecord { + project_id: String, + issue_id: String, + run_id: String, + attempt_number: i64, + transport: String, + channel_path: PathBuf, + status: String, + published_at: String, + published_at_unix: i64, + updated_at: String, + updated_at_unix: i64, +} +impl RunControlChannelRecord { + fn as_public(&self) -> RunControlChannel { + RunControlChannel { + project_id: self.project_id.clone(), + issue_id: self.issue_id.clone(), + run_id: self.run_id.clone(), + attempt_number: self.attempt_number, + transport: self.transport.clone(), + channel_path: self.channel_path.clone(), + status: self.status.clone(), + published_at: self.published_at.clone(), + published_at_unix: self.published_at_unix, + updated_at: self.updated_at.clone(), + updated_at_unix: self.updated_at_unix, + } + } +} + #[derive(Clone, Debug)] struct ProtocolEventRecord { sequence_number: i64, @@ -2227,6 +2398,32 @@ fn persist_run_attempts(transaction: &Transaction<'_>, state: &StateData) -> Res Ok(()) } +fn persist_run_control_channels(transaction: &Transaction<'_>, state: &StateData) -> Result<()> { + for channel in state.control_channels.values() { + transaction.execute( + "INSERT OR REPLACE INTO run_control_channels ( + run_id, project_id, issue_id, attempt_number, transport, channel_path, status, + published_at, published_at_unix, updated_at, updated_at_unix + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", + params![ + &channel.run_id, + &channel.project_id, + &channel.issue_id, + channel.attempt_number, + &channel.transport, + channel.channel_path.to_string_lossy().as_ref(), + &channel.status, + &channel.published_at, + channel.published_at_unix, + &channel.updated_at, + channel.updated_at_unix, + ], + )?; + } + + Ok(()) +} + fn persist_protocol_events( transaction: &Transaction<'_>, state: &StateData, diff --git a/apps/decodex/src/state/models.rs b/apps/decodex/src/state/models.rs index 8bf7ae1c..f148a110 100644 --- a/apps/decodex/src/state/models.rs +++ b/apps/decodex/src/state/models.rs @@ -70,6 +70,146 @@ impl RunAttempt { } } +/// Local control capability published by one active run attempt. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct RunControlChannel { + project_id: String, + issue_id: String, + run_id: String, + attempt_number: i64, + transport: String, + channel_path: PathBuf, + status: String, + published_at: String, + published_at_unix: i64, + updated_at: String, + updated_at_unix: i64, +} +impl RunControlChannel { + /// Local project identifier owning this control channel. + pub fn project_id(&self) -> &str { + &self.project_id + } + + /// Issue identifier owning this control channel. + pub fn issue_id(&self) -> &str { + &self.issue_id + } + + /// Stable run identifier owning this control channel. + pub fn run_id(&self) -> &str { + &self.run_id + } + + /// Attempt number owning this control channel. + pub fn attempt_number(&self) -> i64 { + self.attempt_number + } + + /// Local transport mechanism for this control channel. + pub fn transport(&self) -> &str { + &self.transport + } + + /// Local path used by this control channel. + pub fn channel_path(&self) -> &Path { + &self.channel_path + } + + /// Runtime status for this control channel. + pub fn status(&self) -> &str { + &self.status + } + + /// UTC timestamp when this control channel was first published. + pub fn published_at(&self) -> &str { + &self.published_at + } + + /// UTC timestamp when this control channel was last updated. + pub fn updated_at(&self) -> &str { + &self.updated_at + } +} + +/// Local run-control request resolution and first audit row. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct RunControlActionReceipt { + project_id: String, + issue_id: String, + run_id: String, + attempt_number: i64, + thread_id: Option, + turn_id: Option, + source: String, + action: String, + outcome: String, + reason: String, + audit_record_id: i64, + channel: Option, +} +impl RunControlActionReceipt { + /// Project identifier used for the local audit scope. + pub fn project_id(&self) -> &str { + &self.project_id + } + + /// Issue identifier used for the local audit scope. + pub fn issue_id(&self) -> &str { + &self.issue_id + } + + /// Run identifier used for the local audit scope. + pub fn run_id(&self) -> &str { + &self.run_id + } + + /// Attempt number used for the local audit scope. + pub fn attempt_number(&self) -> i64 { + self.attempt_number + } + + /// Requested thread identifier, when supplied. + pub fn thread_id(&self) -> Option<&str> { + self.thread_id.as_deref() + } + + /// Requested turn identifier, when supplied. + pub fn turn_id(&self) -> Option<&str> { + self.turn_id.as_deref() + } + + /// Local source that requested the action. + pub fn source(&self) -> &str { + &self.source + } + + /// Requested control action. + pub fn action(&self) -> &str { + &self.action + } + + /// Normalized audit outcome for the request resolution. + pub fn outcome(&self) -> &str { + &self.outcome + } + + /// Normalized reason for the request resolution. + pub fn reason(&self) -> &str { + &self.reason + } + + /// Private execution event row id for the request-resolution audit. + pub fn audit_record_id(&self) -> i64 { + self.audit_record_id + } + + /// Control channel selected for an accepted request. + pub fn channel(&self) -> Option<&RunControlChannel> { + self.channel.as_ref() + } +} + /// One private, local-only execution event retained in the runtime SQLite ledger. #[derive(Clone, Debug, PartialEq)] pub struct PrivateExecutionEvent { @@ -148,6 +288,7 @@ pub struct ProjectRunStatus { last_event_type: Option, last_event_at: Option, last_event_at_unix: Option, + control_channel: Option, } impl ProjectRunStatus { /// Stable run identifier. @@ -215,6 +356,11 @@ impl ProjectRunStatus { self.last_event_at.as_deref() } + /// Local control capability published by this run attempt, when one exists. + pub fn control_channel(&self) -> Option<&RunControlChannel> { + self.control_channel.as_ref() + } + /// Unix timestamp of the latest recorded protocol event, when one exists. pub(crate) fn last_event_at_unix(&self) -> Option { self.last_event_at_unix @@ -328,6 +474,30 @@ pub struct PreacquiredLeaseGuards { pub dispatch_slot_index: usize, } +/// Foundation request for resolving a local run-control action. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +#[cfg_attr(not(test), allow(dead_code))] +pub(crate) struct RunControlActionRequest<'a> { + /// Requested project identifier. + pub(crate) project_id: &'a str, + /// Requested issue identifier. + pub(crate) issue_id: &'a str, + /// Requested run identifier. + pub(crate) run_id: &'a str, + /// Requested attempt number. + pub(crate) attempt_number: i64, + /// Requested app-server thread identifier, when known. + pub(crate) thread_id: Option<&'a str>, + /// Requested current app-server turn identifier, when known. + pub(crate) turn_id: Option<&'a str>, + /// Local source that requested the action. + pub(crate) source: &'a str, + /// Requested control action. + pub(crate) action: &'a str, + /// Optional caller timeout budget in milliseconds. + pub(crate) timeout_ms: Option, +} + /// Registered repo target managed by the local Decodex control plane. #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct ProjectRegistration { diff --git a/apps/decodex/src/state/store.rs b/apps/decodex/src/state/store.rs index 69642440..3d11ee5b 100644 --- a/apps/decodex/src/state/store.rs +++ b/apps/decodex/src/state/store.rs @@ -11,51 +11,6 @@ pub(crate) struct ConnectorBackoffInput<'a> { pub(crate) warning: &'a str, } -/// Shared dispatch-slot capacity for one project. -#[derive(Clone, Copy)] -pub(crate) enum DispatchSlotLimit { - /// Fixed number of cross-process dispatch slots. - Limited(u32), - /// Allocate dispatch slots on demand without a fixed project cap. - Unlimited, -} -impl DispatchSlotLimit { - fn validate(self) -> Result<()> { - if matches!(self, Self::Limited(0)) { - eyre::bail!("dispatch slot limit must be greater than zero or unlimited"); - } - - Ok(()) - } - - fn includes(self, slot_index: usize) -> Result { - match self { - Self::Unlimited => Ok(true), - Self::Limited(limit) => Ok(slot_index - < usize::try_from(limit) - .map_err(|_error| eyre::eyre!("dispatch slot limit overflowed usize"))?), - } - } -} -impl From for DispatchSlotLimit { - fn from(value: u32) -> Self { - Self::Limited(value) - } -} -impl From> for DispatchSlotLimit { - fn from(value: Option) -> Self { - match value { - Some(limit) => Self::Limited(limit), - None => Self::Unlimited, - } - } -} -impl From for DispatchSlotLimit { - fn from(value: WorkflowConcurrencyLimit) -> Self { - Self::from(value.dispatch_slot_limit()) - } -} - /// Local runtime store for leases, attempts, worktrees, protocol events, and private evidence. #[derive(Default)] pub struct StateStore { @@ -269,6 +224,13 @@ impl StateStore { { attempt.issue_id = canonical_issue_id.to_owned(); } + for channel in state + .control_channels + .values_mut() + .filter(|channel| channel.issue_id == previous_issue_id) + { + channel.issue_id = canonical_issue_id.to_owned(); + } for record in state .private_execution_events .iter_mut() @@ -776,6 +738,204 @@ impl StateStore { self.upsert_run_attempt_locked(&attempt) } + /// Publish the local control channel for an active attempt when the runtime owns it. + pub(crate) fn publish_run_control_channel_for_active_attempt( + &self, + run_id: &str, + attempt_number: i64, + channel_path: &Path, + transport: &str, + ) -> Result> { + validate_run_control_channel_inputs(run_id, attempt_number, channel_path, transport)?; + + let now = timestamp_parts(); + let mut state = self.lock_without_refresh()?; + let Some(attempt) = state.run_attempts.get(run_id).cloned() else { + return Ok(None); + }; + + if attempt.attempt_number != attempt_number { + return Ok(None); + } + + let Some(lease) = state.leases.get(&attempt.issue_id) else { + return Ok(None); + }; + + if lease.run_id != run_id { + return Ok(None); + } + + let (published_at, published_at_unix) = state + .control_channels + .get(run_id) + .filter(|channel| channel.attempt_number == attempt_number) + .map_or_else(|| (now.text.clone(), now.unix), |channel| { + (channel.published_at.clone(), channel.published_at_unix) + }); + let channel = RunControlChannelRecord { + project_id: lease.project_id.clone(), + issue_id: attempt.issue_id.clone(), + run_id: run_id.to_owned(), + attempt_number, + transport: transport.to_owned(), + channel_path: channel_path.to_path_buf(), + status: RUN_CONTROL_CHANNEL_STATUS_ACTIVE.to_owned(), + published_at, + published_at_unix, + updated_at: now.text, + updated_at_unix: now.unix, + }; + + state.control_channels.insert(run_id.to_owned(), channel.clone()); + self.upsert_run_control_channel_locked(&channel)?; + + Ok(Some(channel.as_public())) + } + + /// Mark a run-control channel as no longer active for an attempt. + pub(crate) fn retire_run_control_channel_for_attempt( + &self, + run_id: &str, + attempt_number: i64, + status: &str, + ) -> Result<()> { + validate_run_control_channel_status(status)?; + + let now = timestamp_parts(); + let mut state = self.lock_without_refresh()?; + let Some(channel) = state.control_channels.get_mut(run_id) else { + return Ok(()); + }; + + if channel.attempt_number != attempt_number { + return Ok(()); + } + + channel.status = status.to_owned(); + channel.updated_at = now.text; + channel.updated_at_unix = now.unix; + + let channel = channel.clone(); + + self.upsert_run_control_channel_locked(&channel) + } + + /// Resolve a local run-control request against active runtime ownership and audit it. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn resolve_run_control_action( + &self, + request: RunControlActionRequest<'_>, + ) -> Result { + validate_run_control_action_request(&request)?; + + let resolution = { + let state = self.lock()?; + + resolve_run_control_action_locked(&state, &request) + }; + let event = self.append_run_control_audit_event( + &resolution.audit_target, + &resolution.outcome, + &resolution.reason, + None, + )?; + + Ok(RunControlActionReceipt { + project_id: resolution.audit_target.project_id, + issue_id: resolution.audit_target.issue_id, + run_id: resolution.audit_target.run_id, + attempt_number: resolution.audit_target.attempt_number, + thread_id: resolution.audit_target.thread_id, + turn_id: resolution.audit_target.turn_id, + source: resolution.audit_target.source, + action: resolution.audit_target.action, + outcome: resolution.outcome, + reason: resolution.reason, + audit_record_id: event.record_id(), + channel: resolution.channel, + }) + } + + /// Append a follow-up audit outcome for an already resolved control request. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn record_run_control_action_outcome( + &self, + receipt: &RunControlActionReceipt, + outcome: &str, + reason: &str, + ) -> Result { + validate_run_control_action_outcome(outcome)?; + validate_required_run_control_field("reason", reason)?; + + let target = RunControlAuditTarget { + project_id: receipt.project_id.clone(), + issue_id: receipt.issue_id.clone(), + run_id: receipt.run_id.clone(), + attempt_number: receipt.attempt_number, + thread_id: receipt.thread_id.clone(), + turn_id: receipt.turn_id.clone(), + source: receipt.source.clone(), + action: receipt.action.clone(), + timeout_ms: None, + channel: receipt.channel.clone(), + }; + + self.append_run_control_audit_event( + &target, + outcome, + reason, + Some(receipt.audit_record_id), + ) + } + + #[cfg_attr(not(test), allow(dead_code))] + fn append_run_control_audit_event( + &self, + target: &RunControlAuditTarget, + outcome: &str, + reason: &str, + parent_record_id: Option, + ) -> Result { + validate_run_control_action_outcome(outcome)?; + + let channel = target.channel.as_ref(); + let payload = serde_json::json!({ + "schema": "decodex.run_control_action/v1", + "action": target.action, + "source": target.source, + "outcome": outcome, + "reason": reason, + "parent_record_id": parent_record_id, + "requested": { + "project_id": target.project_id, + "issue_id": target.issue_id, + "run_id": target.run_id, + "attempt_number": target.attempt_number, + "thread_id": target.thread_id, + "turn_id": target.turn_id, + "timeout_ms": target.timeout_ms, + }, + "channel": channel.map(|channel| serde_json::json!({ + "transport": channel.transport(), + "channel_path": channel.channel_path().display().to_string(), + "status": channel.status(), + "published_at": channel.published_at(), + "updated_at": channel.updated_at(), + "path_exists": channel.channel_path().exists(), + })), + }); + + self.append_private_execution_event( + &target.project_id, + &target.issue_id, + &target.run_id, + target.attempt_number, + "control_action", + payload, + ) + } + /// Compute the next attempt number for one issue. pub fn next_attempt_number(&self, issue_id: &str) -> Result { let state = self.lock()?; @@ -1592,6 +1752,17 @@ impl StateStore { sqlite.upsert_run_attempt(attempt) } + fn upsert_run_control_channel_locked(&self, channel: &RunControlChannelRecord) -> Result<()> { + let Some(sqlite) = self.sqlite.as_ref() else { + return Ok(()); + }; + let sqlite = sqlite + .lock() + .map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?; + + sqlite.upsert_run_control_channel(channel) + } + fn upsert_lease_and_remember_run_project_locked(&self, lease: &IssueLease) -> Result<()> { let Some(sqlite) = self.sqlite.as_ref() else { return Ok(()); @@ -1759,6 +1930,77 @@ impl StateStore { } } +#[cfg_attr(not(test), allow(dead_code))] +struct RunControlActionResolution { + audit_target: RunControlAuditTarget, + outcome: String, + reason: String, + channel: Option, +} + +#[derive(Clone)] +#[cfg_attr(not(test), allow(dead_code))] +struct RunControlAuditTarget { + project_id: String, + issue_id: String, + run_id: String, + attempt_number: i64, + thread_id: Option, + turn_id: Option, + source: String, + action: String, + timeout_ms: Option, + channel: Option, +} + +/// Shared dispatch-slot capacity for one project. +#[derive(Clone, Copy)] +pub(crate) enum DispatchSlotLimit { + /// Fixed number of cross-process dispatch slots. + Limited(u32), + /// Allocate dispatch slots on demand without a fixed project cap. + Unlimited, +} +impl DispatchSlotLimit { + fn validate(self) -> Result<()> { + if matches!(self, Self::Limited(0)) { + eyre::bail!("dispatch slot limit must be greater than zero or unlimited"); + } + + Ok(()) + } + + fn includes(self, slot_index: usize) -> Result { + match self { + Self::Unlimited => Ok(true), + Self::Limited(limit) => Ok(slot_index + < usize::try_from(limit) + .map_err(|_error| eyre::eyre!("dispatch slot limit overflowed usize"))?), + } + } +} + +impl From for DispatchSlotLimit { + fn from(value: u32) -> Self { + Self::Limited(value) + } +} + +impl From> for DispatchSlotLimit { + fn from(value: Option) -> Self { + match value { + Some(limit) => Self::Limited(limit), + None => Self::Unlimited, + } + } +} + +impl From for DispatchSlotLimit { + fn from(value: WorkflowConcurrencyLimit) -> Self { + Self::from(value.dispatch_slot_limit()) + } +} + fn retarget_review_handoff_issue( records: &mut HashMap, previous_issue_id: &str, @@ -1787,6 +2029,204 @@ fn active_run_attempt_status(status: &str) -> bool { matches!(status, "starting" | "running") } +#[cfg_attr(not(test), allow(dead_code))] +fn resolve_run_control_action_locked( + state: &StateData, + request: &RunControlActionRequest<'_>, +) -> RunControlActionResolution { + let Some(attempt) = state.run_attempts.get(request.run_id) else { + return rejected_run_control_resolution(request, None, "run_not_found"); + }; + let audit_project_id = state + .control_channels + .get(request.run_id) + .map(|channel| channel.project_id.clone()) + .or_else(|| state.project_id_for_run(&attempt.issue_id, &attempt.run_id)) + .unwrap_or_else(|| request.project_id.to_owned()); + let audit_target = RunControlAuditTarget { + project_id: audit_project_id, + issue_id: attempt.issue_id.clone(), + run_id: attempt.run_id.clone(), + attempt_number: attempt.attempt_number, + thread_id: request.thread_id.map(str::to_owned), + turn_id: request.turn_id.map(str::to_owned), + source: request.source.to_owned(), + action: request.action.to_owned(), + timeout_ms: request.timeout_ms, + channel: None, + }; + + if attempt.issue_id != request.issue_id { + return rejected_run_control_resolution(request, Some(audit_target), "issue_mismatch"); + } + if attempt.attempt_number != request.attempt_number { + return rejected_run_control_resolution(request, Some(audit_target), "attempt_mismatch"); + } + if request.thread_id.is_some() + && attempt.thread_id.as_deref() != request.thread_id + { + return rejected_run_control_resolution(request, Some(audit_target), "thread_mismatch"); + } + if request.turn_id.is_some() && attempt.turn_id.as_deref() != request.turn_id { + return rejected_run_control_resolution(request, Some(audit_target), "turn_mismatch"); + } + + let Some(lease) = state.leases.get(request.issue_id) else { + return rejected_run_control_resolution(request, Some(audit_target), "active_lease_missing"); + }; + + if lease.project_id != request.project_id { + return rejected_run_control_resolution(request, Some(audit_target), "project_mismatch"); + } + if lease.run_id != request.run_id { + return rejected_run_control_resolution(request, Some(audit_target), "active_run_mismatch"); + } + if !active_run_attempt_status(&attempt.status) { + return rejected_run_control_resolution(request, Some(audit_target), "run_not_active"); + } + + let Some(channel) = state.control_channels.get(request.run_id).cloned() else { + return rejected_run_control_resolution(request, Some(audit_target), "control_channel_missing"); + }; + let channel = channel.as_public(); + let audit_target = RunControlAuditTarget { channel: Some(channel.clone()), ..audit_target }; + + if channel.project_id() != request.project_id + || channel.issue_id() != request.issue_id + || channel.attempt_number() != request.attempt_number + { + return rejected_run_control_resolution( + request, + Some(audit_target), + "control_channel_identity_mismatch", + ); + } + if channel.status() != RUN_CONTROL_CHANNEL_STATUS_ACTIVE { + return rejected_run_control_resolution( + request, + Some(audit_target), + "control_channel_inactive", + ); + } + if !channel.channel_path().exists() { + return rejected_run_control_resolution( + request, + Some(audit_target), + "control_channel_missing", + ); + } + + RunControlActionResolution { + audit_target, + outcome: RUN_CONTROL_ACTION_ACCEPTED.to_owned(), + reason: String::from("active_run_control_channel_resolved"), + channel: Some(channel), + } +} + +#[cfg_attr(not(test), allow(dead_code))] +fn rejected_run_control_resolution( + request: &RunControlActionRequest<'_>, + audit_target: Option, + reason: &str, +) -> RunControlActionResolution { + RunControlActionResolution { + audit_target: audit_target.unwrap_or_else(|| RunControlAuditTarget { + project_id: request.project_id.to_owned(), + issue_id: request.issue_id.to_owned(), + run_id: request.run_id.to_owned(), + attempt_number: request.attempt_number, + thread_id: request.thread_id.map(str::to_owned), + turn_id: request.turn_id.map(str::to_owned), + source: request.source.to_owned(), + action: request.action.to_owned(), + timeout_ms: request.timeout_ms, + channel: None, + }), + outcome: RUN_CONTROL_ACTION_REJECTED.to_owned(), + reason: reason.to_owned(), + channel: None, + } +} + +fn validate_run_control_channel_inputs( + run_id: &str, + attempt_number: i64, + channel_path: &Path, + transport: &str, +) -> Result<()> { + validate_required_run_control_field("run_id", run_id)?; + validate_required_run_control_field("transport", transport)?; + + if attempt_number < 1 { + eyre::bail!("run-control attempt_number must be positive"); + } + if channel_path.as_os_str().is_empty() { + eyre::bail!("run-control channel_path must not be empty"); + } + + Ok(()) +} + +#[cfg_attr(not(test), allow(dead_code))] +fn validate_run_control_action_request(request: &RunControlActionRequest<'_>) -> Result<()> { + validate_required_run_control_field("project_id", request.project_id)?; + validate_required_run_control_field("issue_id", request.issue_id)?; + validate_required_run_control_field("run_id", request.run_id)?; + validate_required_run_control_field("source", request.source)?; + validate_required_run_control_field("action", request.action)?; + + if request.attempt_number < 1 { + eyre::bail!("run-control attempt_number must be positive"); + } + + if let Some(timeout_ms) = request.timeout_ms + && timeout_ms < 0 + { + eyre::bail!("run-control timeout_ms must not be negative"); + } + + Ok(()) +} + +fn validate_required_run_control_field(name: &str, value: &str) -> Result<()> { + if value.trim().is_empty() { + eyre::bail!("run-control {name} must not be empty"); + } + + Ok(()) +} + +fn validate_run_control_channel_status(status: &str) -> Result<()> { + if !matches!( + status, + RUN_CONTROL_CHANNEL_STATUS_ACTIVE + | RUN_CONTROL_CHANNEL_STATUS_COMPLETED + | RUN_CONTROL_CHANNEL_STATUS_FAILED + ) { + eyre::bail!("unsupported run-control channel status `{status}`"); + } + + Ok(()) +} + +#[cfg_attr(not(test), allow(dead_code))] +fn validate_run_control_action_outcome(outcome: &str) -> Result<()> { + if !matches!( + outcome, + RUN_CONTROL_ACTION_ACCEPTED + | RUN_CONTROL_ACTION_REJECTED + | RUN_CONTROL_ACTION_COMPLETED + | RUN_CONTROL_ACTION_FAILED + | RUN_CONTROL_ACTION_TIMED_OUT + | RUN_CONTROL_ACTION_FALLBACK + ) { + eyre::bail!("unsupported run-control action outcome `{outcome}`"); + } + + Ok(()) +} + fn retarget_review_orchestration_issue( records: &mut HashMap, previous_issue_id: &str, diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index f9da3f8f..450b41e7 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -15,8 +15,9 @@ use crate::{ self, ChildAgentActivitySummary, CodexAccountActivitySummary, CodexAccountMarker, ConnectorBackoffInput, DispatchSlotLimit, EffectiveRuntimeMarker, PreacquiredLeaseGuards, ProjectRegistration, ProtocolActivityMarker, ProtocolActivitySummary, - RUN_ACTIVITY_MARKER_FILE, RUN_OPERATION_REPO_GATE, ReviewHandoffMarker, - ReviewOrchestrationMarker, StateStore, + RUN_ACTIVITY_MARKER_FILE, RUN_CONTROL_ACTION_COMPLETED, RUN_CONTROL_ACTION_FAILED, + RUN_CONTROL_ACTION_FALLBACK, RUN_CONTROL_ACTION_TIMED_OUT, RUN_OPERATION_REPO_GATE, + ReviewHandoffMarker, ReviewOrchestrationMarker, RunControlActionRequest, StateStore, }, tracker::records::{LinearExecutionEventIdentity, LinearExecutionEventRecord}, }; @@ -2539,3 +2540,220 @@ fn remove_project_deletes_persistent_registry_row() { "removed project must not remain in SQLite registry" ); } + +#[test] +fn run_control_accepts_active_attempt_and_persists_audit() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let state_path = temp_dir.path().join("runtime.sqlite3"); + let channel_path = temp_dir.path().join("control.channel"); + + fs::write(&channel_path, "ready\n").expect("control channel should write"); + + let store = StateStore::open(&state_path).expect("state store should open"); + + store + .upsert_lease("pubfi", "issue-1", "run-1", IN_PROGRESS_STATE) + .expect("lease should record"); + store.record_run_attempt("run-1", "issue-1", 1, "running").expect("attempt should record"); + store.update_run_thread("run-1", "thread-1").expect("thread should record"); + store.update_run_turn("run-1", "turn-1").expect("turn should record"); + store + .publish_run_control_channel_for_active_attempt("run-1", 1, &channel_path, "local_file") + .expect("control channel should publish") + .expect("active control channel should exist"); + + let receipt = store + .resolve_run_control_action(RunControlActionRequest { + project_id: "pubfi", + issue_id: "issue-1", + run_id: "run-1", + attempt_number: 1, + thread_id: Some("thread-1"), + turn_id: Some("turn-1"), + source: "test_hook", + action: "noop", + timeout_ms: Some(500), + }) + .expect("control request should resolve"); + + assert_eq!(receipt.outcome(), "accepted"); + assert_eq!(receipt.reason(), "active_run_control_channel_resolved"); + assert!(receipt.channel().is_some()); + + for (outcome, reason) in [ + (RUN_CONTROL_ACTION_COMPLETED, "noop_completed"), + (RUN_CONTROL_ACTION_FAILED, "noop_failed"), + (RUN_CONTROL_ACTION_TIMED_OUT, "noop_timed_out"), + (RUN_CONTROL_ACTION_FALLBACK, "noop_fallback"), + ] { + store + .record_run_control_action_outcome(&receipt, outcome, reason) + .expect("follow-up control audit should record"); + } + + drop(store); + + let reopened = StateStore::open(&state_path).expect("state store should reopen"); + let events = reopened + .list_private_execution_events("pubfi", "issue-1", "run-1", 1) + .expect("private control audit should read"); + let outcomes = events + .iter() + .filter(|event| event.event_type() == "control_action") + .filter_map(|event| event.payload().get("outcome").and_then(|value| value.as_str())) + .collect::>(); + + assert_eq!(outcomes, vec!["accepted", "completed", "failed", "timed_out", "fallback"]); +} + +#[test] +fn run_control_rejects_stale_turn_and_run_mismatch() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let channel_path = temp_dir.path().join("control.channel"); + + fs::write(&channel_path, "ready\n").expect("control channel should write"); + + let store = StateStore::open_in_memory().expect("state store should open"); + + store + .upsert_lease("pubfi", "issue-1", "run-current", IN_PROGRESS_STATE) + .expect("lease should record"); + store + .record_run_attempt("run-current", "issue-1", 1, "running") + .expect("attempt should record"); + store.update_run_thread("run-current", "thread-1").expect("thread should record"); + store.update_run_turn("run-current", "turn-current").expect("turn should record"); + store + .publish_run_control_channel_for_active_attempt( + "run-current", + 1, + &channel_path, + "local_file", + ) + .expect("control channel should publish"); + + let stale_turn = store + .resolve_run_control_action(RunControlActionRequest { + project_id: "pubfi", + issue_id: "issue-1", + run_id: "run-current", + attempt_number: 1, + thread_id: Some("thread-1"), + turn_id: Some("turn-old"), + source: "test_hook", + action: "noop", + timeout_ms: None, + }) + .expect("stale turn should be audited"); + let stale_run = store + .resolve_run_control_action(RunControlActionRequest { + project_id: "pubfi", + issue_id: "issue-1", + run_id: "run-old", + attempt_number: 1, + thread_id: Some("thread-1"), + turn_id: Some("turn-current"), + source: "test_hook", + action: "noop", + timeout_ms: None, + }) + .expect("stale run should be audited"); + + assert_eq!(stale_turn.outcome(), "rejected"); + assert_eq!(stale_turn.reason(), "turn_mismatch"); + assert_eq!(stale_run.outcome(), "rejected"); + assert_eq!(stale_run.reason(), "run_not_found"); +} + +#[test] +fn run_control_rejects_missing_channel_file() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let channel_path = temp_dir.path().join("control.channel"); + + fs::write(&channel_path, "ready\n").expect("control channel should write"); + + let store = StateStore::open_in_memory().expect("state store should open"); + + store + .upsert_lease("pubfi", "issue-1", "run-1", IN_PROGRESS_STATE) + .expect("lease should record"); + store.record_run_attempt("run-1", "issue-1", 1, "running").expect("attempt should record"); + store + .publish_run_control_channel_for_active_attempt("run-1", 1, &channel_path, "local_file") + .expect("control channel should publish"); + + fs::remove_file(&channel_path).expect("control channel should be removable"); + + let receipt = store + .resolve_run_control_action(RunControlActionRequest { + project_id: "pubfi", + issue_id: "issue-1", + run_id: "run-1", + attempt_number: 1, + thread_id: None, + turn_id: None, + source: "test_hook", + action: "noop", + timeout_ms: None, + }) + .expect("missing channel should be audited"); + + assert_eq!(receipt.outcome(), "rejected"); + assert_eq!(receipt.reason(), "control_channel_missing"); +} + +#[test] +fn run_control_requires_active_run_ownership() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let channel_path = temp_dir.path().join("control.channel"); + + fs::write(&channel_path, "ready\n").expect("control channel should write"); + + let store = StateStore::open_in_memory().expect("state store should open"); + + store + .upsert_lease("pubfi", "issue-1", "run-1", IN_PROGRESS_STATE) + .expect("lease should record"); + store.record_run_attempt("run-1", "issue-1", 1, "running").expect("attempt should record"); + store + .publish_run_control_channel_for_active_attempt("run-1", 1, &channel_path, "local_file") + .expect("control channel should publish"); + store.clear_lease("issue-1").expect("lease should clear"); + + let no_lease = store + .resolve_run_control_action(RunControlActionRequest { + project_id: "pubfi", + issue_id: "issue-1", + run_id: "run-1", + attempt_number: 1, + thread_id: None, + turn_id: None, + source: "test_hook", + action: "noop", + timeout_ms: None, + }) + .expect("missing lease should be audited"); + + store + .upsert_lease("pubfi", "issue-1", "run-other", IN_PROGRESS_STATE) + .expect("other lease should record"); + + let wrong_run = store + .resolve_run_control_action(RunControlActionRequest { + project_id: "pubfi", + issue_id: "issue-1", + run_id: "run-1", + attempt_number: 1, + thread_id: None, + turn_id: None, + source: "test_hook", + action: "noop", + timeout_ms: None, + }) + .expect("wrong active run should be audited"); + + assert_eq!(no_lease.outcome(), "rejected"); + assert_eq!(no_lease.reason(), "active_lease_missing"); + assert_eq!(wrong_run.outcome(), "rejected"); + assert_eq!(wrong_run.reason(), "active_run_mismatch"); +} diff --git a/docs/reference/operator-control-plane.md b/docs/reference/operator-control-plane.md index f681e4f7..21acf51f 100644 --- a/docs/reference/operator-control-plane.md +++ b/docs/reference/operator-control-plane.md @@ -105,12 +105,13 @@ work that needs full private payload values. | Surface | Owns | Does Not Own | | --- | --- | --- | -| Runtime SQLite DB | active leases, attempts, protocol events, private execution events, worktree mappings, retry state, retained PR state, phase timing, connector backoff, project registry | human backlog grooming or durable team-visible issue history | +| Runtime SQLite DB | active leases, attempts, run-control channels, protocol events, private execution events, worktree mappings, retry state, retained PR state, phase timing, connector backoff, project registry | human backlog grooming or durable team-visible issue history | | Central project config | `service_id`, repo root, worktree root, tracker/GitHub credential env-var names, enabled project registration | per-run state or issue ownership | | Project `WORKFLOW.md` | repo policy, validation gate, state names, retry/review policy | runtime ownership, queue labels, credentials, model overrides | | Linear | team-visible issue state, queue/active/manual-attention labels, coarse execution ledger comments, progress/failure/handoff/closeout summaries | high-frequency runtime truth, heartbeat, token pressure, raw attempts, private execution evidence, connector retry budgets | | GitHub | PR, checks, review comments, merge evidence, signed commit verification | queue selection or local lane ownership | | `.decodex-run-activity` | short-lived child activity heartbeat for the active attempt, including same-boot and same-process-start liveness | durable ownership, review handoff identity, cleanup authority | +| `.decodex-run-control/` | local per-attempt control-channel marker files for active runtime-owned attempts | standalone ownership proof, public tracker history, or dashboard-authored lane mutation | ## Operator Dashboard Sections @@ -154,7 +155,8 @@ the dev listener owns scheduling. For the lane-control rollout, active-lane UI posture is observe-only. The dashboard renders active-lane state, protocol activity, liveness, private-evidence references, -and local acknowledgement/account controls, but it is not the supported place to author +local run-control capability metadata, and local acknowledgement/account controls, but +it is not the supported place to author steer, retry, task replacement, or lifecycle mutations. CLI/API is the first operator-control surface for lane control, governed by [`../spec/lane-control.md`](../spec/lane-control.md). The browser UI does not show or @@ -165,6 +167,10 @@ because it changes the global Codex account selector, not an active lane. marks whether a payload is the complete active-run list; subscription-filtered payloads set it to `false`, so consumers must not treat a missing run in that payload as ended. +Active run rows may include `control_capability` with the active attempt's project, +issue, run id, attempt, current thread/turn ids, local transport, channel path, status, +and timestamps. It is local routing metadata for future CLI/API controls, not a +dashboard command surface. Snapshot `warnings` remain stable machine-readable tokens. When a warning needs operator action, snapshots may also include `warning_details` entries with the affected `project_id`, `repo_root`, reason, and next action; for example, a stale diff --git a/docs/spec/lane-control.md b/docs/spec/lane-control.md index 5a86561e..495ced51 100644 --- a/docs/spec/lane-control.md +++ b/docs/spec/lane-control.md @@ -37,6 +37,7 @@ agent-facing skills must guide responsible use. | Project dispatch pause | Supported for future dispatch | `decodex project disable ` and the runtime project enabled flag | Pause prevents new dispatch for the project. It must not kill or rewrite already active lanes. | | Project dispatch resume | Supported for future dispatch | `decodex project enable ` and the runtime project enabled flag | Resume re-enables future dispatch after the operator has inspected blockers, capacity, and queue state. | | Linear scan request | Supported | `POST /api/linear-scan` with optional `projectId` | Queue a scan for the next control-plane tick while respecting tracker backoff. This is an intake/status refresh request, not an execution command. | +| Run-control channel foundation | Supported foundation | Active attempts publish a local `.decodex-run-control/*` channel record, runtime SQLite `run_control_channels`, operator status `control_capability`, and private `control_action` audit events | Route any future mutation through the active attempt's project, issue, run id, attempt, thread id, current turn id, active lease, and local channel metadata. Invalid or stale requests fail closed and remain local audit evidence. | | Soft interrupt | Planned CLI/API control; bottom-layer method allowed | Decodex does not currently send `turn/interrupt` from its app-server client | Prefer soft interrupt before hard interruption when the active turn id is known and the app-server capability is present. Soft interrupt requests a graceful turn stop and must leave classification to the runtime. | | Hard interrupt fallback | Emergency fallback only | No dashboard or CLI/API lane-control path exposes hard interrupt in this rollout; runtime recovery can still classify attempts as `interrupted` | Use only when soft interrupt is unavailable, timed out, or impossible because the process or app-server boundary cannot be reached. Preserve retained worktree evidence and runtime classification. | | Steer active lane | Planned CLI/API control; bottom-layer method must stay broad | Decodex does not currently send `turn/steer` from its app-server client | Pass operator-supplied steer text through the CLI/API when available. Do not narrow the protocol to a fixed set of task-content categories. Apply policy, audit, privacy, and lifecycle guardrails above the protocol. | @@ -62,6 +63,34 @@ If inspection cannot prove the requested lane identity, do not steer, interrupt, or resume. Use the manual-attention path or a read-only recovery diagnosis instead of guessing. +## Run-Control Channel Foundation + +Every live app-server attempt publishes a per-attempt local control capability when +Decodex still owns the active lease for the run. The current mechanism is a local file +channel under the run worktree's `.decodex-run-control/` directory plus a +`run_control_channels` runtime SQLite row. This is foundation plumbing only: it proves +where an active attempt can receive future control traffic without exposing steer, +interrupt, or task-replacement semantics by itself. + +The channel row is scoped by project id, issue id, run id, attempt number, transport, +channel path, channel status, and publish/update timestamps. The current thread id and +turn id remain on the run attempt row and are projected together with the channel as +operator `control_capability` metadata. `decodex status`, JSON operator snapshots, and +private evidence readback may show this local capability, but Linear must not receive +host-local channel paths or raw control payloads. + +A control request is valid only when all of the following hold: + +- the requested run exists +- requested project id, issue id, run id, and attempt number match the active attempt +- requested thread id and turn id, when supplied, match the current attempt values +- the active lease for the issue is held by the same project and run id +- the attempt status is active +- the persisted channel row is active and the local channel path still exists + +Any mismatch fails closed. Rejections are not converted into process signals, tracker +state changes, or worktree mutations. + ## Soft And Hard Interrupts Soft interrupt is the preferred active-turn stop path. A compliant soft interrupt: @@ -143,6 +172,12 @@ Every supported control mutation should create local runtime evidence. At minimu control audit record should identify the project, issue, run id, attempt, branch, operator command source, requested capability, normalized result, and next action. +The run-control foundation records local `control_action` private execution events for +accepted, rejected, completed, failed, timed out, and fallback outcomes. These records +are scoped by the same project, issue, run id, and attempt tuple as other private +execution evidence. They are available through `decodex evidence --run-id + --attempt ` and survive independently of any public Linear projection. + Linear public text remains sparse. Do not write steer text, raw command output, process diagnostics, private evidence payloads, account details, or host-local paths into Linear unless a schema-controlled public projection explicitly allows it. diff --git a/docs/spec/runtime.md b/docs/spec/runtime.md index 75f7c7a1..32430a78 100644 --- a/docs/spec/runtime.md +++ b/docs/spec/runtime.md @@ -31,14 +31,15 @@ Defines: The runtime scope, source-of-truth boundaries, eligibility rules, lane ## Source of truth boundaries -- The Decodex runtime SQLite database is the single-machine source of truth for active leases, attempts, protocol events, private execution events, worktree mappings, retained PR state, retry state, phase timing, project registration, tracker cache, PR cache, and connector backoff. +- The Decodex runtime SQLite database is the single-machine source of truth for active leases, attempts, run-control channels, protocol events, private execution events, worktree mappings, retained PR state, retry state, phase timing, project registration, tracker cache, PR cache, and connector backoff. - Linear remains the team-visible tracker surface for issue lifecycle, queue/active/manual-attention labels, and coarse lifecycle summaries such as start, PR-ready, blocked, failed, landed, and done. - Versioned Linear execution event comments use the schema in [`linear-execution-ledger.md`](./linear-execution-ledger.md), but fine-grained runtime truth must not be rebuilt from comments every tick. - Private execution events are structured runtime evidence rows scoped by `project_id`, `issue_id`, `run_id`, and `attempt_number`. They hold full local - evidence that should be queryable through `StateStore` without being mirrored to - Linear execution ledger payloads. The operator CLI readback path is + evidence, including run-control audit records, that should be queryable through + `StateStore` without being mirrored to Linear execution ledger payloads. The + operator CLI readback path is `decodex evidence --run-id --attempt `, which reads the local runtime store and summarizes payloads by default. - Centralized project directories under `~/.codex/decodex/projects//` @@ -53,6 +54,7 @@ mirror: | Surface | Boundary | | --- | --- | | Runtime SQLite `private_execution_events` | Structured private execution evidence for the local Decodex installation. This is where full checkpoint payloads, verification notes, local head evidence, and recovery detail belong. | +| Runtime SQLite `run_control_channels` | Local control capability metadata for active run attempts. It records the project, issue, run id, attempt, transport, local channel path, channel status, and publish/update timestamps needed to route future control requests without bypassing active lease ownership. | | Agent evidence under `~/.codex/decodex/agent-evidence//` | Derived local handoff view for repair agents. It may reference private evidence readback commands and compact run capsules, but it is not scheduling authority and is not a public mirror. | | Logs under `~/.codex/decodex/logs/` and `.decodex-run-activity` | Diagnostic process and liveness signals. They may explain what a local process did, but they are not the structured execution ledger and must not be replayed as tracker state. | | Linear execution ledger comments | Low-frequency public projection for team-visible lifecycle state. They carry coarse start, progress phase, PR, handoff, failure, landing, closeout, and cleanup summaries only. | @@ -64,6 +66,7 @@ Operator snapshots are local runtime views. They must remain useful when Linear The following facts are local runtime truth and must not be rebuilt from Linear comments on every tick: - lane attempts: `run_id`, `attempt_number`, attempt status, and terminal classification +- active run-control channel metadata and local control audit events - protocol events, event counts, event timestamps, and thread/liveness hydration fields - private execution events carrying structured local evidence for an issue/run/attempt - retry and backoff state: queued retry kind, due time, retry budget, and connector backoff