Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions apps/decodex/src/orchestrator/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,12 @@ where
context.review_state_inspector,
)?;

while active_children.len()
< context.workflow.frontmatter().execution().max_concurrent_agents() as usize
while context
.workflow
.frontmatter()
.execution()
.max_concurrent_agents()
.has_capacity(active_children.len())
{
if !spawn_next_daemon_child(
config_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,10 @@ fn candidate_selection_allows_multi_slot_dispatch_when_configured() {
)
.expect("workflow should parse");

assert_eq!(workflow.frontmatter().execution().max_concurrent_agents(), 2);
assert_eq!(
workflow.frontmatter().execution().max_concurrent_agents().dispatch_slot_limit(),
Some(2)
);

let state_store = StateStore::open_in_memory().expect("state store should open");

Expand Down
2 changes: 1 addition & 1 deletion apps/decodex/src/orchestrator/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ impl ConcurrencySnapshot {
}

fn has_global_capacity(&self, execution: &WorkflowExecution) -> bool {
self.total_active < execution.max_concurrent_agents() as usize
execution.max_concurrent_agents().has_capacity(self.total_active)
}
}

Expand Down
2 changes: 1 addition & 1 deletion apps/decodex/src/state/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub(crate) struct CodexAccountMarker<'a> {
#[derive(Clone)]
struct DispatchSlotConfig {
root: PathBuf,
slot_limit: usize,
slot_limit: DispatchSlotLimit,
}

struct IssueClaimGuard {
Expand Down
69 changes: 63 additions & 6 deletions apps/decodex/src/state/store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,52 @@
use std::mem;

use crate::workflow::WorkflowConcurrencyLimit;

/// Shared dispatch-slot capacity for one project.
#[derive(Clone, Copy)]
pub(crate) enum DispatchSlotLimit {
/// Fixed number of cross-process dispatch slots.
Limited(u32),
/// Allocate dispatch slots on demand without a fixed project cap.
Unlimited,
}
impl DispatchSlotLimit {
fn validate(self) -> Result<()> {
if matches!(self, Self::Limited(0)) {
eyre::bail!("dispatch slot limit must be greater than zero or unlimited");
}

Ok(())
}

fn includes(self, slot_index: usize) -> Result<bool> {
match self {
Self::Unlimited => Ok(true),
Self::Limited(limit) => Ok(slot_index
< usize::try_from(limit)
.map_err(|_error| eyre::eyre!("dispatch slot limit overflowed usize"))?),
}
}
}
impl From<u32> for DispatchSlotLimit {
fn from(value: u32) -> Self {
Self::Limited(value)
}
}
impl From<Option<u32>> for DispatchSlotLimit {
fn from(value: Option<u32>) -> Self {
match value {
Some(limit) => Self::Limited(limit),
None => Self::Unlimited,
}
}
}
impl From<WorkflowConcurrencyLimit> for DispatchSlotLimit {
fn from(value: WorkflowConcurrencyLimit) -> Self {
Self::from(value.dispatch_slot_limit())
}
}

/// Local runtime store for leases, attempts, worktrees, and protocol events.
#[derive(Default)]
pub struct StateStore {
Expand Down Expand Up @@ -55,20 +102,21 @@ impl StateStore {
}

/// Configure the shared cross-process dispatch-slot root for one project.
pub fn configure_dispatch_slot_root(
pub(crate) fn configure_dispatch_slot_root(
&self,
project_id: &str,
worktree_root: impl AsRef<Path>,
slot_limit: u32,
slot_limit: impl Into<DispatchSlotLimit>,
) -> Result<()> {
let slot_limit = slot_limit.into();
let mut state = self.lock()?;

slot_limit.validate()?;
state.dispatch_slot_configs.insert(
project_id.to_owned(),
DispatchSlotConfig {
root: worktree_root.as_ref().to_path_buf(),
slot_limit: usize::try_from(slot_limit)
.map_err(|_error| eyre::eyre!("dispatch slot limit overflowed usize"))?,
slot_limit,
},
);

Expand Down Expand Up @@ -201,9 +249,14 @@ impl StateStore {
.map(|guard| guard.slot_index)
.collect::<HashSet<_>>();
let mut acquired_guard = None;
let mut slot_index = 0;

for slot_index in 0..dispatch_slot_config.slot_limit {
while dispatch_slot_config.slot_limit.includes(slot_index)? {
if held_slot_indexes.contains(&slot_index) {
slot_index = slot_index
.checked_add(1)
.ok_or_else(|| eyre::eyre!("dispatch slot index overflowed usize"))?;

continue;
}

Expand All @@ -225,9 +278,13 @@ impl StateStore {

break;
},
Err(TryLockError::WouldBlock) => continue,
Err(TryLockError::WouldBlock) => {},
Err(TryLockError::Error(error)) => return Err(error.into()),
}

slot_index = slot_index
.checked_add(1)
.ok_or_else(|| eyre::eyre!("dispatch slot index overflowed usize"))?;
}

let Some(dispatch_slot_guard) = acquired_guard else {
Expand Down
36 changes: 35 additions & 1 deletion apps/decodex/src/state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tempfile::TempDir;
use crate::{
state::{
self, ChildAgentActivitySummary, CodexAccountActivitySummary, CodexAccountMarker,
EffectiveRuntimeMarker, PreacquiredLeaseGuards, ProjectRegistration,
DispatchSlotLimit, EffectiveRuntimeMarker, PreacquiredLeaseGuards, ProjectRegistration,
ProtocolActivityMarker, ProtocolActivitySummary, RUN_ACTIVITY_MARKER_FILE,
RUN_OPERATION_REPO_GATE, ReviewHandoffMarker, ReviewOrchestrationMarker, StateStore,
},
Expand Down Expand Up @@ -516,6 +516,40 @@ fn shared_dispatch_slots_honor_configured_limit_across_process_local_stores() {
);
}

#[test]
fn shared_dispatch_slots_support_unlimited_across_process_local_stores() {
let temp_dir = TempDir::new().expect("tempdir should create");
let store_one = StateStore::open_in_memory().expect("first store should open");
let store_two = StateStore::open_in_memory().expect("second store should open");
let store_three = StateStore::open_in_memory().expect("third store should open");

store_one
.configure_dispatch_slot_root("pubfi", temp_dir.path(), DispatchSlotLimit::Unlimited)
.expect("first store should configure unlimited dispatch slots");
store_two
.configure_dispatch_slot_root("pubfi", temp_dir.path(), DispatchSlotLimit::Unlimited)
.expect("second store should configure unlimited dispatch slots");
store_three
.configure_dispatch_slot_root("pubfi", temp_dir.path(), DispatchSlotLimit::Unlimited)
.expect("third store should configure unlimited dispatch slots");

assert!(
store_one
.try_acquire_lease("pubfi", "PUB-101", "run-1", IN_PROGRESS_STATE)
.expect("first shared lease acquisition should succeed")
);
assert!(
store_two
.try_acquire_lease("pubfi", "PUB-102", "run-2", IN_PROGRESS_STATE)
.expect("second store should acquire another shared slot")
);
assert!(
store_three
.try_acquire_lease("pubfi", "PUB-103", "run-3", IN_PROGRESS_STATE)
.expect("third store should acquire another shared slot")
);
}

#[test]
fn failed_shared_slot_attempt_releases_issue_claim_before_retry() {
let temp_dir = TempDir::new().expect("tempdir should create");
Expand Down
Loading