From 399e058db3c848d231e0b5dd29350f0cc27ea7f3 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Mon, 18 May 2026 15:32:08 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Balance Codex account usage windows","authority":"manual"} --- apps/decodex/src/agent/codex_accounts.rs | 296 ++++++++++++++++------- docs/spec/app-server.md | 9 +- 2 files changed, 217 insertions(+), 88 deletions(-) diff --git a/apps/decodex/src/agent/codex_accounts.rs b/apps/decodex/src/agent/codex_accounts.rs index 0b0a791..65d2424 100644 --- a/apps/decodex/src/agent/codex_accounts.rs +++ b/apps/decodex/src/agent/codex_accounts.rs @@ -2,7 +2,7 @@ use std::{ cmp::Ordering, error::Error, fmt::{self, Display, Formatter}, - fs, + fs::{self, File, OpenOptions}, path::{Path, PathBuf}, process, sync::Mutex, @@ -85,11 +85,42 @@ impl CodexAccountPool { pub(crate) fn account_activity_summaries( &self, ) -> crate::prelude::Result> { + let _guard = self.lock_records()?; let mut records = self.load_records()?; self.probe_account_activity_summaries(&mut records) } + fn lock_records(&self) -> crate::prelude::Result { + let parent = self.path.parent().ok_or_else(|| { + eyre::eyre!( + "Codex accounts path `{}` must have a parent directory.", + self.path.display() + ) + })?; + let file_name = self + .path + .file_name() + .and_then(|name| name.to_str()) + .ok_or_else(|| eyre::eyre!("Codex accounts path must end in a valid file name."))?; + let lock_path = parent.join(format!(".{file_name}.lock")); + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(&lock_path) + .map_err(|error| { + eyre::eyre!("Failed to open Codex accounts lock `{}`: {error}", lock_path.display()) + })?; + + file.lock().map_err(|error| { + eyre::eyre!("Failed to lock Codex accounts `{}`: {error}", self.path.display()) + })?; + + Ok(AccountPoolFileLock { _file: file }) + } + fn save_records(&self, records: &[AccountPoolRecord]) -> crate::prelude::Result<()> { let parent = self.path.parent().ok_or_else(|| { eyre::eyre!( @@ -568,6 +599,7 @@ impl CodexAccountPool { impl CodexAccountProvider for CodexAccountPool { fn select_account(&self) -> crate::prelude::Result { + let _guard = self.lock_records()?; let mut records = self.load_records()?; self.select_from_records(&mut records) @@ -577,6 +609,7 @@ impl CodexAccountProvider for CodexAccountPool { &self, previous_account_id: Option<&str>, ) -> crate::prelude::Result { + let _guard = self.lock_records()?; let mut records = self.load_records()?; self.refresh_from_records(&mut records, previous_account_id) @@ -630,48 +663,8 @@ impl CodexAccountLogin { } } -#[derive(Clone, Deserialize, Serialize)] -#[serde(untagged)] -enum AccountPoolLine { - Wrapped { - #[serde(skip_serializing_if = "Option::is_none")] - email: Option, - #[serde(default, skip_serializing_if = "is_false")] - disabled: bool, - #[serde(skip_serializing_if = "Option::is_none")] - cooldown_until_unix_epoch: Option, - #[serde(skip_serializing_if = "Option::is_none")] - cooldown_until: Option, - #[serde(skip_serializing_if = "Option::is_none")] - last_selected_at_unix_epoch: Option, - auth: AuthDotJson, - }, - Flat(AccountPoolRecord), -} -impl AccountPoolLine { - fn into_record(self) -> AccountPoolRecord { - match self { - Self::Flat(record) => record, - Self::Wrapped { - email, - disabled, - cooldown_until_unix_epoch, - cooldown_until, - last_selected_at_unix_epoch, - auth, - } => AccountPoolRecord { - email: first_nonblank_string(email, auth.email), - disabled, - cooldown_until_unix_epoch, - cooldown_until, - last_selected_at_unix_epoch, - auth_mode: auth.auth_mode, - openai_api_key: auth.openai_api_key, - tokens: auth.tokens, - last_refresh: auth.last_refresh, - }, - } - } +struct AccountPoolFileLock { + _file: File, } #[derive(Clone, Deserialize, Serialize)] @@ -922,41 +915,6 @@ struct RefreshResponse { refresh_token: Option, } -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum RefreshStatus { - NotNeeded, - Succeeded, - Failed, -} -impl RefreshStatus { - const fn as_str(self) -> &'static str { - match self { - Self::NotNeeded => "not_needed", - Self::Succeeded => "succeeded", - Self::Failed => "failed", - } - } -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum ProactiveRefreshReason { - AccessTokenExpired, - LastRefreshStale, -} -impl ProactiveRefreshReason { - const fn requires_valid_token(self) -> bool { - matches!(self, Self::AccessTokenExpired) - } -} -impl Display for ProactiveRefreshReason { - fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { - match self { - Self::AccessTokenExpired => formatter.write_str("expired access token"), - Self::LastRefreshStale => formatter.write_str("stale refresh timestamp"), - } - } -} - #[derive(Debug)] struct ProactiveRefreshError { source: ReportableRefreshError, @@ -972,11 +930,13 @@ impl ReportableRefreshError { Self { message } } } + impl Display for ReportableRefreshError { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { formatter.write_str(&self.message) } } + impl Error for ReportableRefreshError {} #[derive(Clone, Debug, Default, Eq, PartialEq)] @@ -1029,13 +989,104 @@ impl UsageProbeError { Self { unauthorized: false, message: message.into() } } } + impl Display for UsageProbeError { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { formatter.write_str(&self.message) } } + impl Error for UsageProbeError {} +#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)] +struct AccountCandidateScore { + not_limited: bool, + bottleneck_remaining_percent: i64, + combined_remaining_score: i64, + primary_remaining_percent: i64, + secondary_remaining_percent: i64, +} + +#[derive(Clone, Deserialize, Serialize)] +#[serde(untagged)] +enum AccountPoolLine { + Wrapped { + #[serde(skip_serializing_if = "Option::is_none")] + email: Option, + #[serde(default, skip_serializing_if = "is_false")] + disabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + cooldown_until_unix_epoch: Option, + #[serde(skip_serializing_if = "Option::is_none")] + cooldown_until: Option, + #[serde(skip_serializing_if = "Option::is_none")] + last_selected_at_unix_epoch: Option, + auth: AuthDotJson, + }, + Flat(AccountPoolRecord), +} +impl AccountPoolLine { + fn into_record(self) -> AccountPoolRecord { + match self { + Self::Flat(record) => record, + Self::Wrapped { + email, + disabled, + cooldown_until_unix_epoch, + cooldown_until, + last_selected_at_unix_epoch, + auth, + } => AccountPoolRecord { + email: first_nonblank_string(email, auth.email), + disabled, + cooldown_until_unix_epoch, + cooldown_until, + last_selected_at_unix_epoch, + auth_mode: auth.auth_mode, + openai_api_key: auth.openai_api_key, + tokens: auth.tokens, + last_refresh: auth.last_refresh, + }, + } + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum RefreshStatus { + NotNeeded, + Succeeded, + Failed, +} +impl RefreshStatus { + const fn as_str(self) -> &'static str { + match self { + Self::NotNeeded => "not_needed", + Self::Succeeded => "succeeded", + Self::Failed => "failed", + } + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum ProactiveRefreshReason { + AccessTokenExpired, + LastRefreshStale, +} +impl ProactiveRefreshReason { + const fn requires_valid_token(self) -> bool { + matches!(self, Self::AccessTokenExpired) + } +} + +impl Display for ProactiveRefreshReason { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + match self { + Self::AccessTokenExpired => formatter.write_str("expired access token"), + Self::LastRefreshStale => formatter.write_str("stale refresh timestamp"), + } + } +} + fn parse_account_records( input: &str, path: &Path, @@ -1205,17 +1256,20 @@ fn compare_account_candidates(left: &CodexAccountLogin, right: &CodexAccountLogi .then_with(|| left.summary.account_fingerprint.cmp(&right.summary.account_fingerprint)) } -fn account_candidate_score(candidate: &CodexAccountLogin) -> i64 { +fn account_candidate_score(candidate: &CodexAccountLogin) -> AccountCandidateScore { let summary = candidate.summary(); - let primary = summary.primary_remaining_percent.unwrap_or(0); + let primary = summary + .primary_remaining_percent + .unwrap_or_else(|| summary.secondary_remaining_percent.unwrap_or(0)); let secondary = summary.secondary_remaining_percent.unwrap_or(primary); - let mut score = primary.saturating_mul(1_000).saturating_add(secondary.saturating_mul(10)); - if account_summary_is_limited(summary) { - score = score.saturating_sub(200_000); + AccountCandidateScore { + not_limited: !account_summary_is_limited(summary), + bottleneck_remaining_percent: primary.min(secondary), + combined_remaining_score: primary.saturating_mul(secondary), + primary_remaining_percent: primary, + secondary_remaining_percent: secondary, } - - score } fn account_summary_is_limited(summary: &CodexAccountActivitySummary) -> bool { @@ -1554,6 +1608,18 @@ mod tests { assert_eq!(candidates[0].account_id(), "acct_b"); } + #[test] + fn account_candidate_sort_balances_primary_and_secondary_windows() { + let mut candidates = [ + codex_account_login_for_sort("acct_primary_rich", Some(100), Some(40), None), + codex_account_login_for_sort("acct_balanced", Some(80), Some(80), None), + ]; + + candidates.sort_by(compare_account_candidates); + + assert_eq!(candidates[0].account_id(), "acct_balanced"); + } + #[test] fn account_candidate_sort_does_not_penalize_zero_credits_when_windows_available() { let mut candidates = [ @@ -1630,6 +1696,66 @@ mod tests { assert_eq!(candidates[0].account_id(), "acct_b"); } + #[test] + fn account_pool_rotates_equal_full_usage_across_dispatches() { + const FULL_USAGE: &str = r#"{"plan_type":"pro","rate_limit":{"primary_window":{"used_percent":0},"secondary_window":{"used_percent":0}}}"#; + + let temp_dir = TempDir::new().expect("temp dir should exist"); + let accounts_path = temp_dir.path().join("accounts.jsonl"); + let usage_endpoint = start_codex_usage_fixture_server(vec![ + FULL_USAGE, FULL_USAGE, FULL_USAGE, FULL_USAGE, FULL_USAGE, FULL_USAGE, FULL_USAGE, + FULL_USAGE, FULL_USAGE, + ]); + + fs::write( + &accounts_path, + r#"{"email":"a@example.com","auth_mode":"chatgpt","tokens":{"access_token":"access-a","refresh_token":"refresh-a","account_id":"acct_a"}} +{"email":"b@example.com","auth_mode":"chatgpt","tokens":{"access_token":"access-b","refresh_token":"refresh-b","account_id":"acct_b"}} +{"email":"c@example.com","auth_mode":"chatgpt","tokens":{"access_token":"access-c","refresh_token":"refresh-c","account_id":"acct_c"}} +"#, + ) + .expect("accounts fixture should write"); + + let pool = CodexAccountPool::new_with_fixed_account( + &accounts_path, + usage_endpoint, + DEFAULT_REFRESH_ENDPOINT, + None, + ) + .expect("account pool should initialize"); + let selected = (0..3) + .map(|_| { + pool.select_account() + .expect("full usage account should select") + .account_id() + .to_owned() + }) + .collect::>(); + + assert_eq!(selected, vec!["acct_a", "acct_b", "acct_c"]); + } + + fn codex_account_login_for_sort( + account_id: &str, + primary_remaining_percent: Option, + secondary_remaining_percent: Option, + last_selected_at_unix_epoch: Option, + ) -> CodexAccountLogin { + CodexAccountLogin { + access_token: String::from("access"), + account_id: account_id.to_owned(), + plan_type: Some(String::from("pro")), + last_selected_at_unix_epoch, + summary: CodexAccountActivitySummary { + account_fingerprint: format!("...{account_id}"), + primary_remaining_percent, + secondary_remaining_percent, + ..CodexAccountActivitySummary::default() + }, + account_summaries: Vec::new(), + } + } + fn start_codex_usage_fixture_server(responses: Vec<&'static str>) -> String { let listener = TcpListener::bind("127.0.0.1:0").expect("usage fixture server should bind"); let address = listener.local_addr().expect("usage fixture address should resolve"); diff --git a/docs/spec/app-server.md b/docs/spec/app-server.md index bb72d75..35803d9 100644 --- a/docs/spec/app-server.md +++ b/docs/spec/app-server.md @@ -119,14 +119,17 @@ override. The pool accepts flat `auth.json`-style JSONL records or records wrapp `{ "auth": ... }`. Before login, Decodex probes configured accounts through the ChatGPT usage endpoint. By default, it skips disabled, cooling-down, and incomplete records, penalizes -usage-limited records, prefers the highest remaining usage, and uses the -least-recently selected account to break equal usage scores. If the global +usage-limited records, scores both the short primary window and the longer secondary +window, prefers the account with the strongest remaining bottleneck capacity, and uses +the least-recently selected account to break equal capacity scores. If the global `~/.codex/decodex/config.toml` sets `[codex.accounts].fixed_account`, Decodex only considers the matching account instead of balancing across the pool. The selector matches an account email, full account id, or redacted account fingerprint as displayed in the operator UI. Project configs can enable account-pool use, but they do not own a project-scoped fixed account. -Successful selection writes `last_selected_at_unix_epoch` back to the JSONL file. +Successful selection writes `last_selected_at_unix_epoch` back to the JSONL file, and +selection holds a pool-local lock so concurrent run dispatches observe the latest +selector state instead of all choosing from the same stale snapshot. Decodex owns token freshness for injected `chatgptAuthTokens`. It proactively refreshes an account before probing when the access-token JWT `exp` is expired. If no expiration