Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ Decodex follow-up, public content, or only ledger trace.
- Current Codex/app-server compatibility is capability-gated and recorded in
[`docs/spec/app-server.md`](docs/spec/app-server.md).
- The public site is static and deploys through GitHub Pages.
- Starting `decodex serve` without its `--config` option loads enabled projects from
the explicit registry only. It does not scan Codex history, repo-local config files,
or currently open worktrees to infer projects.
- Starting `decodex serve` without its `--config` option schedules enabled projects
from the explicit registry only. Operator and App snapshots still expose active
runtime DB-backed attempts for disabled projects, because disabling a project pauses
future dispatch rather than deleting visibility or ownership. It does not scan Codex
history, repo-local config files, or currently open worktrees to infer projects.

## Usage

Expand Down
167 changes: 140 additions & 27 deletions apps/decodex/src/orchestrator/entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,18 +797,22 @@ fn run_control_plane_tick_with_options(
let now = Instant::now();

Ok(collect_control_plane_snapshot(registered_projects, |project, project_warnings| {
let runtime = project_runtimes.entry(project.service_id().to_owned()).or_default();
if project.enabled() {
let runtime = project_runtimes.entry(project.service_id().to_owned()).or_default();

run_control_plane_project_tick(
project,
state_store,
run_control_plane_project_tick(
project,
state_store,
runtime,
project_warnings,
linear_scan_requests,
now,
allow_unverified_codex,
)
}))
} else {
control_plane_disabled_project_observer_tick(project, state_store, project_warnings)
}
}))
}

fn drain_operator_linear_scan_requests_best_effort(
Expand Down Expand Up @@ -847,20 +851,10 @@ fn run_control_plane_dev_tick(state_store: &StateStore) -> Result<OperatorStatus
)
}) {
Ok(project_snapshot) => {
if let Some(local_status) = project_snapshot.projects.first() {
project_status.active_run_count = local_status.active_run_count;
project_status.retained_worktree_count = local_status.retained_worktree_count;
project_status.waiting_lane_count = local_status.waiting_lane_count;
project_status.attention_count = local_status.attention_count;
project_status.cleanup_blocked_count = local_status.cleanup_blocked_count;
project_status.cleanup_pending_count = local_status.cleanup_pending_count;
project_status.last_activity_at = local_status.last_activity_at.clone();
project_status.warning_count =
project_status.warning_count.saturating_add(local_status.warning_count);
} else {
project_status.active_run_count = project_snapshot.active_runs.len();
}

hydrate_project_status_from_local_snapshot(
&mut project_status,
&project_snapshot,
);
append_control_plane_project_snapshot(&mut snapshot, project_snapshot);
},
Err(error) => {
Expand All @@ -877,6 +871,29 @@ fn run_control_plane_dev_tick(state_store: &StateStore) -> Result<OperatorStatus
);
},
}
} else {
let mut project_warnings = Vec::new();
let project_tick = control_plane_disabled_project_observer_tick(
registration,
state_store,
&mut project_warnings,
);

for warning in project_warnings {
add_operator_snapshot_warning(&mut snapshot, warning);
}

if let Some(local_status) = project_tick.project_status {
project_status = operator_project_status_from_dev_registration(registration);

hydrate_project_status_from_registered_status(
&mut project_status,
&local_status,
);
}
if let Some(project_snapshot) = project_tick.snapshot {
append_control_plane_project_snapshot(&mut snapshot, project_snapshot);
}
}

project_statuses.push(project_status);
Expand All @@ -890,7 +907,7 @@ fn run_control_plane_dev_tick(state_store: &StateStore) -> Result<OperatorStatus

fn collect_control_plane_snapshot<F>(
registered_projects: Vec<ProjectRegistration>,
mut run_enabled_project_tick: F,
mut run_project_tick: F,
) -> OperatorStatusSnapshot
where
F: FnMut(&ProjectRegistration, &mut Vec<&'static str>) -> ControlPlaneProjectTick,
Expand All @@ -905,14 +922,8 @@ where
}

for project in registered_projects {
if !project.enabled() {
project_statuses.push(operator_project_status_from_registration(&project, 0));

continue;
}

let mut project_warnings = Vec::new();
let project_tick = run_enabled_project_tick(&project, &mut project_warnings);
let project_tick = run_project_tick(&project, &mut project_warnings);

snapshot_warnings.extend(project_warnings);

Expand All @@ -937,6 +948,108 @@ where
snapshot
}

fn control_plane_disabled_project_observer_tick(
project: &ProjectRegistration,
state_store: &StateStore,
snapshot_warnings: &mut Vec<&'static str>,
) -> ControlPlaneProjectTick {
let project_status = operator_project_status_from_registration(project, 0);
let active_runs = match state_store.list_active_runs(project.service_id()) {
Ok(active_runs) => active_runs,
Err(error) => {
let _ = error;

tracing::warn!(
project_id = project.service_id(),
"Disabled project active-run lookup failed; sensitive runtime details were withheld."
);

snapshot_warnings.push("operator_snapshot_build_failed");

return ControlPlaneProjectTick {
snapshot: None,
project_status: Some(project_status),
};
},
};

if active_runs.is_empty() {
return ControlPlaneProjectTick {
snapshot: None,
project_status: Some(project_status),
};
}

match build_registered_project_local_snapshot(project, state_store) {
Ok(project_snapshot) => {
let mut project_status = project_status;

hydrate_project_status_from_local_snapshot(&mut project_status, &project_snapshot);

ControlPlaneProjectTick {
snapshot: Some(project_snapshot),
project_status: Some(project_status),
}
},
Err(error) => {
let _ = error;

tracing::warn!(
project_id = project.service_id(),
"Disabled project active-run snapshot build failed; sensitive runtime details were withheld."
);

snapshot_warnings.push("operator_snapshot_build_failed");

ControlPlaneProjectTick {
snapshot: None,
project_status: Some(project_status),
}
},
}
}

fn build_registered_project_local_snapshot(
project: &ProjectRegistration,
state_store: &StateStore,
) -> Result<OperatorStatusSnapshot> {
let config = ServiceConfig::from_path(project.config_path())?;
let workflow = WorkflowDocument::from_path(config.workflow_path())?;

build_operator_state_snapshot_without_live_observers(
&config,
&workflow,
state_store,
DEFAULT_OPERATOR_DASHBOARD_RUN_LIMIT,
)
}

fn hydrate_project_status_from_local_snapshot(
project_status: &mut OperatorProjectStatus,
project_snapshot: &OperatorStatusSnapshot,
) {
if let Some(local_status) = project_snapshot.projects.first() {
hydrate_project_status_from_registered_status(project_status, local_status);
} else {
project_status.active_run_count = project_snapshot.active_runs.len();
}
}

fn hydrate_project_status_from_registered_status(
project_status: &mut OperatorProjectStatus,
local_status: &OperatorProjectStatus,
) {
project_status.active_run_count = local_status.active_run_count;
project_status.retained_worktree_count = local_status.retained_worktree_count;
project_status.waiting_lane_count = local_status.waiting_lane_count;
project_status.attention_count = local_status.attention_count;
project_status.cleanup_blocked_count = local_status.cleanup_blocked_count;
project_status.cleanup_pending_count = local_status.cleanup_pending_count;
project_status.last_activity_at = local_status.last_activity_at.clone();
project_status.warning_count =
project_status.warning_count.saturating_add(local_status.warning_count);
}

fn aggregate_control_plane_snapshot(
registered_project_count: usize,
mut project_snapshots: Vec<OperatorStatusSnapshot>,
Expand Down
4 changes: 0 additions & 4 deletions apps/decodex/src/orchestrator/operator_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,6 @@ fn build_operator_run_activity_event(
let mut active_runs = Vec::new();

for registration in state_store.list_projects()? {
if !registration.enabled() {
continue;
}

let project = match ServiceConfig::from_path(registration.config_path()) {
Ok(project) => project,
Err(error) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,51 @@ fn control_plane_snapshot_lists_disabled_registered_projects() {
assert!(project_runtimes.is_empty(), "disabled projects should not be ticked");
}

#[test]
fn control_plane_snapshot_includes_disabled_project_active_runs_without_ticking() {
let (temp_dir, config, _workflow) = temp_project_layout();
let _home_guard =
TestEnvVarGuard::set("HOME", temp_dir.path().to_str().expect("home should be utf-8"));
let state_path = temp_dir.path().join("runtime.sqlite3");
let observer_store = StateStore::open(&state_path).expect("observer store should open");
let writer_store = StateStore::open(&state_path).expect("writer store should open");
let registration = ProjectRegistration::from_config(
config.service_id(),
&service_config_path(config.repo_root()),
&config,
false,
"test-fingerprint",
);
let issue = sample_issue("In Progress", &[]);

observer_store.upsert_project(&registration).expect("project should register");
writer_store
.record_run_attempt("run-disabled-active", &issue.id, 1, "running")
.expect("active run should record");
writer_store
.upsert_lease(config.service_id(), &issue.id, "run-disabled-active", "In Progress")
.expect("active lease should record");

let mut project_runtimes = HashMap::new();
let snapshot =
orchestrator::run_control_plane_tick(&observer_store, &mut project_runtimes, &[])
.expect("control-plane snapshot should build");
let project = snapshot.projects.first().expect("disabled project should be listed");

assert_eq!(snapshot.project_id, "pubfi");
assert_eq!(snapshot.projects.len(), 1);
assert_eq!(project.project_id, "pubfi");
assert!(!project.enabled);
assert_eq!(project.connector_state, "disabled");
assert_eq!(project.active_run_count, 1);
assert_eq!(snapshot.active_runs.len(), 1);
assert_eq!(snapshot.active_runs[0].run_id, "run-disabled-active");
assert_eq!(snapshot.active_runs[0].project_id, "pubfi");
assert_eq!(snapshot.active_runs[0].phase, "executing");
assert!(snapshot.warnings.contains(&String::from("no_enabled_projects")));
assert!(project_runtimes.is_empty(), "disabled projects should not be ticked");
}

#[test]
fn control_plane_linear_scan_cadence_uses_fixed_window_and_manual_override() {
let now = Instant::now();
Expand Down
61 changes: 61 additions & 0 deletions apps/decodex/src/orchestrator/tests/operator/status/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,67 @@ fn operator_dashboard_run_activity_event_summarizes_active_runs() {
assert_eq!(data["activeRuns"][0]["accounts"][0]["account_fingerprint"], "acct-1");
}

#[test]
fn operator_dashboard_run_activity_event_includes_disabled_project_active_runs() {
let temp_dir = TempDir::new().expect("temp dir should exist");
let _home_guard =
TestEnvVarGuard::set("HOME", temp_dir.path().to_str().expect("temp path should be UTF-8"));
let (_temp_dir, config, _workflow) = temp_project_layout();
let state_path = temp_dir.path().join("runtime.sqlite3");
let observer_store = StateStore::open(&state_path).expect("observer store should open");
let writer_store = StateStore::open(&state_path).expect("writer store should open");
let registration = ProjectRegistration::from_config(
config.service_id(),
&service_config_path(config.repo_root()),
&config,
false,
"test-fingerprint",
);
let issue = sample_issue("In Progress", &[]);
let worktree_path = config.worktree_root().join("PUB-101");

git_status_success(
config.repo_root(),
&["remote", "add", "origin", "git@github.com:hack-ink/pubfi-mono-v2.git"],
);

observer_store.upsert_project(&registration).expect("project should register");
writer_store
.record_run_attempt("run-disabled-active", &issue.id, 1, "running")
.expect("active run should record");
writer_store
.upsert_lease(config.service_id(), &issue.id, "run-disabled-active", "In Progress")
.expect("active lease should record");
writer_store
.upsert_worktree(
config.service_id(),
&issue.id,
"x/pubfi-pub-101",
&worktree_path.display().to_string(),
)
.expect("worktree should record");

let event = orchestrator::build_operator_run_activity_event(&observer_store)
.expect("event should build");
let message = orchestrator::dashboard_websocket_message(
event.event.event_type,
&event.event.payload,
)
.expect("event should serialize");
let (payload, _consumed) = websocket_text_payload(&message).expect("event should be a text frame");
let payload: Value = serde_json::from_slice(payload).expect("event data should be json");
let data = &payload["payload"];
let active_runs = data["activeRuns"].as_array().expect("active runs should list");

assert_eq!(payload["type"], "runActivity");
assert_eq!(active_runs.len(), 1);
assert_eq!(active_runs[0]["run_id"], "run-disabled-active");
assert_eq!(active_runs[0]["project_id"], "pubfi");
assert_eq!(active_runs[0]["project_display_name"], "hack-ink/pubfi-mono-v2");
assert_eq!(data["activeRunsComplete"], true);
assert_eq!(data["activeRunScope"], "complete");
}

#[test]
fn operator_state_endpoint_reads_complete_headers_before_parsing() {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
Expand Down
6 changes: 4 additions & 2 deletions docs/reference/operator-control-plane.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ Decodex App is a native shell over the same local runtime and account-pool state
launch it connects to an existing default local listener when one is reachable; if
not, it starts the bundled `decodex` binary as
`decodex serve --listen-address 127.0.0.1:8912`. The app fallback is a normal
control-plane server: it loads the enabled project registry, uses the CLI-owned default
cadences, and serves the dashboard, account APIs, `GET /api/operator-snapshot`,
control-plane server: it loads the registered project registry, schedules only enabled
projects, keeps active runtime DB-backed runs visible even when a project is disabled
for future dispatch, uses the CLI-owned default cadences, and serves the dashboard,
account APIs, `GET /api/operator-snapshot`,
`POST /api/linear-scan`, `GET /api/lane/inspect`, and `POST /api/lane/interrupt` from
the single local listener.

Expand Down
2 changes: 1 addition & 1 deletion docs/spec/runtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ mutations, or duplicate comment for that logical event.

The local runtime store is the global Decodex SQLite database for one local installation. It lives at `~/.codex/decodex/runtime.sqlite3`, not inside any registered project checkout or worktree. Every row that belongs to a repo is scoped by `project_id`. Decodex logs live beside that database under `~/.codex/decodex/logs/`, the optional shared Codex account pool lives at `~/.codex/decodex/accounts.jsonl`, global operator config lives at `~/.codex/decodex/config.toml`, bounded local account usage estimates live at `~/.codex/decodex/account-usage-history.jsonl`, and agent-readable derived evidence lives under `~/.codex/decodex/agent-evidence/<service-id>/`; vendor-qualified app-data directories and per-project runtime databases are not part of the runtime contract. Global operator config owns account-pool routing and shared account display-name offsets. Account usage history owns local seven-day display estimates and non-secret account capacity weights only; it does not contain token material and does not decide scheduling. UI-only preferences such as theme, table sorting, and local privacy visibility are not runtime state.

Project contracts live outside registered repositories under `~/.codex/decodex/projects/<service-id>/`. Each project directory must contain `project.toml` and `WORKFLOW.md`; arbitrary project file names such as `<service-id>.toml` are not part of the contract. `project.toml` must set `[paths].repo_root` so the project contract is explicit. The `[github]` table owns the routed token environment variable and may also set `command_path` when the expected `gh` binary should be explicit for GUI-launched runs. Project registration stores the centralized `config_path`, target `repo_root`, `worktree_root`, and workflow path in the global runtime database. Commands that start inside a registered checkout or lane worktree resolve the project through that registry; they do not discover or trust worktree-local config files. Project config refreshes preserve an existing enabled or disabled registry toggle; only explicit operator commands such as `decodex project add <project-dir>`, `decodex project enable <service-id>`, and `decodex project disable <service-id>` may change that toggle. `decodex serve` loads enabled registered projects from the global runtime database. It must not scan `.codex` history, repo-local config files, or currently open worktrees to infer additional projects.
Project contracts live outside registered repositories under `~/.codex/decodex/projects/<service-id>/`. Each project directory must contain `project.toml` and `WORKFLOW.md`; arbitrary project file names such as `<service-id>.toml` are not part of the contract. `project.toml` must set `[paths].repo_root` so the project contract is explicit. The `[github]` table owns the routed token environment variable and may also set `command_path` when the expected `gh` binary should be explicit for GUI-launched runs. Project registration stores the centralized `config_path`, target `repo_root`, `worktree_root`, and workflow path in the global runtime database. Commands that start inside a registered checkout or lane worktree resolve the project through that registry; they do not discover or trust worktree-local config files. Project config refreshes preserve an existing enabled or disabled registry toggle; only explicit operator commands such as `decodex project add <project-dir>`, `decodex project enable <service-id>`, and `decodex project disable <service-id>` may change that toggle. `decodex serve` schedules and polls enabled registered projects from the global runtime database; the operator and App projections must still expose active runtime DB-backed attempts for disabled projects because pause is a future-dispatch control, not a visibility or ownership deletion. It must not scan `.codex` history, repo-local config files, or currently open worktrees to infer additional projects.

`project.toml` may also configure `[privacy_classifier]` with a loopback HTTP
`endpoint` and bounded `timeout_ms` for an operator-managed local classifier runtime.
Expand Down