From 02af9ee57873318bafdfe8eeb9ce1a4793d3d59f Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 19 May 2026 15:31:22 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Support unlimited concurrency limits","authority":"manual"} --- apps/decodex/src/orchestrator/daemon.rs | 8 +- .../tests/intake/candidate_selection.rs | 5 +- apps/decodex/src/orchestrator/types.rs | 2 +- apps/decodex/src/state/internal.rs | 2 +- apps/decodex/src/state/store.rs | 69 +++++- apps/decodex/src/state/tests.rs | 36 +++- apps/decodex/src/workflow.rs | 199 +++++++++++++++--- docs/runbook/self-dogfood-pilot.md | 2 +- docs/spec/runtime.md | 4 +- docs/spec/workflow-file.md | 9 +- 10 files changed, 293 insertions(+), 43 deletions(-) diff --git a/apps/decodex/src/orchestrator/daemon.rs b/apps/decodex/src/orchestrator/daemon.rs index cb0d5803..b86ba6dc 100644 --- a/apps/decodex/src/orchestrator/daemon.rs +++ b/apps/decodex/src/orchestrator/daemon.rs @@ -133,8 +133,12 @@ where context.review_state_inspector, )?; - while active_children.len() - < context.workflow.frontmatter().execution().max_concurrent_agents() as usize + while context + .workflow + .frontmatter() + .execution() + .max_concurrent_agents() + .has_capacity(active_children.len()) { if !spawn_next_daemon_child( config_path, diff --git a/apps/decodex/src/orchestrator/tests/intake/candidate_selection.rs b/apps/decodex/src/orchestrator/tests/intake/candidate_selection.rs index 2ec270f2..9d30208f 100644 --- a/apps/decodex/src/orchestrator/tests/intake/candidate_selection.rs +++ b/apps/decodex/src/orchestrator/tests/intake/candidate_selection.rs @@ -427,7 +427,10 @@ fn candidate_selection_allows_multi_slot_dispatch_when_configured() { ) .expect("workflow should parse"); - assert_eq!(workflow.frontmatter().execution().max_concurrent_agents(), 2); + assert_eq!( + workflow.frontmatter().execution().max_concurrent_agents().dispatch_slot_limit(), + Some(2) + ); let state_store = StateStore::open_in_memory().expect("state store should open"); diff --git a/apps/decodex/src/orchestrator/types.rs b/apps/decodex/src/orchestrator/types.rs index 8eee5019..e6d906bc 100644 --- a/apps/decodex/src/orchestrator/types.rs +++ b/apps/decodex/src/orchestrator/types.rs @@ -1056,7 +1056,7 @@ impl ConcurrencySnapshot { } fn has_global_capacity(&self, execution: &WorkflowExecution) -> bool { - self.total_active < execution.max_concurrent_agents() as usize + execution.max_concurrent_agents().has_capacity(self.total_active) } } diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index 136eca08..79e63f16 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -34,7 +34,7 @@ pub(crate) struct CodexAccountMarker<'a> { #[derive(Clone)] struct DispatchSlotConfig { root: PathBuf, - slot_limit: usize, + slot_limit: DispatchSlotLimit, } struct IssueClaimGuard { diff --git a/apps/decodex/src/state/store.rs b/apps/decodex/src/state/store.rs index db895577..37aa0540 100644 --- a/apps/decodex/src/state/store.rs +++ b/apps/decodex/src/state/store.rs @@ -1,5 +1,52 @@ use std::mem; +use crate::workflow::WorkflowConcurrencyLimit; + +/// Shared dispatch-slot capacity for one project. +#[derive(Clone, Copy)] +pub(crate) enum DispatchSlotLimit { + /// Fixed number of cross-process dispatch slots. + Limited(u32), + /// Allocate dispatch slots on demand without a fixed project cap. + Unlimited, +} +impl DispatchSlotLimit { + fn validate(self) -> Result<()> { + if matches!(self, Self::Limited(0)) { + eyre::bail!("dispatch slot limit must be greater than zero or unlimited"); + } + + Ok(()) + } + + fn includes(self, slot_index: usize) -> Result { + match self { + Self::Unlimited => Ok(true), + Self::Limited(limit) => Ok(slot_index + < usize::try_from(limit) + .map_err(|_error| eyre::eyre!("dispatch slot limit overflowed usize"))?), + } + } +} +impl From for DispatchSlotLimit { + fn from(value: u32) -> Self { + Self::Limited(value) + } +} +impl From> for DispatchSlotLimit { + fn from(value: Option) -> Self { + match value { + Some(limit) => Self::Limited(limit), + None => Self::Unlimited, + } + } +} +impl From for DispatchSlotLimit { + fn from(value: WorkflowConcurrencyLimit) -> Self { + Self::from(value.dispatch_slot_limit()) + } +} + /// Local runtime store for leases, attempts, worktrees, and protocol events. #[derive(Default)] pub struct StateStore { @@ -55,20 +102,21 @@ impl StateStore { } /// Configure the shared cross-process dispatch-slot root for one project. - pub fn configure_dispatch_slot_root( + pub(crate) fn configure_dispatch_slot_root( &self, project_id: &str, worktree_root: impl AsRef, - slot_limit: u32, + slot_limit: impl Into, ) -> Result<()> { + let slot_limit = slot_limit.into(); let mut state = self.lock()?; + slot_limit.validate()?; state.dispatch_slot_configs.insert( project_id.to_owned(), DispatchSlotConfig { root: worktree_root.as_ref().to_path_buf(), - slot_limit: usize::try_from(slot_limit) - .map_err(|_error| eyre::eyre!("dispatch slot limit overflowed usize"))?, + slot_limit, }, ); @@ -201,9 +249,14 @@ impl StateStore { .map(|guard| guard.slot_index) .collect::>(); let mut acquired_guard = None; + let mut slot_index = 0; - for slot_index in 0..dispatch_slot_config.slot_limit { + while dispatch_slot_config.slot_limit.includes(slot_index)? { if held_slot_indexes.contains(&slot_index) { + slot_index = slot_index + .checked_add(1) + .ok_or_else(|| eyre::eyre!("dispatch slot index overflowed usize"))?; + continue; } @@ -225,9 +278,13 @@ impl StateStore { break; }, - Err(TryLockError::WouldBlock) => continue, + Err(TryLockError::WouldBlock) => {}, Err(TryLockError::Error(error)) => return Err(error.into()), } + + slot_index = slot_index + .checked_add(1) + .ok_or_else(|| eyre::eyre!("dispatch slot index overflowed usize"))?; } let Some(dispatch_slot_guard) = acquired_guard else { diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index bd37a408..b7024552 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -13,7 +13,7 @@ use tempfile::TempDir; use crate::{ state::{ self, ChildAgentActivitySummary, CodexAccountActivitySummary, CodexAccountMarker, - EffectiveRuntimeMarker, PreacquiredLeaseGuards, ProjectRegistration, + DispatchSlotLimit, EffectiveRuntimeMarker, PreacquiredLeaseGuards, ProjectRegistration, ProtocolActivityMarker, ProtocolActivitySummary, RUN_ACTIVITY_MARKER_FILE, RUN_OPERATION_REPO_GATE, ReviewHandoffMarker, ReviewOrchestrationMarker, StateStore, }, @@ -516,6 +516,40 @@ fn shared_dispatch_slots_honor_configured_limit_across_process_local_stores() { ); } +#[test] +fn shared_dispatch_slots_support_unlimited_across_process_local_stores() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let store_one = StateStore::open_in_memory().expect("first store should open"); + let store_two = StateStore::open_in_memory().expect("second store should open"); + let store_three = StateStore::open_in_memory().expect("third store should open"); + + store_one + .configure_dispatch_slot_root("pubfi", temp_dir.path(), DispatchSlotLimit::Unlimited) + .expect("first store should configure unlimited dispatch slots"); + store_two + .configure_dispatch_slot_root("pubfi", temp_dir.path(), DispatchSlotLimit::Unlimited) + .expect("second store should configure unlimited dispatch slots"); + store_three + .configure_dispatch_slot_root("pubfi", temp_dir.path(), DispatchSlotLimit::Unlimited) + .expect("third store should configure unlimited dispatch slots"); + + assert!( + store_one + .try_acquire_lease("pubfi", "PUB-101", "run-1", IN_PROGRESS_STATE) + .expect("first shared lease acquisition should succeed") + ); + assert!( + store_two + .try_acquire_lease("pubfi", "PUB-102", "run-2", IN_PROGRESS_STATE) + .expect("second store should acquire another shared slot") + ); + assert!( + store_three + .try_acquire_lease("pubfi", "PUB-103", "run-3", IN_PROGRESS_STATE) + .expect("third store should acquire another shared slot") + ); +} + #[test] fn failed_shared_slot_attempt_releases_issue_claim_before_retry() { let temp_dir = TempDir::new().expect("tempdir should create"); diff --git a/apps/decodex/src/workflow.rs b/apps/decodex/src/workflow.rs index 007df9fc..bc13685c 100644 --- a/apps/decodex/src/workflow.rs +++ b/apps/decodex/src/workflow.rs @@ -2,14 +2,18 @@ use std::{ collections::{BTreeSet, HashMap}, + fmt::{self, Formatter}, fs, path::{Component, Path}, }; use globset::{Glob, GlobSet, GlobSetBuilder}; -use serde::{Deserialize, Serialize}; +use serde::{ + Deserialize, Deserializer, Serialize, Serializer, + de::{Error, Visitor}, +}; -use crate::prelude::{Result, eyre}; +use crate::prelude::eyre; const FRONTMATTER_DELIMITER: &str = "+++"; @@ -21,7 +25,7 @@ pub struct WorkflowDocument { } impl WorkflowDocument { /// Parse a workflow document from Markdown text. - pub fn parse_markdown(input: &str) -> Result { + pub fn parse_markdown(input: &str) -> crate::prelude::Result { let (frontmatter_input, body) = split_frontmatter(input)?; let frontmatter = toml::from_str::(&frontmatter_input)?; @@ -31,7 +35,7 @@ impl WorkflowDocument { } /// Load a workflow document from the repository root. - pub fn from_path(path: impl AsRef) -> Result { + pub fn from_path(path: impl AsRef) -> crate::prelude::Result { let input = fs::read_to_string(path)?; Self::parse_markdown(&input) @@ -48,7 +52,7 @@ impl WorkflowDocument { } /// Render the workflow back to Markdown for process-to-process handoff. - pub fn to_markdown(&self) -> Result { + pub fn to_markdown(&self) -> crate::prelude::Result { let frontmatter = toml::to_string(&self.frontmatter)?; let mut markdown = format!("{FRONTMATTER_DELIMITER}\n{frontmatter}{FRONTMATTER_DELIMITER}"); @@ -97,7 +101,7 @@ impl WorkflowFrontmatter { &self.context } - fn validate(&self) -> Result<()> { + fn validate(&self) -> crate::prelude::Result<()> { if self.version != 1 { eyre::bail!("Unsupported WORKFLOW.md version: {}", self.version); } @@ -224,7 +228,8 @@ pub struct WorkflowExecution { max_attempts: u32, max_turns: u32, max_retry_backoff_ms: u64, - max_concurrent_agents: u32, + #[serde(default)] + max_concurrent_agents: WorkflowConcurrencyLimit, canonicalize_commands: Vec, verify_commands: Vec, gate_profiles: HashMap, @@ -267,7 +272,7 @@ impl WorkflowExecution { } /// Maximum concurrent agents allowed for this repository. - pub fn max_concurrent_agents(&self) -> u32 { + pub fn max_concurrent_agents(&self) -> WorkflowConcurrencyLimit { self.max_concurrent_agents } @@ -310,10 +315,8 @@ impl WorkflowExecution { self.default_repo_gate() } - fn validate(&self) -> Result<()> { - if self.max_concurrent_agents == 0 { - eyre::bail!("`execution.max_concurrent_agents` must be greater than zero."); - } + fn validate(&self) -> crate::prelude::Result<()> { + self.max_concurrent_agents.validate()?; validate_string_entries("execution.canonicalize_commands", &self.canonicalize_commands)?; validate_string_entries("execution.verify_commands", &self.verify_commands)?; @@ -363,7 +366,7 @@ impl WorkflowWorkspaceHooks { self.timeout_seconds } - fn validate(&self) -> Result<()> { + fn validate(&self) -> crate::prelude::Result<()> { if self.timeout_seconds == 0 { eyre::bail!("`execution.workspace_hooks.timeout_seconds` must be greater than zero."); } @@ -411,7 +414,7 @@ impl WorkflowGateProfile { &self.verify_commands } - fn validate(&self, profile_name: &str) -> Result<()> { + fn validate(&self, profile_name: &str) -> crate::prelude::Result<()> { if self.paths.is_empty() { eyre::bail!("`execution.gate_profiles.{profile_name}.paths` must not be empty."); } @@ -440,7 +443,10 @@ impl WorkflowGateProfile { Ok(()) } - fn matches_changed_files(&self, changed_files: &BTreeSet) -> Result { + fn matches_changed_files( + &self, + changed_files: &BTreeSet, + ) -> crate::prelude::Result { let path_set = self.compile_path_set("runtime")?; match self.match_mode { @@ -449,7 +455,7 @@ impl WorkflowGateProfile { } } - fn compile_path_set(&self, profile_name: &str) -> Result { + fn compile_path_set(&self, profile_name: &str) -> crate::prelude::Result { let mut builder = GlobSetBuilder::new(); for path in &self.paths { @@ -504,11 +510,121 @@ impl WorkflowContext { &self.read_first } - fn validate(&self) -> Result<()> { + fn validate(&self) -> crate::prelude::Result<()> { validate_repo_relative_paths("context.read_first", &self.read_first) } } +struct WorkflowConcurrencyLimitVisitor; +impl<'de> Visitor<'de> for WorkflowConcurrencyLimitVisitor { + type Value = WorkflowConcurrencyLimit; + + fn expecting(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + formatter.write_str("a positive integer or \"unlimited\"") + } + + fn visit_i64(self, value: i64) -> std::result::Result + where + E: Error, + { + if value <= 0 { + return Err(E::custom( + "`execution.max_concurrent_agents` must be greater than zero or \"unlimited\".", + )); + } + + u32::try_from(value) + .map(WorkflowConcurrencyLimit::Limited) + .map_err(|_error| E::custom("`execution.max_concurrent_agents` exceeds `u32::MAX`.")) + } + + fn visit_u64(self, value: u64) -> std::result::Result + where + E: Error, + { + if value == 0 { + return Err(E::custom( + "`execution.max_concurrent_agents` must be greater than zero or \"unlimited\".", + )); + } + + u32::try_from(value) + .map(WorkflowConcurrencyLimit::Limited) + .map_err(|_error| E::custom("`execution.max_concurrent_agents` exceeds `u32::MAX`.")) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: Error, + { + if value == "unlimited" { + return Ok(WorkflowConcurrencyLimit::Unlimited); + } + + Err(E::custom( + "`execution.max_concurrent_agents` must be a positive integer or \"unlimited\".", + )) + } +} + +/// Project-level concurrent agent limit. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub enum WorkflowConcurrencyLimit { + /// No project-level concurrent-agent cap. + #[default] + Unlimited, + /// A positive project-level concurrent-agent cap. + Limited(u32), +} +impl WorkflowConcurrencyLimit { + /// Return whether `active_count` still leaves dispatch capacity. + pub fn has_capacity(self, active_count: usize) -> bool { + match self { + Self::Unlimited => true, + Self::Limited(limit) => active_count < limit as usize, + } + } + + /// Return the finite dispatch-slot limit, or `None` when unlimited. + pub fn dispatch_slot_limit(self) -> Option { + match self { + Self::Unlimited => None, + Self::Limited(limit) => Some(limit), + } + } + + fn validate(self) -> crate::prelude::Result<()> { + if matches!(self, Self::Limited(0)) { + eyre::bail!( + "`execution.max_concurrent_agents` must be greater than zero or \"unlimited\"." + ); + } + + Ok(()) + } +} + +impl<'de> Deserialize<'de> for WorkflowConcurrencyLimit { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_any(WorkflowConcurrencyLimitVisitor) + } +} + +impl Serialize for WorkflowConcurrencyLimit { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + match self { + Self::Unlimited => serializer.serialize_str("unlimited"), + Self::Limited(limit) => serializer.serialize_u32(*limit), + } + } +} + /// Supported tracker providers. #[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] @@ -525,7 +641,7 @@ pub enum WorkflowGateMatchMode { Only, } -fn validate_string_entries(field_name: &str, values: &[String]) -> Result<()> { +fn validate_string_entries(field_name: &str, values: &[String]) -> crate::prelude::Result<()> { for value in values { let trimmed = value.trim(); @@ -540,7 +656,7 @@ fn validate_string_entries(field_name: &str, values: &[String]) -> Result<()> { Ok(()) } -fn validate_repo_relative_paths(field_name: &str, values: &[String]) -> Result<()> { +fn validate_repo_relative_paths(field_name: &str, values: &[String]) -> crate::prelude::Result<()> { validate_string_entries(field_name, values)?; for value in values { @@ -559,7 +675,7 @@ fn validate_repo_relative_paths(field_name: &str, values: &[String]) -> Result<( Ok(()) } -fn split_frontmatter(input: &str) -> Result<(String, String)> { +fn split_frontmatter(input: &str) -> crate::prelude::Result<(String, String)> { let input = input.trim_start_matches(['\u{feff}', '\n', '\r']); let mut lines = input.lines(); @@ -593,7 +709,7 @@ fn split_frontmatter(input: &str) -> Result<(String, String)> { Ok((frontmatter_lines.join("\n"), body)) } -fn validate_trimmed_non_empty(field_name: &str, value: &str) -> Result<()> { +fn validate_trimmed_non_empty(field_name: &str, value: &str) -> crate::prelude::Result<()> { if value.trim().is_empty() { eyre::bail!("`{field_name}` must not be empty."); } @@ -604,7 +720,10 @@ fn validate_trimmed_non_empty(field_name: &str, value: &str) -> Result<()> { Ok(()) } -fn validate_non_empty_string_list(field_name: &str, values: &[String]) -> Result<()> { +fn validate_non_empty_string_list( + field_name: &str, + values: &[String], +) -> crate::prelude::Result<()> { if values.is_empty() { eyre::bail!("`{field_name}` must not be empty."); } @@ -625,7 +744,9 @@ mod tests { use crate::{ prelude::Result, - workflow::{TrackerProvider, WorkflowDocument, WorkflowGateMatchMode}, + workflow::{ + TrackerProvider, WorkflowConcurrencyLimit, WorkflowDocument, WorkflowGateMatchMode, + }, }; enum Edit<'a> { @@ -694,7 +815,10 @@ Use `cargo make`. assert_eq!(document.frontmatter().execution().max_attempts(), 3); assert_eq!(document.frontmatter().execution().max_turns(), 4); assert_eq!(document.frontmatter().execution().max_retry_backoff_ms(), 300_000); - assert_eq!(document.frontmatter().execution().max_concurrent_agents(), 2); + assert_eq!( + document.frontmatter().execution().max_concurrent_agents(), + WorkflowConcurrencyLimit::Limited(2) + ); assert_eq!(document.frontmatter().execution().canonicalize_commands(), ["cargo make fmt"]); assert_eq!(document.frontmatter().execution().verify_commands(), ["cargo make test"]); assert_eq!( @@ -1619,6 +1743,33 @@ Then validate the lane. assert_eq!(reparsed, document); } + #[test] + fn parses_unlimited_global_concurrency_limit() { + let document = parse_valid_workflow_with(|markdown| { + *markdown = markdown + .replace("max_concurrent_agents = 1", "max_concurrent_agents = \"unlimited\""); + }) + .expect("unlimited global concurrency should parse"); + + assert_eq!( + document.frontmatter().execution().max_concurrent_agents(), + WorkflowConcurrencyLimit::Unlimited + ); + } + + #[test] + fn defaults_missing_global_concurrency_limit_to_unlimited() { + let document = parse_valid_workflow_with(|markdown| { + *markdown = markdown.replace("max_concurrent_agents = 1\n", ""); + }) + .expect("missing global concurrency should parse"); + + assert_eq!( + document.frontmatter().execution().max_concurrent_agents(), + WorkflowConcurrencyLimit::Unlimited + ); + } + #[test] fn rejects_zero_global_concurrency_limit() { let result = WorkflowDocument::parse_markdown( diff --git a/docs/runbook/self-dogfood-pilot.md b/docs/runbook/self-dogfood-pilot.md index 47d9a71f..2f2434e2 100644 --- a/docs/runbook/self-dogfood-pilot.md +++ b/docs/runbook/self-dogfood-pilot.md @@ -142,7 +142,7 @@ At minimum, the target repo should define: - `[execution] max_attempts` - `[execution] max_turns` - `[execution] max_retry_backoff_ms` -- `[execution] max_concurrent_agents` +- optional `[execution] max_concurrent_agents`; omit it, or set it to `"unlimited"`, to run without a project-level concurrent-agent cap - optional `[context] read_first = [...]` only when the repo truly needs extra repo-local files loaded in addition to the `WORKFLOW.md` body; treat this as a Decodex-local extension, not as the primary policy surface Child-run execution policy is not part of the project-owned `WORKFLOW.md` contract. `decodex` must let `codex app-server` inherit sandbox and approval behavior from the active Codex runtime instead of pinning repo-local overrides. diff --git a/docs/spec/runtime.md b/docs/spec/runtime.md index 1ff6a976..b3afc1af 100644 --- a/docs/spec/runtime.md +++ b/docs/spec/runtime.md @@ -109,8 +109,8 @@ Optional future expansion: Current runtime note: -- The current hack-ink `decodex` runtime is a single-worker model, so project-level concurrency is one dispatch slot. -- Active leases are the service-local claim set for that slot until a broader concurrency model lands. +- Project-level concurrency is unlimited by default; a project may opt into a finite cap with `[execution] max_concurrent_agents = `. +- Active leases are the service-local claim set for running lanes, and shared dispatch-slot locks coordinate cross-process capacity when a finite cap is configured. ## Lane model diff --git a/docs/spec/workflow-file.md b/docs/spec/workflow-file.md index 9ec9c714..c6dbd2fa 100644 --- a/docs/spec/workflow-file.md +++ b/docs/spec/workflow-file.md @@ -138,9 +138,10 @@ Supported keys: - required - note: caps control-plane-owned failure retry backoff in milliseconds; clean continuation retries use a separate short fixed delay in runtime policy - `max_concurrent_agents` - - type: integer - - required - - note: upper-bounds concurrent `decodex` runs per repository; values must be greater than or equal to `1`; Decodex does not apply separate per-state concurrency caps + - type: integer or string `"unlimited"` + - optional + - default: `"unlimited"` + - note: when set to a positive integer, upper-bounds concurrent `decodex` runs per repository; when omitted or set to `"unlimited"`, Decodex does not apply a project-level concurrent-agent cap; Decodex does not apply separate per-state concurrency caps - `canonicalize_commands` - type: array of string - required @@ -310,7 +311,7 @@ transport = "stdio://" max_attempts = 3 max_turns = 1 max_retry_backoff_ms = 300000 -max_concurrent_agents = 2 +max_concurrent_agents = "unlimited" canonicalize_commands = [ "cargo make fmt", "cargo make lint",