From 09ec75be7d12392718cdb5f775c35fe3069e6a6b Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 19 May 2026 21:38:15 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Use zero for unlimited workflow concurrency","authority":"manual"} --- .../decodex/src/orchestrator/operator_http.rs | 51 ++++++++++ .../tests/operator/status/http.rs | 97 +++++++++++++++++++ apps/decodex/src/workflow.rs | 62 ++++++++---- docs/runbook/self-dogfood-pilot.md | 2 +- docs/spec/runtime.md | 2 +- docs/spec/workflow-file.md | 6 +- 6 files changed, 194 insertions(+), 26 deletions(-) diff --git a/apps/decodex/src/orchestrator/operator_http.rs b/apps/decodex/src/orchestrator/operator_http.rs index 1a992232..a55098bc 100644 --- a/apps/decodex/src/orchestrator/operator_http.rs +++ b/apps/decodex/src/orchestrator/operator_http.rs @@ -310,6 +310,8 @@ fn handle_operator_dashboard_websocket_connection( write_dashboard_websocket_event(&mut stream, "snapshot", &payload)?; } + write_current_dashboard_run_activity_event(&mut stream, state_store, &session.subscription); + loop { for frame in read_dashboard_websocket_client_frames(&mut stream, &mut client_frame_buffer)? 
{ match frame { @@ -323,6 +325,13 @@ fn handle_operator_dashboard_websocket_connection( { write_dashboard_websocket_event(&mut stream, "snapshot", &payload)?; } + if dashboard_control_ack_should_push_run_activity(&response) { + write_current_dashboard_run_activity_event( + &mut stream, + state_store, + &session.subscription, + ); + } }, DashboardClientFrame::Close => return Ok(()), DashboardClientFrame::Ping(payload) => { @@ -479,6 +488,48 @@ fn dashboard_control_ack_should_push_snapshot(ack: &Value) -> bool { ) } +fn dashboard_control_ack_should_push_run_activity(ack: &Value) -> bool { + ack.get("accepted").and_then(Value::as_bool).unwrap_or(false) + && matches!( + ack.get("action").and_then(Value::as_str), + Some("subscribe" | "focus" | "clearFocus" | "selectAccount" | "clearAccountSelection") + ) +} + +fn write_current_dashboard_run_activity_event( + stream: &mut TcpStream, + state_store: &StateStore, + subscription: &DashboardClientSubscription, +) { + match build_operator_run_activity_event(state_store).and_then(|event| { + if let Some(event) = dashboard_event_for_subscription(&event.event, subscription) { + if !dashboard_run_activity_event_has_active_runs(&event) { + return Ok(()); + } + + write_dashboard_websocket_event(stream, event.event_type, &event.payload)?; + } + + Ok(()) + }) { + Ok(()) => {}, + Err(error) => { + tracing::warn!( + ?error, + "Skipped immediate dashboard run activity snapshot for a WebSocket client." 
+ ); + }, + } +} + +fn dashboard_run_activity_event_has_active_runs(event: &DashboardBroadcastEvent) -> bool { + event + .payload + .get("activeRuns") + .and_then(Value::as_array) + .is_some_and(|runs| !runs.is_empty()) +} + fn write_dashboard_websocket_event( stream: &mut TcpStream, event_type: &'static str, diff --git a/apps/decodex/src/orchestrator/tests/operator/status/http.rs b/apps/decodex/src/orchestrator/tests/operator/status/http.rs index 54072821..6b348a3c 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/http.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/http.rs @@ -326,6 +326,103 @@ fn operator_dashboard_websocket_sends_current_snapshot_on_connect() { server.join().expect("server thread should complete"); } +#[test] +fn operator_dashboard_websocket_sends_current_run_activity_on_connect() { + let (_temp_dir, config, _workflow) = temp_project_layout(); + let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind"); + let address = listener.local_addr().expect("listener address should resolve"); + let snapshot = Arc::new(Mutex::new(PublishedOperatorSnapshot { + snapshot_json: Some( + br#"{"project_id":"pubfi","active_runs":[],"account_control":{"mode":"balanced","account_selector":null}}"#.to_vec(), + ), + last_publish_unix_epoch: Some(1_774_000_000), + })); + let state_store = Arc::new(StateStore::open_in_memory().expect("state store should open")); + let registration = ProjectRegistration::from_config( + config.service_id(), + &service_config_path(config.repo_root()), + &config, + true, + "test-fingerprint", + ); + let issue = sample_issue("Todo", &[]); + let worktree_path = config.worktree_root().join("PUB-101"); + let account = CodexAccountActivitySummary { + account_fingerprint: String::from("acct-1"), + status: String::from("available"), + refresh_status: String::from("ok"), + ..Default::default() + }; + let dashboard_events = DashboardEventHub::default(); + let server_snapshot = Arc::clone(&snapshot); 
+ let server_state_store = Arc::clone(&state_store); + let server_dashboard_events = dashboard_events.clone(); + + state_store.upsert_project(&registration).expect("project should register"); + state_store + .record_run_attempt("run-1", &issue.id, 1, "running") + .expect("run attempt should record"); + state_store + .upsert_lease(config.service_id(), &issue.id, "run-1", "In Progress") + .expect("lease should record"); + state_store + .upsert_worktree( + config.service_id(), + &issue.id, + "x/pubfi-pub-101", + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + + state::write_run_account_marker( + &worktree_path, + &CodexAccountMarker { + run_id: "run-1", + attempt_number: 1, + account: &account, + accounts: slice::from_ref(&account), + }, + ) + .expect("account marker should write"); + + let server = thread::spawn(move || { + let (stream, _) = listener.accept().expect("listener should accept a connection"); + + orchestrator::handle_operator_state_endpoint_connection( + stream, + &server_snapshot, + &server_dashboard_events, + &server_state_store, + ) + .expect("websocket handler should complete after client disconnect"); + }); + let (mut client, response, mut frame) = open_dashboard_websocket_client(address); + + assert!(response.starts_with("HTTP/1.1 101 Switching Protocols\r\n")); + + let _initial_snapshot = read_websocket_json_until(&mut client, &mut frame, |payload| { + payload["type"] == "snapshot" + }); + let activity = read_websocket_json_until(&mut client, &mut frame, |payload| { + payload["type"] == "runActivity" + }); + + assert_eq!(activity["payload"]["activeRuns"][0]["run_id"], "run-1"); + assert_eq!( + activity["payload"]["activeRuns"][0]["account"]["account_fingerprint"], + "acct-1" + ); + assert_eq!( + activity["payload"]["activeRuns"][0]["accounts"][0]["account_fingerprint"], + "acct-1" + ); + + drop(client); + + dashboard_events.close_clients_for_test(); + server.join().expect("server thread should complete"); +} + #[test] 
fn operator_dashboard_websocket_accepts_subscription_and_project_pause_control() { let (_temp_dir, config, _workflow) = temp_project_layout(); diff --git a/apps/decodex/src/workflow.rs b/apps/decodex/src/workflow.rs index 259d02a3..bd10deaf 100644 --- a/apps/decodex/src/workflow.rs +++ b/apps/decodex/src/workflow.rs @@ -519,18 +519,21 @@ impl<'de> Visitor<'de> for WorkflowConcurrencyLimitVisitor { type Value = WorkflowConcurrencyLimit; fn expecting(&self, formatter: &mut Formatter<'_>) -> fmt::Result { - formatter.write_str("a positive integer or \"unlimited\"") + formatter.write_str("an integer greater than or equal to zero") } fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E> where E: Error, { - if value <= 0 { + if value < 0 { return Err(E::custom( - "`execution.max_concurrent_agents` must be greater than zero or \"unlimited\".", + "`execution.max_concurrent_agents` must be greater than or equal to zero.", )); } + if value == 0 { + return Ok(WorkflowConcurrencyLimit::Unlimited); + } u32::try_from(value) .map(WorkflowConcurrencyLimit::Limited) @@ -542,9 +545,7 @@ impl<'de> Visitor<'de> for WorkflowConcurrencyLimitVisitor { E: Error, { if value == 0 { - return Err(E::custom( - "`execution.max_concurrent_agents` must be greater than zero or \"unlimited\".", - )); + return Ok(WorkflowConcurrencyLimit::Unlimited); } u32::try_from(value) @@ -552,16 +553,12 @@ impl<'de> Visitor<'de> for WorkflowConcurrencyLimitVisitor { .map_err(|_error| E::custom("`execution.max_concurrent_agents` exceeds `u32::MAX`.")) } - fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E> + fn visit_str<E>(self, _value: &str) -> std::result::Result<Self::Value, E> where E: Error, { - if value == "unlimited" { - return Ok(WorkflowConcurrencyLimit::Unlimited); - } - Err(E::custom( - "`execution.max_concurrent_agents` must be a positive integer or \"unlimited\".", + "`execution.max_concurrent_agents` must be an integer greater than or equal to zero.", )) } } @@ -594,7 +591,7 @@ impl WorkflowConcurrencyLimit { fn 
validate(self) -> crate::prelude::Result<()> { if matches!(self, Self::Limited(0)) { eyre::bail!( - "`execution.max_concurrent_agents` must be greater than zero or \"unlimited\"." + "`execution.max_concurrent_agents` finite limits must be greater than zero." ); } @@ -617,7 +614,7 @@ impl Serialize for WorkflowConcurrencyLimit { S: Serializer, { match self { - Self::Unlimited => serializer.serialize_str("unlimited"), + Self::Unlimited => serializer.serialize_u32(0), Self::Limited(limit) => serializer.serialize_u32(*limit), } } @@ -1747,21 +1744,40 @@ Then validate the lane. } #[test] - fn parses_unlimited_global_concurrency_limit() { + fn parses_zero_global_concurrency_limit_as_unlimited() { let document = parse_valid_workflow_with(|markdown| { - *markdown = markdown - .replace("max_concurrent_agents = 1", "max_concurrent_agents = \"unlimited\""); + *markdown = markdown.replace("max_concurrent_agents = 1", "max_concurrent_agents = 0"); }) - .expect("unlimited global concurrency should parse"); + .expect("zero global concurrency should parse"); assert_eq!( document.frontmatter().execution().max_concurrent_agents(), WorkflowConcurrencyLimit::Unlimited ); + assert!( + document + .to_markdown() + .expect("workflow markdown should render") + .contains("max_concurrent_agents = 0") + ); + } + + #[test] + fn rejects_string_global_concurrency_limit() { + let result = parse_valid_workflow_with(|markdown| { + *markdown = markdown + .replace("max_concurrent_agents = 1", "max_concurrent_agents = \"unlimited\""); + }); + let error = result.expect_err("string global concurrency should be invalid"); + + assert!( + error.to_string().contains("must be an integer greater than or equal to zero"), + "unexpected error: {error:?}" + ); } #[test] - fn rejects_zero_global_concurrency_limit() { + fn rejects_negative_global_concurrency_limit() { let result = WorkflowDocument::parse_markdown( r#" +++ @@ -1785,7 +1801,7 @@ transport = "stdio://" max_attempts = 3 max_turns = 1 
max_retry_backoff_ms = 300000 -max_concurrent_agents = 0 +max_concurrent_agents = -1 gate_profiles = {} canonicalize_commands = [] verify_commands = [] @@ -1800,8 +1816,12 @@ read_first = [] +++ "#, ); + let error = result.expect_err("negative global concurrency should be invalid"); - assert!(result.is_err(), "zero global concurrency should be invalid"); + assert!( + error.to_string().contains("must be greater than or equal to zero"), + "unexpected error: {error:?}" + ); } fn parse_valid_workflow_with(rewrite: impl FnOnce(&mut String)) -> Result<WorkflowDocument> { diff --git a/docs/runbook/self-dogfood-pilot.md b/docs/runbook/self-dogfood-pilot.md index 04ca3fd0..ce5385fb 100644 --- a/docs/runbook/self-dogfood-pilot.md +++ b/docs/runbook/self-dogfood-pilot.md @@ -142,7 +142,7 @@ At minimum, the target repo should define: - `[execution] max_attempts` - `[execution] max_turns` - `[execution] max_retry_backoff_ms` -- `[execution] max_concurrent_agents`; set it explicitly to `"unlimited"` to run without a project-level concurrent-agent cap +- `[execution] max_concurrent_agents`; set it explicitly to `0` to run without a project-level concurrent-agent cap - optional `[context] read_first = [...]` only when the repo truly needs extra repo-local files loaded in addition to the `WORKFLOW.md` body; treat this as a Decodex-local extension, not as the primary policy surface Child-run execution policy is not part of the project-owned `WORKFLOW.md` contract. `decodex` must let `codex app-server` inherit sandbox and approval behavior from the active Codex runtime instead of pinning repo-local overrides. diff --git a/docs/spec/runtime.md b/docs/spec/runtime.md index abc88f59..8fc7c467 100644 --- a/docs/spec/runtime.md +++ b/docs/spec/runtime.md @@ -109,7 +109,7 @@ Optional future expansion: Current runtime note: -- Project-level concurrency must be explicit; set `[execution] max_concurrent_agents = "unlimited"` for no project-level cap, or use a positive integer for a finite cap. 
+- Project-level concurrency must be explicit; set `[execution] max_concurrent_agents = 0` for no project-level cap, or use a positive integer for a finite cap. - Active leases are the service-local claim set for running lanes, and shared dispatch-slot locks coordinate cross-process capacity when a finite cap is configured. ## Lane model diff --git a/docs/spec/workflow-file.md b/docs/spec/workflow-file.md index a507359d..df9e4413 100644 --- a/docs/spec/workflow-file.md +++ b/docs/spec/workflow-file.md @@ -138,9 +138,9 @@ Supported keys: - required - note: caps control-plane-owned failure retry backoff in milliseconds; clean continuation retries use a separate short fixed delay in runtime policy - `max_concurrent_agents` - - type: integer or string `"unlimited"` + - type: integer - required - - note: set to `"unlimited"` to run without a project-level concurrent-agent cap; when set to a positive integer, upper-bounds concurrent `decodex` runs per repository; Decodex does not apply separate per-state concurrency caps + - note: set to `0` to run without a project-level concurrent-agent cap; when set to a positive integer, upper-bounds concurrent `decodex` runs per repository; negative values are invalid; Decodex does not apply separate per-state concurrency caps - `canonicalize_commands` - type: array of string - required @@ -310,7 +310,7 @@ transport = "stdio://" max_attempts = 3 max_turns = 1 max_retry_backoff_ms = 300000 -max_concurrent_agents = "unlimited" +max_concurrent_agents = 0 canonicalize_commands = [ "cargo make fmt", "cargo make lint",