Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions apps/decodex/src/orchestrator/operator_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ fn handle_operator_dashboard_websocket_connection(
write_dashboard_websocket_event(&mut stream, "snapshot", &payload)?;
}

write_current_dashboard_run_activity_event(&mut stream, state_store, &session.subscription);

loop {
for frame in read_dashboard_websocket_client_frames(&mut stream, &mut client_frame_buffer)? {
match frame {
Expand All @@ -323,6 +325,13 @@ fn handle_operator_dashboard_websocket_connection(
{
write_dashboard_websocket_event(&mut stream, "snapshot", &payload)?;
}
if dashboard_control_ack_should_push_run_activity(&response) {
write_current_dashboard_run_activity_event(
&mut stream,
state_store,
&session.subscription,
);
}
},
DashboardClientFrame::Close => return Ok(()),
DashboardClientFrame::Ping(payload) => {
Expand Down Expand Up @@ -479,6 +488,48 @@ fn dashboard_control_ack_should_push_snapshot(ack: &Value) -> bool {
)
}

/// Decides whether a control acknowledgement should be followed by a run
/// activity push to the client.
///
/// Returns `true` only when the ack's `accepted` field is the boolean `true`
/// and its `action` field is one of `subscribe`, `focus`, `clearFocus`,
/// `selectAccount`, or `clearAccountSelection`. A missing or non-boolean
/// `accepted` field is treated as not accepted.
fn dashboard_control_ack_should_push_run_activity(ack: &Value) -> bool {
    let accepted = ack.get("accepted").and_then(Value::as_bool) == Some(true);
    let action = ack.get("action").and_then(Value::as_str);

    accepted
        && matches!(
            action,
            Some("subscribe" | "focus" | "clearFocus" | "selectAccount" | "clearAccountSelection")
        )
}

/// Best-effort push of the current run activity event to one WebSocket client.
///
/// Builds the run activity event from `state_store`, maps it through
/// `dashboard_event_for_subscription` for this client's `subscription`, and
/// writes it to `stream` only when the resulting payload has at least one
/// entry in `activeRuns`. Nothing is written when the subscription mapping
/// yields `None` or when there are no active runs.
///
/// Errors from building or writing the event are logged at `warn` level and
/// are not propagated to the caller.
fn write_current_dashboard_run_activity_event(
    stream: &mut TcpStream,
    state_store: &StateStore,
    subscription: &DashboardClientSubscription,
) {
    let outcome = build_operator_run_activity_event(state_store).and_then(|activity| {
        // No event for this subscription: nothing to send.
        let Some(scoped) = dashboard_event_for_subscription(&activity.event, subscription) else {
            return Ok(());
        };

        // Only push when there is at least one active run to report.
        if dashboard_run_activity_event_has_active_runs(&scoped) {
            write_dashboard_websocket_event(stream, scoped.event_type, &scoped.payload)?;
        }

        Ok(())
    });

    if let Err(error) = outcome {
        tracing::warn!(
            ?error,
            "Skipped immediate dashboard run activity snapshot for a WebSocket client."
        );
    }
}

/// Reports whether the event payload carries a non-empty `activeRuns` array.
///
/// Returns `false` when the `activeRuns` key is absent, is not an array, or
/// is an empty array.
fn dashboard_run_activity_event_has_active_runs(event: &DashboardBroadcastEvent) -> bool {
    match event.payload.get("activeRuns").and_then(Value::as_array) {
        Some(active_runs) => !active_runs.is_empty(),
        None => false,
    }
}

fn write_dashboard_websocket_event(
stream: &mut TcpStream,
event_type: &'static str,
Expand Down
97 changes: 97 additions & 0 deletions apps/decodex/src/orchestrator/tests/operator/status/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,103 @@ fn operator_dashboard_websocket_sends_current_snapshot_on_connect() {
server.join().expect("server thread should complete");
}

// Verifies that a dashboard WebSocket client receives a `runActivity` event
// right after connecting, and that the event's first active run carries the
// run id and account fingerprint seeded below.
#[test]
fn operator_dashboard_websocket_sends_current_run_activity_on_connect() {
let (_temp_dir, config, _workflow) = temp_project_layout();
// Bind to port 0 so the OS picks a free port; the client connects to `address`.
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("listener address should resolve");
// The published snapshot lists no active runs; the asserted run data is
// presumably sourced from the state store and account marker instead
// (NOTE(review): confirm against `build_operator_run_activity_event`).
let snapshot = Arc::new(Mutex::new(PublishedOperatorSnapshot {
snapshot_json: Some(
br#"{"project_id":"pubfi","active_runs":[],"account_control":{"mode":"balanced","account_selector":null}}"#.to_vec(),
),
last_publish_unix_epoch: Some(1_774_000_000),
}));
let state_store = Arc::new(StateStore::open_in_memory().expect("state store should open"));
let registration = ProjectRegistration::from_config(
config.service_id(),
&service_config_path(config.repo_root()),
&config,
true,
"test-fingerprint",
);
let issue = sample_issue("Todo", &[]);
let worktree_path = config.worktree_root().join("PUB-101");
// Account summary that is written into the run account marker below and
// asserted on in the received `runActivity` payload.
let account = CodexAccountActivitySummary {
account_fingerprint: String::from("acct-1"),
status: String::from("available"),
refresh_status: String::from("ok"),
..Default::default()
};
let dashboard_events = DashboardEventHub::default();
// Handles moved into the server thread; the originals stay with the test body.
let server_snapshot = Arc::clone(&snapshot);
let server_state_store = Arc::clone(&state_store);
let server_dashboard_events = dashboard_events.clone();

// Seed the state store with a registered project and one running attempt
// (`run-1`) that has a lease and a worktree record for the sample issue.
state_store.upsert_project(&registration).expect("project should register");
state_store
.record_run_attempt("run-1", &issue.id, 1, "running")
.expect("run attempt should record");
state_store
.upsert_lease(config.service_id(), &issue.id, "run-1", "In Progress")
.expect("lease should record");
state_store
.upsert_worktree(
config.service_id(),
&issue.id,
"x/pubfi-pub-101",
&worktree_path.display().to_string(),
)
.expect("worktree should record");

// Write the account marker for `run-1` under the worktree path; the same
// account is used both as the selected account and as the only listed account.
state::write_run_account_marker(
&worktree_path,
&CodexAccountMarker {
run_id: "run-1",
attempt_number: 1,
account: &account,
accounts: slice::from_ref(&account),
},
)
.expect("account marker should write");

// Server side: accept exactly one connection and hand it to the endpoint handler.
let server = thread::spawn(move || {
let (stream, _) = listener.accept().expect("listener should accept a connection");

orchestrator::handle_operator_state_endpoint_connection(
stream,
&server_snapshot,
&server_dashboard_events,
&server_state_store,
)
.expect("websocket handler should complete after client disconnect");
});
let (mut client, response, mut frame) = open_dashboard_websocket_client(address);

// The handshake must upgrade the connection to WebSocket.
assert!(response.starts_with("HTTP/1.1 101 Switching Protocols\r\n"));

// Read past the initial `snapshot` frame, then wait for the `runActivity` frame.
let _initial_snapshot = read_websocket_json_until(&mut client, &mut frame, |payload| {
payload["type"] == "snapshot"
});
let activity = read_websocket_json_until(&mut client, &mut frame, |payload| {
payload["type"] == "runActivity"
});

// The first active run must be the seeded run with the seeded account attached.
assert_eq!(activity["payload"]["activeRuns"][0]["run_id"], "run-1");
assert_eq!(
activity["payload"]["activeRuns"][0]["account"]["account_fingerprint"],
"acct-1"
);
assert_eq!(
activity["payload"]["activeRuns"][0]["accounts"][0]["account_fingerprint"],
"acct-1"
);

// Tear down: drop the client, then close hub clients so the handler can
// return and the server thread can be joined (presumably this unblocks the
// handler's wait loop — confirm against `close_clients_for_test`).
drop(client);

dashboard_events.close_clients_for_test();
server.join().expect("server thread should complete");
}

#[test]
fn operator_dashboard_websocket_accepts_subscription_and_project_pause_control() {
let (_temp_dir, config, _workflow) = temp_project_layout();
Expand Down
62 changes: 41 additions & 21 deletions apps/decodex/src/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,18 +519,21 @@ impl<'de> Visitor<'de> for WorkflowConcurrencyLimitVisitor {
type Value = WorkflowConcurrencyLimit;

fn expecting(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
formatter.write_str("a positive integer or \"unlimited\"")
formatter.write_str("an integer greater than or equal to zero")
}

fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
where
E: Error,
{
if value <= 0 {
if value < 0 {
return Err(E::custom(
"`execution.max_concurrent_agents` must be greater than zero or \"unlimited\".",
"`execution.max_concurrent_agents` must be greater than or equal to zero.",
));
}
if value == 0 {
return Ok(WorkflowConcurrencyLimit::Unlimited);
}

u32::try_from(value)
.map(WorkflowConcurrencyLimit::Limited)
Expand All @@ -542,26 +545,20 @@ impl<'de> Visitor<'de> for WorkflowConcurrencyLimitVisitor {
E: Error,
{
if value == 0 {
return Err(E::custom(
"`execution.max_concurrent_agents` must be greater than zero or \"unlimited\".",
));
return Ok(WorkflowConcurrencyLimit::Unlimited);
}

u32::try_from(value)
.map(WorkflowConcurrencyLimit::Limited)
.map_err(|_error| E::custom("`execution.max_concurrent_agents` exceeds `u32::MAX`."))
}

fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
fn visit_str<E>(self, _value: &str) -> std::result::Result<Self::Value, E>
where
E: Error,
{
if value == "unlimited" {
return Ok(WorkflowConcurrencyLimit::Unlimited);
}

Err(E::custom(
"`execution.max_concurrent_agents` must be a positive integer or \"unlimited\".",
"`execution.max_concurrent_agents` must be an integer greater than or equal to zero.",
))
}
}
Expand Down Expand Up @@ -594,7 +591,7 @@ impl WorkflowConcurrencyLimit {
fn validate(self) -> crate::prelude::Result<()> {
if matches!(self, Self::Limited(0)) {
eyre::bail!(
"`execution.max_concurrent_agents` must be greater than zero or \"unlimited\"."
"`execution.max_concurrent_agents` finite limits must be greater than zero."
);
}

Expand All @@ -617,7 +614,7 @@ impl Serialize for WorkflowConcurrencyLimit {
S: Serializer,
{
match self {
Self::Unlimited => serializer.serialize_str("unlimited"),
Self::Unlimited => serializer.serialize_u32(0),
Self::Limited(limit) => serializer.serialize_u32(*limit),
}
}
Expand Down Expand Up @@ -1747,21 +1744,40 @@ Then validate the lane.
}

#[test]
fn parses_unlimited_global_concurrency_limit() {
fn parses_zero_global_concurrency_limit_as_unlimited() {
let document = parse_valid_workflow_with(|markdown| {
*markdown = markdown
.replace("max_concurrent_agents = 1", "max_concurrent_agents = \"unlimited\"");
*markdown = markdown.replace("max_concurrent_agents = 1", "max_concurrent_agents = 0");
})
.expect("unlimited global concurrency should parse");
.expect("zero global concurrency should parse");

assert_eq!(
document.frontmatter().execution().max_concurrent_agents(),
WorkflowConcurrencyLimit::Unlimited
);
assert!(
document
.to_markdown()
.expect("workflow markdown should render")
.contains("max_concurrent_agents = 0")
);
}

#[test]
fn rejects_string_global_concurrency_limit() {
let result = parse_valid_workflow_with(|markdown| {
*markdown = markdown
.replace("max_concurrent_agents = 1", "max_concurrent_agents = \"unlimited\"");
});
let error = result.expect_err("string global concurrency should be invalid");

assert!(
error.to_string().contains("must be an integer greater than or equal to zero"),
"unexpected error: {error:?}"
);
}

#[test]
fn rejects_zero_global_concurrency_limit() {
fn rejects_negative_global_concurrency_limit() {
let result = WorkflowDocument::parse_markdown(
r#"
+++
Expand All @@ -1785,7 +1801,7 @@ transport = "stdio://"
max_attempts = 3
max_turns = 1
max_retry_backoff_ms = 300000
max_concurrent_agents = 0
max_concurrent_agents = -1
gate_profiles = {}
canonicalize_commands = []
verify_commands = []
Expand All @@ -1800,8 +1816,12 @@ read_first = []
+++
"#,
);
let error = result.expect_err("negative global concurrency should be invalid");

assert!(result.is_err(), "zero global concurrency should be invalid");
assert!(
error.to_string().contains("must be greater than or equal to zero"),
"unexpected error: {error:?}"
);
}

fn parse_valid_workflow_with(rewrite: impl FnOnce(&mut String)) -> Result<WorkflowDocument> {
Expand Down
2 changes: 1 addition & 1 deletion docs/runbook/self-dogfood-pilot.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ At minimum, the target repo should define:
- `[execution] max_attempts`
- `[execution] max_turns`
- `[execution] max_retry_backoff_ms`
- `[execution] max_concurrent_agents`; set it explicitly to `"unlimited"` to run without a project-level concurrent-agent cap
- `[execution] max_concurrent_agents`; set it explicitly to `0` to run without a project-level concurrent-agent cap
- optional `[context] read_first = [...]` only when the repo truly needs extra repo-local files loaded in addition to the `WORKFLOW.md` body; treat this as a Decodex-local extension, not as the primary policy surface

Child-run execution policy is not part of the project-owned `WORKFLOW.md` contract. `decodex` must let `codex app-server` inherit sandbox and approval behavior from the active Codex runtime instead of pinning repo-local overrides.
Expand Down
2 changes: 1 addition & 1 deletion docs/spec/runtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ Optional future expansion:

Current runtime note:

- Project-level concurrency must be explicit; set `[execution] max_concurrent_agents = "unlimited"` for no project-level cap, or use a positive integer for a finite cap.
- Project-level concurrency must be explicit; set `[execution] max_concurrent_agents = 0` for no project-level cap, or use a positive integer for a finite cap.
- Active leases are the service-local claim set for running lanes, and shared dispatch-slot locks coordinate cross-process capacity when a finite cap is configured.

## Lane model
Expand Down
6 changes: 3 additions & 3 deletions docs/spec/workflow-file.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ Supported keys:
- required
- note: caps control-plane-owned failure retry backoff in milliseconds; clean continuation retries use a separate short fixed delay in runtime policy
- `max_concurrent_agents`
- type: integer or string `"unlimited"`
- type: integer
- required
- note: set to `"unlimited"` to run without a project-level concurrent-agent cap; when set to a positive integer, upper-bounds concurrent `decodex` runs per repository; Decodex does not apply separate per-state concurrency caps
- note: set to `0` to run without a project-level concurrent-agent cap; when set to a positive integer, upper-bounds concurrent `decodex` runs per repository; negative values are invalid; Decodex does not apply separate per-state concurrency caps
- `canonicalize_commands`
- type: array of string
- required
Expand Down Expand Up @@ -310,7 +310,7 @@ transport = "stdio://"
max_attempts = 3
max_turns = 1
max_retry_backoff_ms = 300000
max_concurrent_agents = "unlimited"
max_concurrent_agents = 0
canonicalize_commands = [
"cargo make fmt",
"cargo make lint",
Expand Down