diff --git a/README.md b/README.md index e290d37..7fb50df 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,7 @@ cargo run -p decodex --bin decodex -- --help cargo run -p decodex --bin decodex -- probe stdio:// cargo run -p decodex --bin decodex -- project list cargo run -p decodex --bin decodex -- status +cargo run -p decodex --bin decodex -- status --live cargo run -p decodex --bin decodex -- diagnose --json cargo run -p decodex --bin decodex -- maintenance prune --dry-run cargo run -p decodex --bin decodex -- lane steer --run-id --expected-turn-id --message @@ -124,6 +125,11 @@ operator wants to override registry-based project resolution for that command. Use `--allow-unverified-codex` on `run`, `serve`, or `probe` only when deliberately dogfooding a Codex build outside the locally verified app-server range; the default guard remains fail-closed. +`decodex status` prints the local runtime snapshot without refreshing live +tracker, pull-request, or Codex account usage observers. Use `decodex status --live` +when the operator needs fresh Linear/GitHub readback before acting; use the Accounts +API refresh path, such as `GET /api/accounts?refresh=1`, when the operator needs +fresh ChatGPT account usage probes. `decodex serve` uses hardcoded scheduler cadences: the local control-plane loop publishes snapshots every 15 seconds, and Linear-backed queue/status scans run at most every 5 minutes per project unless an operator or agent requests an explicit diff --git a/apps/decodex/src/accounts.rs b/apps/decodex/src/accounts.rs index a00bcea..72774b3 100644 --- a/apps/decodex/src/accounts.rs +++ b/apps/decodex/src/accounts.rs @@ -724,6 +724,12 @@ impl AccountSummary { window_seconds: basis.window_seconds, checked_at_unix_epoch, resets_at_unix_epoch: basis.resets_at_unix_epoch, + primary_window_seconds: self.primary_window_seconds, + primary_remaining_percent: self.primary_remaining_percent, + primary_resets_at_unix_epoch: self.primary_resets_at_unix_epoch, + secondary_window_seconds: self.secondary_window_seconds, + secondary_remaining_percent: self.secondary_remaining_percent, + secondary_resets_at_unix_epoch: self.secondary_resets_at_unix_epoch, }) } @@ -793,6 +799,18 @@ struct AccountUsageHistoryRecord { checked_at_unix_epoch: i64, #[serde(skip_serializing_if = "Option::is_none")] resets_at_unix_epoch: Option, + #[serde(skip_serializing_if = "Option::is_none")] + primary_window_seconds: Option, + #[serde(skip_serializing_if = "Option::is_none")] + primary_remaining_percent: Option, + #[serde(skip_serializing_if = "Option::is_none")] + primary_resets_at_unix_epoch: Option, + #[serde(skip_serializing_if = "Option::is_none")] + secondary_window_seconds: Option, + #[serde(skip_serializing_if = "Option::is_none")] + secondary_remaining_percent: Option, + #[serde(skip_serializing_if = "Option::is_none")] + secondary_resets_at_unix_epoch: Option, } impl AccountUsageHistoryRecord { fn daily_summary(&self) -> AccountUsageDailySummary { @@ -826,6 +844,58 @@ impl AccountUsageHistoryRecord { .zip(other.email.as_deref()) .is_some_and(|(left, right)| left == right)) } + + fn apply_missing_usage_windows(&self, account: &mut AccountSummary, now_unix_epoch: i64) { + self.apply_missing_primary_usage_window(account, now_unix_epoch); + self.apply_missing_secondary_usage_window(account, now_unix_epoch); + } + + fn apply_missing_primary_usage_window( + &self, + account: &mut AccountSummary, + now_unix_epoch: i64, + ) { + if has_usage_window(account.primary_window_seconds, account.primary_remaining_percent) + || !has_current_usage_window( + self.primary_window_seconds, + self.primary_remaining_percent, + self.primary_resets_at_unix_epoch, + now_unix_epoch, + ) { + return; + } + + account.primary_window_seconds = self.primary_window_seconds; + account.primary_remaining_percent = self.primary_remaining_percent; + account.primary_resets_at_unix_epoch = self.primary_resets_at_unix_epoch; + } + + fn apply_missing_secondary_usage_window( + &self, + account: &mut AccountSummary, + now_unix_epoch: i64, + ) { + let window_seconds = self.secondary_window_seconds.or(self.window_seconds); + let remaining_percent = self + .secondary_remaining_percent + .or_else(|| Some(remaining_percent_from_used(self.used_percent))); + let resets_at_unix_epoch = + self.secondary_resets_at_unix_epoch.or(self.resets_at_unix_epoch); + + if has_usage_window(account.secondary_window_seconds, account.secondary_remaining_percent) + || !has_current_usage_window( + window_seconds, + remaining_percent, + resets_at_unix_epoch, + now_unix_epoch, + ) { + return; + } + + account.secondary_window_seconds = window_seconds; + account.secondary_remaining_percent = remaining_percent; + account.secondary_resets_at_unix_epoch = resets_at_unix_epoch; + } } #[derive(Default)] @@ -892,6 +962,8 @@ impl AccountUsageHistory { } fn apply_to_accounts(&self, accounts: &mut [AccountSummary]) { + let now = OffsetDateTime::now_utc().unix_timestamp(); + for account in accounts { let matching_records = self .records @@ -899,15 +971,18 @@ impl AccountUsageHistory { .filter(|record| record.matches_account(account)) .collect::>(); - if account.seven_day_used_percent.is_none() - && let Some(latest) = - matching_records.iter().max_by_key(|record| record.checked_at_unix_epoch) + if let Some(latest) = + matching_records.iter().max_by_key(|record| record.checked_at_unix_epoch) { - account.seven_day_used_percent = Some(latest.used_percent); - account.capacity_multiplier = - normalized_account_capacity_multiplier(latest.capacity_multiplier); - account.seven_day_daily_average_percent = - Some(latest.used_percent as f64 / USAGE_ESTIMATE_WINDOW_DAYS as f64); + if account.seven_day_used_percent.is_none() { + account.seven_day_used_percent = Some(latest.used_percent); + account.capacity_multiplier = + normalized_account_capacity_multiplier(latest.capacity_multiplier); + account.seven_day_daily_average_percent = + Some(latest.used_percent as f64 / USAGE_ESTIMATE_WINDOW_DAYS as f64); + } + + latest.apply_missing_usage_windows(account, now); } account.usage_records = @@ -1791,10 +1866,28 @@ fn is_seven_day_usage_window(window_seconds: i64) -> bool { .is_some_and(|delta| delta.abs() <= 3_600) } +fn has_usage_window(window_seconds: Option, remaining_percent: Option) -> bool { + matches!(window_seconds, Some(seconds) if seconds > 0) && remaining_percent.is_some() +} + +fn has_current_usage_window( + window_seconds: Option, + remaining_percent: Option, + resets_at_unix_epoch: Option, + now_unix_epoch: i64, +) -> bool { + has_usage_window(window_seconds, remaining_percent) + && resets_at_unix_epoch.is_some_and(|reset| reset > now_unix_epoch) +} + fn used_percent_from_remaining(remaining_percent: i64) -> i64 { 100_i64.saturating_sub(remaining_percent).clamp(0, 100) } +fn remaining_percent_from_used(used_percent: i64) -> i64 { + 100_i64.saturating_sub(used_percent).clamp(0, 100) +} + fn percent_ratio(numerator: i64, denominator: i64) -> f64 { if denominator <= 0 { return 0.0; @@ -2341,6 +2434,73 @@ mod tests { assert_eq!(estimate.total_used_percent, 63); } + #[test] + fn usage_history_preserves_last_good_windows_across_placeholder_refresh() { + let temp_dir = TempDir::new().expect("temp dir should create"); + let store = AccountStore::new( + temp_dir.path().join("accounts.jsonl"), + temp_dir.path().join("config.toml"), + ); + let now = time::OffsetDateTime::now_utc().unix_timestamp(); + + store + .save_records(&[account_record( + "copy@example.com", + "acct_123456", + "header.eyJleHAiOjQxMDI0NDQ4MDB9.sig", + "refresh-secret", + )]) + .expect("records should save"); + + let good_summary = CodexAccountActivitySummary { + account_fingerprint: String::from("...123456"), + email: Some(String::from("copy@example.com")), + plan_type: Some(String::from("pro")), + status: String::from("available"), + refresh_status: String::from("not_needed"), + checked_at_unix_epoch: Some(now), + primary_window_seconds: Some(18_000), + primary_remaining_percent: Some(72), + primary_resets_at_unix_epoch: Some(now + 18_000), + secondary_window_seconds: Some(604_800), + secondary_remaining_percent: Some(91), + secondary_resets_at_unix_epoch: Some(now + 604_800), + ..CodexAccountActivitySummary::default() + }; + let mut response = store.list().expect("account list should load"); + + response.apply_usage_summaries(&[good_summary]); + response.refresh_usage_records(&store.accounts_path).expect("usage history should refresh"); + + let degraded_summary = CodexAccountActivitySummary { + account_fingerprint: String::from("...123456"), + email: Some(String::from("copy@example.com")), + plan_type: Some(String::from("pro")), + status: String::from("available"), + refresh_status: String::from("not_needed"), + checked_at_unix_epoch: Some(now + 60), + profile_lifetime_tokens: Some(47_200_000_000), + ..CodexAccountActivitySummary::default() + }; + let mut degraded_response = store.list().expect("account list should reload"); + + degraded_response.apply_usage_summaries(&[degraded_summary]); + degraded_response + .refresh_usage_records(&store.accounts_path) + .expect("usage history should restore usable windows"); + + let account = °raded_response.accounts[0]; + + assert_eq!(account.primary_window_seconds, Some(18_000)); + assert_eq!(account.primary_remaining_percent, Some(72)); + assert_eq!(account.primary_resets_at_unix_epoch, Some(now + 18_000)); + assert_eq!(account.secondary_window_seconds, Some(604_800)); + assert_eq!(account.secondary_remaining_percent, Some(91)); + assert_eq!(account.secondary_resets_at_unix_epoch, Some(now + 604_800)); + assert_eq!(account.seven_day_used_percent, Some(9)); + assert_eq!(account.profile_lifetime_tokens, Some(47_200_000_000)); + } + fn account_record( email: &str, account_id: &str, diff --git a/apps/decodex/src/agent/codex_accounts.rs b/apps/decodex/src/agent/codex_accounts.rs index 3dd7a77..51c407e 100644 --- a/apps/decodex/src/agent/codex_accounts.rs +++ b/apps/decodex/src/agent/codex_accounts.rs @@ -9,6 +9,7 @@ use std::{ path::{Path, PathBuf}, process, sync::{Mutex, OnceLock}, + thread, time::Duration, }; @@ -32,6 +33,7 @@ const CHATGPT_OAUTH_CLIENT_ID: &str = "app_EMoamEEZ73f0CkXaXp7hrann"; const HTTP_TIMEOUT: Duration = Duration::from_secs(10); const TOKEN_REFRESH_INTERVAL_SECONDS: i64 = 8 * 24 * 60 * 60; const ACCOUNT_ACTIVITY_CACHE_TTL_SECONDS: i64 = 60; +const ACCOUNT_ACTIVITY_PROBE_MAX_CONCURRENCY: usize = 8; static ACCOUNT_ACTIVITY_CACHE: OnceLock>> = OnceLock::new(); @@ -189,11 +191,17 @@ impl CodexAccountPool { } } - let summaries = self.account_activity_summaries()?; + let mut summaries = self.account_activity_summaries()?; let mut cached = cache .lock() .map_err(|error| eyre::eyre!("Codex account usage cache is poisoned: {error}"))?; + if let Some(entry) = cached.as_ref() + && entry.key == cache_key + { + preserve_cached_usage_windows(&mut summaries, &entry.summaries, now); + } + *cached = Some(AccountActivityCacheEntry { key: cache_key, checked_at_unix_epoch: now, @@ -456,86 +464,33 @@ impl CodexAccountPool { records: &mut [AccountPoolRecord], ) -> crate::prelude::Result> { let now = OffsetDateTime::now_utc().unix_timestamp(); - let mut summaries = Vec::new(); + let mut summaries_by_index = vec![None; records.len()]; + let mut probe_inputs = Vec::new(); let mut records_changed = false; - for record in records.iter_mut() { + for (index, record) in records.iter().enumerate() { let Some(configured_summary) = record.configured_activity_summary(now) else { continue; }; if configured_summary.status != "available" { - summaries.push(configured_summary); + summaries_by_index[index] = Some(configured_summary); continue; } - let refresh_status = match self.proactive_refresh_record(record, now) { - Ok(status) => { - if status == RefreshStatus::Succeeded { - records_changed = true; - } + probe_inputs.push(AccountActivityProbeInput { index, record: record.clone() }); + } + for chunk in probe_inputs.chunks(ACCOUNT_ACTIVITY_PROBE_MAX_CONCURRENCY) { + let results = self.probe_account_activity_summaries_parallel_chunk(chunk, now)?; - status.as_str() - }, - Err(error) if error.requires_skip => { - summaries.push(record.probe_failed_activity_summary( - now, - "failed", - &error.source, - )); - - continue; - }, - Err(_error) => "failed", - }; + for result in results { + if result.records_changed { + records[result.index] = result.record; + records_changed = true; + } - match self.probe_record_usage(record) { - Ok(usage) => { - summaries.push(self.activity_summary_from_usage_probe( - record, - usage, - refresh_status, - )?); - }, - Err(error) if error.unauthorized && record.refresh_token().is_some() => { - match self.refresh_record(record) { - Ok(()) => { - records_changed = true; - - match self.probe_record_usage(record) { - Ok(usage) => { - summaries.push(self.activity_summary_from_usage_probe( - record, - usage, - "succeeded", - )?); - }, - Err(retry_error) => { - summaries.push(record.probe_failed_activity_summary( - now, - "failed", - &retry_error, - )); - }, - } - }, - Err(refresh_error) => { - summaries.push(record.probe_failed_activity_summary( - now, - "failed", - refresh_error.as_ref(), - )); - }, - } - }, - Err(error) => { - summaries.push(record.probe_failed_activity_summary( - now, - "probe_failed", - &error, - )); - }, + summaries_by_index[result.index] = Some(result.summary); } } @@ -543,7 +498,83 @@ impl CodexAccountPool { self.save_records(records)?; } - Ok(summaries) + Ok(summaries_by_index.into_iter().flatten().collect()) + } + + fn probe_account_activity_summaries_parallel_chunk( + &self, + inputs: &[AccountActivityProbeInput], + now: i64, + ) -> crate::prelude::Result> { + thread::scope(|scope| { + let handles = inputs + .iter() + .cloned() + .map(|input| scope.spawn(move || self.probe_account_activity_record(input, now))) + .collect::>(); + let mut results = Vec::with_capacity(handles.len()); + + for handle in handles { + let result = handle + .join() + .map_err(|_| eyre::eyre!("Codex account usage probe worker panicked."))??; + + results.push(result); + } + + Ok(results) + }) + } + + fn probe_account_activity_record( + &self, + input: AccountActivityProbeInput, + now: i64, + ) -> crate::prelude::Result { + let mut record = input.record; + let mut records_changed = false; + let refresh_status = match self.proactive_refresh_record(&mut record, now) { + Ok(status) => { + if status == RefreshStatus::Succeeded { + records_changed = true; + } + + status.as_str() + }, + Err(error) if error.requires_skip => { + let summary = record.probe_failed_activity_summary(now, "failed", &error.source); + + return Ok(AccountActivityProbeResult { + index: input.index, + record, + summary, + records_changed, + }); + }, + Err(_error) => "failed", + }; + let summary = match self.probe_record_usage(&record) { + Ok(usage) => self.activity_summary_from_usage_probe(&record, usage, refresh_status)?, + Err(error) if error.unauthorized && record.refresh_token().is_some() => { + match self.refresh_record(&mut record) { + Ok(()) => { + records_changed = true; + + match self.probe_record_usage(&record) { + Ok(usage) => + self.activity_summary_from_usage_probe(&record, usage, "succeeded")?, + Err(retry_error) => + record.probe_failed_activity_summary(now, "failed", &retry_error), + } + }, + Err(refresh_error) => + record.probe_failed_activity_summary(now, "failed", refresh_error.as_ref()), + } + }, + Err(error) => record.probe_failed_activity_summary(now, "probe_failed", &error), + }; + + Ok(AccountActivityProbeResult { index: input.index, record, summary, records_changed }) } fn activity_summary_from_usage_probe( @@ -886,6 +917,19 @@ struct AccountActivityCacheEntry { summaries: Vec, } +#[derive(Clone)] +struct AccountActivityProbeInput { + index: usize, + record: AccountPoolRecord, +} + +struct AccountActivityProbeResult { + index: usize, + record: AccountPoolRecord, + summary: CodexAccountActivitySummary, + records_changed: bool, +} + struct AccountPoolFileLock { _file: File, } @@ -1548,15 +1592,108 @@ fn profile_daily_usage_from_value(value: &Value) -> Option) -> Option { let value = value.filter(|value| !value.is_null())?; let used_percent = number_as_i64(value.get("used_percent")?)?; + let window_seconds = value.get("limit_window_seconds").and_then(number_as_i64); + + if window_seconds.is_some_and(|seconds| seconds <= 0) { + return None; + } + let remaining_percent = 100_i64.saturating_sub(used_percent).clamp(0, 100); Some(UsageWindow { - window_seconds: value.get("limit_window_seconds").and_then(number_as_i64), + window_seconds, remaining_percent, resets_at_unix_epoch: value.get("reset_at").and_then(number_as_i64), }) } +fn preserve_cached_usage_windows( + summaries: &mut [CodexAccountActivitySummary], + cached_summaries: &[CodexAccountActivitySummary], + now_unix_epoch: i64, +) { + for summary in summaries { + if account_summary_is_limited(summary) { + continue; + } + + let Some(cached) = + cached_summaries.iter().find(|cached| account_summaries_match(summary, cached)) + else { + continue; + }; + + preserve_primary_usage_window(summary, cached, now_unix_epoch); + preserve_secondary_usage_window(summary, cached, now_unix_epoch); + } +} + +fn preserve_primary_usage_window( + summary: &mut CodexAccountActivitySummary, + cached: &CodexAccountActivitySummary, + now_unix_epoch: i64, +) { + if has_usage_window(summary.primary_window_seconds, summary.primary_remaining_percent) + || !has_current_usage_window( + cached.primary_window_seconds, + cached.primary_remaining_percent, + cached.primary_resets_at_unix_epoch, + now_unix_epoch, + ) { + return; + } + + summary.primary_window_seconds = cached.primary_window_seconds; + summary.primary_remaining_percent = cached.primary_remaining_percent; + summary.primary_resets_at_unix_epoch = cached.primary_resets_at_unix_epoch; +} + +fn preserve_secondary_usage_window( + summary: &mut CodexAccountActivitySummary, + cached: &CodexAccountActivitySummary, + now_unix_epoch: i64, +) { + if has_usage_window(summary.secondary_window_seconds, summary.secondary_remaining_percent) + || !has_current_usage_window( + cached.secondary_window_seconds, + cached.secondary_remaining_percent, + cached.secondary_resets_at_unix_epoch, + now_unix_epoch, + ) { + return; + } + + summary.secondary_window_seconds = cached.secondary_window_seconds; + summary.secondary_remaining_percent = cached.secondary_remaining_percent; + summary.secondary_resets_at_unix_epoch = cached.secondary_resets_at_unix_epoch; +} + +const fn has_usage_window(window_seconds: Option, remaining_percent: Option) -> bool { + matches!(window_seconds, Some(seconds) if seconds > 0) && remaining_percent.is_some() +} + +fn has_current_usage_window( + window_seconds: Option, + remaining_percent: Option, + resets_at_unix_epoch: Option, + now_unix_epoch: i64, +) -> bool { + has_usage_window(window_seconds, remaining_percent) + && resets_at_unix_epoch.is_some_and(|reset| reset > now_unix_epoch) +} + +fn account_summaries_match( + left: &CodexAccountActivitySummary, + right: &CodexAccountActivitySummary, +) -> bool { + left.account_fingerprint == right.account_fingerprint + || left + .email + .as_deref() + .zip(right.email.as_deref()) + .is_some_and(|(left, right)| left == right) +} + fn credits_from_value(value: &Value) -> Option { if value.is_null() { return None; @@ -2033,6 +2170,88 @@ mod tests { assert!(!available_summary.is_limited()); } + #[test] + fn usage_summary_ignores_zero_second_placeholder_windows() { + let payload = serde_json::json!({ + "plan_type": "pro", + "rate_limit": { + "primary_window": { + "used_percent": 0, + "limit_window_seconds": 0, + "reset_at": 1_800_000_000 + }, + "secondary_window": null + }, + "rate_limit_reached_type": null + }); + let summary = codex_accounts::usage_snapshot_from_payload(&payload, 1_800_000_000); + + assert_eq!(summary.primary, None); + assert_eq!(summary.secondary, None); + assert!(!summary.is_limited()); + + let record = AccountPoolRecord { + email: Some(String::from("placeholder@example.com")), + disabled: false, + cooldown_until_unix_epoch: None, + cooldown_until: None, + last_selected_at_unix_epoch: None, + auth_mode: Some(String::from("chatgpt")), + openai_api_key: None, + tokens: Some(CodexTokenData { + email: None, + id_token: None, + access_token: String::from("access"), + refresh_token: String::from("refresh"), + account_id: Some(String::from("acct_placeholder")), + }), + last_refresh: None, + }; + let login = record + .login_from_usage(summary, "not_needed") + .expect("placeholder usage should still produce an account summary"); + + assert_eq!(login.summary().status, "available"); + assert_eq!(login.summary().primary_window_seconds, None); + assert_eq!(login.summary().primary_remaining_percent, None); + assert_eq!(login.summary().primary_resets_at_unix_epoch, None); + } + + #[test] + fn usage_cache_preserves_current_windows_across_placeholder_refresh() { + let now = time::OffsetDateTime::now_utc().unix_timestamp(); + let cached = [CodexAccountActivitySummary { + account_fingerprint: String::from("...123456"), + email: Some(String::from("copy@example.com")), + status: String::from("available"), + primary_window_seconds: Some(18_000), + primary_remaining_percent: Some(72), + primary_resets_at_unix_epoch: Some(now + 18_000), + secondary_window_seconds: Some(604_800), + secondary_remaining_percent: Some(91), + secondary_resets_at_unix_epoch: Some(now + 604_800), + ..CodexAccountActivitySummary::default() + }]; + let mut refreshed = [CodexAccountActivitySummary { + account_fingerprint: String::from("...123456"), + email: Some(String::from("copy@example.com")), + status: String::from("available"), + checked_at_unix_epoch: Some(now + 60), + profile_lifetime_tokens: Some(47_200_000_000), + ..CodexAccountActivitySummary::default() + }]; + + codex_accounts::preserve_cached_usage_windows(&mut refreshed, &cached, now + 60); + + assert_eq!(refreshed[0].primary_window_seconds, Some(18_000)); + assert_eq!(refreshed[0].primary_remaining_percent, Some(72)); + assert_eq!(refreshed[0].primary_resets_at_unix_epoch, Some(now + 18_000)); + assert_eq!(refreshed[0].secondary_window_seconds, Some(604_800)); + assert_eq!(refreshed[0].secondary_remaining_percent, Some(91)); + assert_eq!(refreshed[0].secondary_resets_at_unix_epoch, Some(now + 604_800)); + assert_eq!(refreshed[0].profile_lifetime_tokens, Some(47_200_000_000)); + } + #[test] fn proactive_refresh_prefers_access_token_expiration_then_last_refresh() { let mut record = AccountPoolRecord { diff --git a/apps/decodex/src/cli.rs b/apps/decodex/src/cli.rs index d50ea63..487cb4e 100644 --- a/apps/decodex/src/cli.rs +++ b/apps/decodex/src/cli.rs @@ -557,10 +557,13 @@ struct StatusCommand { /// Maximum number of recent runs to display. #[arg(long, value_name = "COUNT", default_value_t = orchestrator::DEFAULT_STATUS_RUN_LIMIT)] limit: usize, + /// Refresh live tracker and pull-request observers before printing status. + #[arg(long)] + live: bool, } impl StatusCommand { fn run(&self) -> Result<()> { - orchestrator::print_status(self.project_config.as_path(), self.json, self.limit) + orchestrator::print_status(self.project_config.as_path(), self.json, self.limit, self.live) } } @@ -2098,6 +2101,7 @@ mod tests { project_config: ProjectConfigArgs { config: Some(config) }, json: true, limit: 5, + live: false, }) if config == Path::new("./project.toml") )); } diff --git a/apps/decodex/src/orchestrator/entrypoints.rs b/apps/decodex/src/orchestrator/entrypoints.rs index 9b51a9a..eebdfaa 100644 --- a/apps/decodex/src/orchestrator/entrypoints.rs +++ b/apps/decodex/src/orchestrator/entrypoints.rs @@ -232,6 +232,7 @@ pub(crate) fn print_status( config_path: Option<&Path>, json: bool, limit: usize, + live: bool, ) -> Result<()> { if limit == 0 { eyre::bail!("`status --limit` must be greater than zero."); @@ -248,102 +249,17 @@ pub(crate) fn print_status( runtime::register_project_config(&state_store, &config_path, true)?; - if let Some(status) = - active_stored_tracker_backoff_status(&state_store, config.service_id())? - { - let snapshot = - build_operator_status_snapshot_for_tracker_backoff(&config, &state_store, limit, &status)?; - - print_operator_status_snapshot(&snapshot, json)?; - - return Ok(()); - } - - let tracker = LinearClient::new(config.tracker().resolve_api_key()?)?; - let recovered_state = recover_runtime_state_from_tracker_and_worktrees( - &tracker, - &config, - &workflow, - &state_store, - ); - let mut snapshot_warnings = Vec::new(); - - match recovered_state { - Ok(recovered_state) => - hydrate_status_snapshot_state(&config, &state_store, recovered_state)?, - Err(error) => { - if let Some(backoff) = - tracker_rate_limit_backoff(&error, Instant::now(), "runtime_recovery") - { - let status = backoff.to_operator_status( - config.service_id(), - OffsetDateTime::now_utc().unix_timestamp(), - ); - - persist_tracker_backoff_state(&state_store, config.service_id(), &backoff); - - let snapshot = build_operator_status_snapshot_for_tracker_backoff( - &config, - &state_store, - limit, - &status, - )?; - - print_operator_status_snapshot(&snapshot, json)?; - - return Ok(()); - } - - let warning = runtime_recovery_warning("runtime_recovery_unavailable", &error); - - tracing::warn!( - recovery_error_class = runtime_recovery_error_class(&error), - "Skipped runtime recovery for operator status; sensitive runtime details were withheld." - ); - - snapshot_warnings.push(warning); - }, - } - - let mut snapshot = match build_live_operator_status_snapshot( - &tracker, - &config, - &workflow, - &state_store, - limit, - ) { - Ok(snapshot) => snapshot, - Err(error) => { - let Some(backoff) = - tracker_rate_limit_backoff(&error, Instant::now(), "operator_status_refresh") - else { - return Err(error); - }; - let status = backoff.to_operator_status( - config.service_id(), - OffsetDateTime::now_utc().unix_timestamp(), - ); - - persist_tracker_backoff_state(&state_store, config.service_id(), &backoff); - - build_operator_status_snapshot_for_tracker_backoff(&config, &state_store, limit, &status)? - }, + let snapshot = if live { + build_live_status_command_snapshot(&config, &workflow, &state_store, limit)? + } else { + build_operator_state_snapshot_without_live_observers( + &config, + &workflow, + &state_store, + limit, + )? }; - for warning in snapshot_warnings { - add_operator_snapshot_warning(&mut snapshot, &warning); - } - - refresh_operator_project_summary(&mut snapshot); - - if !snapshot - .connector_backoffs - .iter() - .any(|backoff| backoff.connector == "linear") - { - clear_tracker_backoff_state_best_effort(&state_store, config.service_id()); - } - print_operator_status_snapshot(&snapshot, json)?; Ok(()) @@ -440,6 +356,101 @@ pub(crate) fn print_private_evidence(request: EvidenceRequest<'_>) -> Result<()> Ok(()) } +fn build_live_status_command_snapshot( + config: &ServiceConfig, + workflow: &WorkflowDocument, + state_store: &StateStore, + limit: usize, +) -> Result { + if let Some(status) = active_stored_tracker_backoff_status(state_store, config.service_id())? { + return build_operator_status_snapshot_for_tracker_backoff( + config, + state_store, + limit, + &status, + ); + } + + let tracker = LinearClient::new(config.tracker().resolve_api_key()?)?; + let recovered_state = + recover_runtime_state_from_tracker_and_worktrees(&tracker, config, workflow, state_store); + let mut snapshot_warnings = Vec::new(); + + match recovered_state { + Ok(recovered_state) => + hydrate_status_snapshot_state(config, state_store, recovered_state)?, + Err(error) => { + if let Some(backoff) = + tracker_rate_limit_backoff(&error, Instant::now(), "runtime_recovery") + { + let status = backoff.to_operator_status( + config.service_id(), + OffsetDateTime::now_utc().unix_timestamp(), + ); + + persist_tracker_backoff_state(state_store, config.service_id(), &backoff); + + return build_operator_status_snapshot_for_tracker_backoff( + config, + state_store, + limit, + &status, + ); + } + + let warning = runtime_recovery_warning("runtime_recovery_unavailable", &error); + + tracing::warn!( + recovery_error_class = runtime_recovery_error_class(&error), + "Skipped runtime recovery for operator status; sensitive runtime details were withheld." + ); + + snapshot_warnings.push(warning); + }, + } + + let mut snapshot = match build_status_command_operator_status_snapshot( + &tracker, + config, + workflow, + state_store, + limit, + ) { + Ok(snapshot) => snapshot, + Err(error) => { + let Some(backoff) = + tracker_rate_limit_backoff(&error, Instant::now(), "operator_status_refresh") + else { + return Err(error); + }; + let status = backoff.to_operator_status( + config.service_id(), + OffsetDateTime::now_utc().unix_timestamp(), + ); + + persist_tracker_backoff_state(state_store, config.service_id(), &backoff); + + build_operator_status_snapshot_for_tracker_backoff(config, state_store, limit, &status)? + }, + }; + + for warning in snapshot_warnings { + add_operator_snapshot_warning(&mut snapshot, &warning); + } + + refresh_operator_project_summary(&mut snapshot); + + if !snapshot + .connector_backoffs + .iter() + .any(|backoff| backoff.connector == "linear") + { + clear_tracker_backoff_state_best_effort(state_store, config.service_id()); + } + + Ok(snapshot) +} + fn run_queue_explain( config: &ServiceConfig, workflow: &WorkflowDocument, diff --git a/apps/decodex/src/orchestrator/status.rs b/apps/decodex/src/orchestrator/status.rs index 67e7ae9..23ee9d1 100644 --- a/apps/decodex/src/orchestrator/status.rs +++ b/apps/decodex/src/orchestrator/status.rs @@ -440,6 +440,30 @@ where ) } +fn build_status_command_operator_status_snapshot( + tracker: &T, + project: &ServiceConfig, + workflow: &WorkflowDocument, + state_store: &StateStore, + limit: usize, +) -> crate::prelude::Result +where + T: IssueTracker, +{ + build_live_operator_status_snapshot_with_history_ledger( + tracker, + project, + workflow, + state_store, + limit, + LiveOperatorStatusSnapshotOptions { + hydrate_history_ledger: true, + run_issue_metadata_hydration: RunIssueMetadataHydration::AllRows, + account_activity_mode: AccountActivityMode::Snapshot, + }, + ) +} + fn build_control_plane_operator_status_snapshot( tracker: &T, project: &ServiceConfig, diff --git a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs index 969e1c7..fc3991f 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs @@ -409,8 +409,14 @@ fn idle_operator_status_snapshot_includes_configured_codex_accounts() { TestEnvVarGuard::set("HOME", temp_dir.path().to_str().expect("home should be utf-8")); let accounts_path = temp_dir.path().join(".codex/decodex/accounts.jsonl"); let usage_endpoint = start_codex_usage_fixture_server(vec![ - r#"{"plan_type":"pro","rate_limit":{"primary_window":{"used_percent":7,"limit_window_seconds":18000,"reset_at":1800018000},"secondary_window":{"used_percent":11,"limit_window_seconds":604800,"reset_at":1800604800}},"credits":{"has_credits":true,"unlimited":false,"balance":"12.34"}}"#, - r#"{"plan_type":"plus","rate_limit":{"primary_window":{"used_percent":22,"limit_window_seconds":18000,"reset_at":1800019000},"secondary_window":{"used_percent":33,"limit_window_seconds":604800,"reset_at":1800605800}},"credits":{"has_credits":false,"unlimited":false,"balance":"0"}}"#, + ( + "acct_default", + r#"{"plan_type":"pro","rate_limit":{"primary_window":{"used_percent":7,"limit_window_seconds":18000,"reset_at":1800018000},"secondary_window":{"used_percent":11,"limit_window_seconds":604800,"reset_at":1800604800}},"credits":{"has_credits":true,"unlimited":false,"balance":"12.34"}}"#, + ), + ( + "acct_copy", + r#"{"plan_type":"plus","rate_limit":{"primary_window":{"used_percent":22,"limit_window_seconds":18000,"reset_at":1800019000},"secondary_window":{"used_percent":33,"limit_window_seconds":604800,"reset_at":1800605800}},"credits":{"has_credits":false,"unlimited":false,"balance":"0"}}"#, + ), ]); std::fs::create_dir_all(accounts_path.parent().expect("accounts path should have parent")) @@ -466,18 +472,82 @@ fn idle_operator_status_snapshot_includes_configured_codex_accounts() { assert_eq!(accounts[1]["credits_balance"], "0"); } -fn start_codex_usage_fixture_server(responses: Vec<&'static str>) -> String { +#[test] +fn status_command_snapshot_does_not_probe_configured_codex_accounts() { + let (temp_dir, base_config, workflow) = temp_project_layout(); + let _home_guard = + TestEnvVarGuard::set("HOME", temp_dir.path().to_str().expect("home should be utf-8")); + let accounts_path = temp_dir.path().join(".codex/decodex/accounts.jsonl"); + + std::fs::create_dir_all(accounts_path.parent().expect("accounts path should have parent")) + .expect("accounts dir should exist"); + std::fs::write( + &accounts_path, + r#"{"email":"default@example.com","auth_mode":"chatgpt","tokens":{"access_token":"access-default","refresh_token":"refresh-default","account_id":"acct_default"}} +"#, + ) + .expect("accounts fixture should write"); + + let mut config_toml = service_config_toml_for_config( + &base_config, + base_config.github().token_env_var(), + base_config.codex().internal_review_mode(), + base_config.codex().external_review_enabled(), + ); + + config_toml.push_str( + "\n[codex.accounts]\nusage_endpoint = \"http://127.0.0.1:9/wham/usage\"\n", + ); + + write_service_config(base_config.repo_root(), &config_toml); + + let config = load_service_config(base_config.repo_root()); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let tracker = FakeTracker::new(Vec::new()); + let snapshot = orchestrator::build_status_command_operator_status_snapshot( + &tracker, + &config, + &workflow, + &state_store, + 10, + ) + .expect("status command snapshot should build without probing account usage"); + let snapshot_json = serde_json::to_value(&snapshot).expect("snapshot should serialize"); + let accounts = + snapshot_json["accounts"].as_array().expect("snapshot should expose configured accounts"); + + assert_eq!(accounts.len(), 1); + assert_eq!(accounts[0]["email"], "default@example.com"); + assert_eq!(accounts[0]["status"], "available"); + assert_eq!(accounts[0]["refresh_status"], "not_checked"); + assert_eq!(accounts[0]["primary_remaining_percent"], serde_json::Value::Null); + assert!(!snapshot.warnings.contains(&String::from("codex_accounts_unavailable"))); +} + +fn start_codex_usage_fixture_server(responses: Vec<(&'static str, &'static str)>) -> String { let listener = TcpListener::bind("127.0.0.1:0").expect("usage fixture server should bind"); let address = listener.local_addr().expect("usage fixture address should resolve"); thread::spawn(move || { - for body in responses { + let responses_by_account = + responses.into_iter().collect::>(); + let request_count = responses_by_account.len(); + + for _ in 0..request_count { let (mut stream, _peer) = listener.accept().expect("usage fixture request should arrive"); let mut request = [0_u8; 4_096]; - let _ = stream.read(&mut request); + let bytes_read = stream.read(&mut request).expect("usage request should read"); + let request = String::from_utf8_lossy(&request[..bytes_read]); + let account_id = usage_fixture_account_id(&request); + let (status, body) = match account_id.and_then(|account_id| { + responses_by_account.get(account_id).copied() + }) { + Some(body) => ("200 OK", body), + None => ("404 Not Found", r#"{"error":"unknown account"}"#), + }; let response = format!( - "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", + "HTTP/1.1 {status}\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", body.len(), body ); @@ -493,6 +563,14 @@ fn start_codex_usage_fixture_server(responses: Vec<&'static str>) -> String { format!("http://{address}/wham/usage") } +fn usage_fixture_account_id(request: &str) -> Option<&str> { + request.lines().find_map(|line| { + let (name, value) = line.split_once(':')?; + + name.eq_ignore_ascii_case("ChatGPT-Account-Id").then_some(value.trim()) + }) +} + #[test] fn operator_status_snapshot_includes_local_recovery_worktree_directories() { let (_temp_dir, config, _workflow) = temp_project_layout(); diff --git a/plugins/decodex/skills/manual-cli/SKILL.md b/plugins/decodex/skills/manual-cli/SKILL.md index 52c52c3..c5bf0fa 100644 --- a/plugins/decodex/skills/manual-cli/SKILL.md +++ b/plugins/decodex/skills/manual-cli/SKILL.md @@ -35,6 +35,7 @@ decodex probe stdio:// decodex project add "$HOME/.codex/decodex/projects/" decodex project list decodex status +decodex status --live decodex run --dry-run decodex archive-linear --repo-label repo: --older-than-days 30 ``` @@ -45,6 +46,7 @@ Development equivalents from the Decodex repo root: cargo run -p decodex --bin decodex -- probe stdio:// cargo run -p decodex --bin decodex -- project add "$HOME/.codex/decodex/projects/" cargo run -p decodex --bin decodex -- status +cargo run -p decodex --bin decodex -- status --live cargo run -p decodex --bin decodex -- run --dry-run cargo run -p decodex --bin decodex -- archive-linear --repo-label repo: --older-than-days 30 ``` @@ -126,8 +128,11 @@ Manual commit and landing are separate narrow workflows: ## Status and Dry Run -- Use `status` to inspect active lanes, queue state, review and landing state, recovery - worktrees, and the run ledger. +- Use `status` to inspect the local runtime snapshot for active lanes, retained local + state, recovery worktrees, account-pool configuration, and the run ledger without + refreshing live tracker, pull-request, or ChatGPT account usage observers. +- Use `status --live` when the operator needs fresh Linear/GitHub observer readback + before acting. Use `/api/accounts?refresh=1` for fresh ChatGPT account usage probes. - Use `run --dry-run` before live automation to validate project loading, issue discovery, eligibility, and worktree planning without tracker mutation. - Use `probe stdio://` before relying on the Codex app-server boundary.