Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ The governing workflow lives at `docs/runbook/local-github-signal-workflow.md`.
`decodex serve` owns the local operator listener. It serves the operator dashboard from
`GET /` and `GET /dashboard`; published snapshots, active-run updates, and local
dashboard controls flow through the `/dashboard/control` WebSocket. The HTTP surface is
kept to dashboard pages/assets and `GET /livez`.
kept to dashboard pages/assets, `GET /livez`, and the local account-control API used by
Decodex App.

For dashboard UI development, use the mock operator dashboard server:

Expand Down
60 changes: 56 additions & 4 deletions apps/decodex-app/Sources/DecodexApp/DecodexServerBridge.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ actor DecodexServerBridge {

private let defaultBaseURL = URL(string: "http://127.0.0.1:8912")!
private let defaultListenAddress = "127.0.0.1:8912"
private let liveCheckFreshness: TimeInterval = 5
private var serverBaseURL: URL?
private var liveCheckBaseURL: URL?
private var liveCheckedAt: Date?
private var startedProcess: Process?

func run<T: Decodable & Sendable>(_ request: AppBridgeRequest, as type: T.Type) async throws -> T {
Expand All @@ -20,6 +23,24 @@ actor DecodexServerBridge {
}

let baseURL = try await ensureServer()
do {
return try await send(route, baseURL: baseURL, as: type)
} catch let error as DecodexAppBridgeError {
throw error
} catch let error as DecodingError {
throw error
} catch {
clearLive(baseURL)

return try await send(route, baseURL: try await ensureServer(), as: type)
}
}

private func send<T: Decodable & Sendable>(
_ route: ServerRoute,
baseURL: URL,
as type: T.Type
) async throws -> T {
let url = routeURL(baseURL: baseURL, route: route)
var urlRequest = URLRequest(url: url)

Expand Down Expand Up @@ -47,22 +68,30 @@ actor DecodexServerBridge {
)
}

noteLive(baseURL)

return try JSONDecoder().decode(type, from: data)
}

private func ensureServer() async throws -> URL {
if let serverBaseURL, hasFreshLiveCheck(serverBaseURL) {
return serverBaseURL
}

if let serverBaseURL, await isLive(serverBaseURL) {
noteLive(serverBaseURL)

return serverBaseURL
}

if let configured = configuredServerURL(), await isLive(configured) {
serverBaseURL = configured
noteLive(configured)

return configured
}

if await isLive(defaultBaseURL) {
serverBaseURL = defaultBaseURL
noteLive(defaultBaseURL)

return defaultBaseURL
}
Expand All @@ -71,7 +100,7 @@ actor DecodexServerBridge {

for _ in 0..<40 {
if await isLive(defaultBaseURL) {
serverBaseURL = defaultBaseURL
noteLive(defaultBaseURL)

return defaultBaseURL
}
Expand All @@ -88,6 +117,30 @@ actor DecodexServerBridge {
return value.isEmpty ? nil : URL(string: value)
}

/// Reports whether `baseURL` was confirmed live recently enough to skip another probe.
///
/// - Parameter baseURL: The server base URL the caller intends to use.
/// - Returns: `true` only when the last recorded live check was for this same URL and
///   happened at most `liveCheckFreshness` seconds ago (exclusive).
private func hasFreshLiveCheck(_ baseURL: URL) -> Bool {
    guard liveCheckBaseURL == baseURL, let liveCheckedAt else {
        return false
    }

    let elapsed = Date().timeIntervalSince(liveCheckedAt)

    // `Date` follows the wall clock, so a backwards clock adjustment produces a negative
    // interval. Treat that as stale; otherwise the cached check would keep looking fresh
    // until the clock caught up with the recorded timestamp again.
    return elapsed >= 0 && elapsed < liveCheckFreshness
}

/// Records `baseURL` as the active server and stamps it as live at the current time.
///
/// - Parameter baseURL: The server base URL that just answered successfully.
private func noteLive(_ baseURL: URL) {
    let checkedAt = Date()

    liveCheckedAt = checkedAt
    liveCheckBaseURL = baseURL
    serverBaseURL = baseURL
}

/// Forgets the cached server and live-check state recorded for `baseURL`.
///
/// State that was recorded for a different URL is left untouched.
///
/// - Parameter baseURL: The server base URL whose cached state should be dropped.
private func clearLive(_ baseURL: URL) {
    let ownsLiveCheck = liveCheckBaseURL == baseURL
    let ownsServer = serverBaseURL == baseURL

    if ownsLiveCheck {
        liveCheckedAt = nil
        liveCheckBaseURL = nil
    }
    if ownsServer {
        serverBaseURL = nil
    }
}

private func isLive(_ baseURL: URL) async -> Bool {
let url = baseURL.appendingPathComponent("livez")
var request = URLRequest(url: url)
Expand Down Expand Up @@ -117,7 +170,6 @@ actor DecodexServerBridge {
"serve",
"--api-only",
"--listen-address", defaultListenAddress,
"--interval", "30s",
]
process.standardOutput = nullDevice
process.standardError = nullDevice
Expand Down
40 changes: 34 additions & 6 deletions apps/decodex/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,10 @@ impl RunCommand {

#[derive(Debug, Args)]
struct ServeCommand {
/// Poll interval between control-plane ticks, for example `60s` or `5m`.
#[arg(long, value_name = "INTERVAL", default_value = "60s", value_parser = parse_duration_arg)]
interval: Duration,
/// Poll interval between control-plane ticks, for example `60s` or `5m`; unavailable with
/// `--api-only`.
#[arg(long, value_name = "INTERVAL", value_parser = parse_duration_arg)]
interval: Option<Duration>,
/// Operator UI listen address.
#[arg(long, value_name = "ADDR", default_value = "127.0.0.1:8912")]
listen_address: String,
Expand All @@ -386,9 +387,19 @@ struct ServeCommand {
}
impl ServeCommand {
fn run(&self, config_path: Option<&Path>) -> crate::prelude::Result<()> {
if self.api_only && self.interval.is_some() {
eyre::bail!(
"serve --api-only does not accept --interval because API-only mode does not poll projects."
);
}

orchestrator::run_control_plane(ServeRequest {
config_path,
poll_interval: self.interval,
poll_interval: if self.api_only {
None
} else {
Some(self.interval.unwrap_or_else(|| Duration::from_secs(60)))
},
listen_address: &self.listen_address,
api_only: self.api_only,
})
Expand Down Expand Up @@ -868,7 +879,7 @@ mod tests {
assert!(matches!(
cli.command,
Command::Serve(ServeCommand { interval, listen_address, api_only })
if interval == Duration::from_secs(30)
if interval == Some(Duration::from_secs(30))
&& listen_address == "127.0.0.1:9000"
&& !api_only
));
Expand All @@ -878,7 +889,24 @@ mod tests {
fn parses_serve_api_only() {
let cli = Cli::parse_from(["decodex", "serve", "--api-only"]);

assert!(matches!(cli.command, Command::Serve(ServeCommand { api_only: true, .. })));
assert!(matches!(
cli.command,
Command::Serve(ServeCommand { interval: None, api_only: true, .. })
));
}

#[test]
fn rejects_serve_api_only_with_interval() {
    // Parsing accepts the flag combination; the rejection happens when the command runs,
    // and the resulting error must name both conflicting flags.
    let args = ["decodex", "serve", "--api-only", "--interval", "30s"];
    let serve = match Cli::parse_from(args).command {
        Command::Serve(serve) => serve,
        _ => panic!("expected serve command"),
    };
    let rendered = serve
        .run(None)
        .expect_err("api-only serve must reject interval configuration")
        .to_string();

    for flag in ["--api-only", "--interval"] {
        assert!(rendered.contains(flag));
    }
}

#[test]
Expand Down
60 changes: 49 additions & 11 deletions apps/decodex/src/orchestrator/entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,61 @@ pub(crate) fn run_once(request: RunOnceRequest<'_>) -> Result<()> {
}

pub(crate) fn run_control_plane(request: ServeRequest<'_>) -> Result<()> {
if request.poll_interval.is_zero() {
eyre::bail!("serve interval must be greater than zero.");
}
if request.api_only && request.config_path.is_some() {
eyre::bail!(
"serve --api-only does not accept --config because it must not register or poll projects."
);
}
if request.api_only && request.poll_interval.is_some() {
eyre::bail!(
"serve --api-only does not accept --interval because API-only mode does not poll projects."
);
}

validate_daemon_runtime()?;

let state_store = Arc::new(runtime::open_runtime_store()?);

if request.api_only {
let operator_state_endpoint =
OperatorStateEndpoint::start(request.listen_address, Arc::clone(&state_store))?;
let runtime_db_path = runtime::runtime_db_path()?;
let global_config_path = runtime::global_config_path()?;
let project_config_dir = runtime::project_config_dir()?;
let snapshot = run_control_plane_api_only_tick(&state_store)?;

if let Err(error) = operator_state_endpoint.publish_snapshot(&snapshot) {
let _ = error;

tracing::warn!(
"Operator snapshot publish failed; sensitive runtime details were withheld from control-plane logs."
);
}

tracing::info!(
listen_address = %operator_state_endpoint.listen_address(),
path = OPERATOR_DASHBOARD_ALIAS_ENDPOINT_PATH,
ws_path = OPERATOR_DASHBOARD_WS_ENDPOINT_PATH,
api_only = true,
runtime_db_path = %runtime_db_path.display(),
global_config_path = %global_config_path.display(),
project_config_dir = %project_config_dir.display(),
"Starting Decodex API-only operator endpoint."
);

loop {
thread::park();
}
}

let poll_interval = request
.poll_interval
.unwrap_or_else(|| Duration::from_secs(60));

if poll_interval.is_zero() {
eyre::bail!("serve interval must be greater than zero.");
}

if let Some(config_path) = request.config_path {
let Some(config_path) = resolve_config_path(Some(config_path), &state_store)? else {
eyre::bail!(
Expand All @@ -133,11 +175,11 @@ pub(crate) fn run_control_plane(request: ServeRequest<'_>) -> Result<()> {
let mut project_runtimes: HashMap<String, ProjectDaemonRuntime> = HashMap::new();

tracing::info!(
poll_interval_s = request.poll_interval.as_secs(),
poll_interval_s = poll_interval.as_secs(),
listen_address = %operator_state_endpoint.listen_address(),
path = OPERATOR_DASHBOARD_ALIAS_ENDPOINT_PATH,
ws_path = OPERATOR_DASHBOARD_WS_ENDPOINT_PATH,
api_only = request.api_only,
api_only = false,
runtime_db_path = %runtime_db_path.display(),
global_config_path = %global_config_path.display(),
project_config_dir = %project_config_dir.display(),
Expand All @@ -146,11 +188,7 @@ pub(crate) fn run_control_plane(request: ServeRequest<'_>) -> Result<()> {

loop {
let tick_started_at = Instant::now();
let snapshot = if request.api_only {
run_control_plane_api_only_tick(&state_store)?
} else {
run_control_plane_tick(&state_store, &mut project_runtimes)?
};
let snapshot = run_control_plane_tick(&state_store, &mut project_runtimes)?;

if let Err(error) = operator_state_endpoint.publish_snapshot(&snapshot) {
let _ = error;
Expand All @@ -160,7 +198,7 @@ pub(crate) fn run_control_plane(request: ServeRequest<'_>) -> Result<()> {
);
}

sleep_until_next_tick(request.poll_interval, tick_started_at);
sleep_until_next_tick(poll_interval, tick_started_at);
}
}

Expand Down
2 changes: 1 addition & 1 deletion apps/decodex/src/orchestrator/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub(crate) struct RunOnceRequest<'a> {
/// Multi-project local control-plane daemon request.
pub(crate) struct ServeRequest<'a> {
pub(crate) config_path: Option<&'a Path>,
pub(crate) poll_interval: Duration,
pub(crate) poll_interval: Option<Duration>,
pub(crate) listen_address: &'a str,
pub(crate) api_only: bool,
}
Expand Down
2 changes: 1 addition & 1 deletion docs/spec/runtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ After a process restart, recent-run history, active lease ownership, retained po
- During an active run, operator snapshots must expose `thread_id` as soon as the Codex thread exists, plus monotonically advancing `event_count`, `last_event_type`, and `last_event_at` once protocol events are recorded. These fields may be hydrated either from the current process journal or from the active lane's `.decodex-run-activity` marker when `status` is running in a separate process.
- `thread_id = null` is expected only before the worker creates the Codex thread for the current run. `event_count = 0`, `last_event_type = null`, and `last_event_at = null` are expected only before the first protocol event for that same run. After the thread exists and protocol activity has started, those empty values indicate missing hydration rather than normal progress.
- Operator snapshots may expose an additive `protocol_activity` object derived from app-server structured messages for the current run. The object stays local/operator-only and should summarize turn status, waiting reason, rate-limit status, and a compact recent event list for high-value app-server activity such as `turn/started`, `turn/completed`, plan updates, diff updates, item start/completion, command output deltas, server request responses, account updates, and rate-limit updates. Missing `protocol_activity` means no structured summary was captured yet; consumers must continue to rely on the older `event_count`, `last_event_type`, `last_event_at`, thread fields, and `child_agent_activity` fields when it is absent.
- The operator snapshot transport must stay local/operator-only. `decodex serve` exposes the human-facing operator console from the canonical HTTP `GET /` and `GET /dashboard` routes, serves only the necessary dashboard assets and `GET /livez` liveness probe over HTTP, and delivers published snapshots, active-run activity, and dashboard control acknowledgements through the local `GET /dashboard/control` WebSocket upgrade.
- The operator snapshot transport must stay local/operator-only. `decodex serve` exposes the human-facing operator console from the canonical HTTP `GET /` and `GET /dashboard` routes, serves only the necessary dashboard assets, the `GET /livez` liveness probe, and the local account-control API over HTTP, and delivers published snapshots, active-run activity, and dashboard control acknowledgements through the local `GET /dashboard/control` WebSocket upgrade.
- `GET /livez` is only a process- and listener-level liveness probe. It must not claim control-plane tick freshness or forward progress by itself.
- The dashboard must not depend on a separate HTTP snapshot or readiness endpoint; snapshot freshness belongs to the WebSocket-delivered snapshot payload and the browser connection state.
- Reconciliation must mark locally active run attempts as `interrupted` when their
Expand Down