Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 211 additions & 85 deletions apps/decodex/src/agent/codex_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
cmp::Ordering,
error::Error,
fmt::{self, Display, Formatter},
fs,
fs::{self, File, OpenOptions},
path::{Path, PathBuf},
process,
sync::Mutex,
Expand Down Expand Up @@ -85,11 +85,42 @@ impl CodexAccountPool {
pub(crate) fn account_activity_summaries(
&self,
) -> crate::prelude::Result<Vec<CodexAccountActivitySummary>> {
let _guard = self.lock_records()?;
let mut records = self.load_records()?;

self.probe_account_activity_summaries(&mut records)
}

fn lock_records(&self) -> crate::prelude::Result<AccountPoolFileLock> {
let parent = self.path.parent().ok_or_else(|| {
eyre::eyre!(
"Codex accounts path `{}` must have a parent directory.",
self.path.display()
)
})?;
let file_name = self
.path
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| eyre::eyre!("Codex accounts path must end in a valid file name."))?;
let lock_path = parent.join(format!(".{file_name}.lock"));
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&lock_path)
.map_err(|error| {
eyre::eyre!("Failed to open Codex accounts lock `{}`: {error}", lock_path.display())
})?;

file.lock().map_err(|error| {
eyre::eyre!("Failed to lock Codex accounts `{}`: {error}", self.path.display())
})?;

Ok(AccountPoolFileLock { _file: file })
}

fn save_records(&self, records: &[AccountPoolRecord]) -> crate::prelude::Result<()> {
let parent = self.path.parent().ok_or_else(|| {
eyre::eyre!(
Expand Down Expand Up @@ -568,6 +599,7 @@ impl CodexAccountPool {

impl CodexAccountProvider for CodexAccountPool {
fn select_account(&self) -> crate::prelude::Result<CodexAccountLogin> {
let _guard = self.lock_records()?;
let mut records = self.load_records()?;

self.select_from_records(&mut records)
Expand All @@ -577,6 +609,7 @@ impl CodexAccountProvider for CodexAccountPool {
&self,
previous_account_id: Option<&str>,
) -> crate::prelude::Result<CodexAccountLogin> {
let _guard = self.lock_records()?;
let mut records = self.load_records()?;

self.refresh_from_records(&mut records, previous_account_id)
Expand Down Expand Up @@ -630,48 +663,8 @@ impl CodexAccountLogin {
}
}

#[derive(Clone, Deserialize, Serialize)]
#[serde(untagged)]
enum AccountPoolLine {
Wrapped {
#[serde(skip_serializing_if = "Option::is_none")]
email: Option<String>,
#[serde(default, skip_serializing_if = "is_false")]
disabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
cooldown_until_unix_epoch: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
cooldown_until: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
last_selected_at_unix_epoch: Option<i64>,
auth: AuthDotJson,
},
Flat(AccountPoolRecord),
}
impl AccountPoolLine {
fn into_record(self) -> AccountPoolRecord {
match self {
Self::Flat(record) => record,
Self::Wrapped {
email,
disabled,
cooldown_until_unix_epoch,
cooldown_until,
last_selected_at_unix_epoch,
auth,
} => AccountPoolRecord {
email: first_nonblank_string(email, auth.email),
disabled,
cooldown_until_unix_epoch,
cooldown_until,
last_selected_at_unix_epoch,
auth_mode: auth.auth_mode,
openai_api_key: auth.openai_api_key,
tokens: auth.tokens,
last_refresh: auth.last_refresh,
},
}
}
struct AccountPoolFileLock {
_file: File,
}

#[derive(Clone, Deserialize, Serialize)]
Expand Down Expand Up @@ -922,41 +915,6 @@ struct RefreshResponse {
refresh_token: Option<String>,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RefreshStatus {
NotNeeded,
Succeeded,
Failed,
}
impl RefreshStatus {
const fn as_str(self) -> &'static str {
match self {
Self::NotNeeded => "not_needed",
Self::Succeeded => "succeeded",
Self::Failed => "failed",
}
}
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ProactiveRefreshReason {
AccessTokenExpired,
LastRefreshStale,
}
impl ProactiveRefreshReason {
const fn requires_valid_token(self) -> bool {
matches!(self, Self::AccessTokenExpired)
}
}
impl Display for ProactiveRefreshReason {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::AccessTokenExpired => formatter.write_str("expired access token"),
Self::LastRefreshStale => formatter.write_str("stale refresh timestamp"),
}
}
}

#[derive(Debug)]
struct ProactiveRefreshError {
source: ReportableRefreshError,
Expand All @@ -972,11 +930,13 @@ impl ReportableRefreshError {
Self { message }
}
}

impl Display for ReportableRefreshError {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
formatter.write_str(&self.message)
}
}

impl Error for ReportableRefreshError {}

#[derive(Clone, Debug, Default, Eq, PartialEq)]
Expand Down Expand Up @@ -1029,13 +989,104 @@ impl UsageProbeError {
Self { unauthorized: false, message: message.into() }
}
}

impl Display for UsageProbeError {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
formatter.write_str(&self.message)
}
}

impl Error for UsageProbeError {}

#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct AccountCandidateScore {
not_limited: bool,
bottleneck_remaining_percent: i64,
combined_remaining_score: i64,
primary_remaining_percent: i64,
secondary_remaining_percent: i64,
}

#[derive(Clone, Deserialize, Serialize)]
#[serde(untagged)]
enum AccountPoolLine {
Wrapped {
#[serde(skip_serializing_if = "Option::is_none")]
email: Option<String>,
#[serde(default, skip_serializing_if = "is_false")]
disabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
cooldown_until_unix_epoch: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
cooldown_until: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
last_selected_at_unix_epoch: Option<i64>,
auth: AuthDotJson,
},
Flat(AccountPoolRecord),
}
impl AccountPoolLine {
fn into_record(self) -> AccountPoolRecord {
match self {
Self::Flat(record) => record,
Self::Wrapped {
email,
disabled,
cooldown_until_unix_epoch,
cooldown_until,
last_selected_at_unix_epoch,
auth,
} => AccountPoolRecord {
email: first_nonblank_string(email, auth.email),
disabled,
cooldown_until_unix_epoch,
cooldown_until,
last_selected_at_unix_epoch,
auth_mode: auth.auth_mode,
openai_api_key: auth.openai_api_key,
tokens: auth.tokens,
last_refresh: auth.last_refresh,
},
}
}
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RefreshStatus {
NotNeeded,
Succeeded,
Failed,
}
impl RefreshStatus {
const fn as_str(self) -> &'static str {
match self {
Self::NotNeeded => "not_needed",
Self::Succeeded => "succeeded",
Self::Failed => "failed",
}
}
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ProactiveRefreshReason {
AccessTokenExpired,
LastRefreshStale,
}
impl ProactiveRefreshReason {
const fn requires_valid_token(self) -> bool {
matches!(self, Self::AccessTokenExpired)
}
}

impl Display for ProactiveRefreshReason {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::AccessTokenExpired => formatter.write_str("expired access token"),
Self::LastRefreshStale => formatter.write_str("stale refresh timestamp"),
}
}
}

fn parse_account_records(
input: &str,
path: &Path,
Expand Down Expand Up @@ -1205,17 +1256,20 @@ fn compare_account_candidates(left: &CodexAccountLogin, right: &CodexAccountLogi
.then_with(|| left.summary.account_fingerprint.cmp(&right.summary.account_fingerprint))
}

fn account_candidate_score(candidate: &CodexAccountLogin) -> i64 {
fn account_candidate_score(candidate: &CodexAccountLogin) -> AccountCandidateScore {
let summary = candidate.summary();
let primary = summary.primary_remaining_percent.unwrap_or(0);
let primary = summary
.primary_remaining_percent
.unwrap_or_else(|| summary.secondary_remaining_percent.unwrap_or(0));
let secondary = summary.secondary_remaining_percent.unwrap_or(primary);
let mut score = primary.saturating_mul(1_000).saturating_add(secondary.saturating_mul(10));

if account_summary_is_limited(summary) {
score = score.saturating_sub(200_000);
AccountCandidateScore {
not_limited: !account_summary_is_limited(summary),
bottleneck_remaining_percent: primary.min(secondary),
combined_remaining_score: primary.saturating_mul(secondary),
primary_remaining_percent: primary,
secondary_remaining_percent: secondary,
}

score
}

fn account_summary_is_limited(summary: &CodexAccountActivitySummary) -> bool {
Expand Down Expand Up @@ -1554,6 +1608,18 @@ mod tests {
assert_eq!(candidates[0].account_id(), "acct_b");
}

#[test]
fn account_candidate_sort_balances_primary_and_secondary_windows() {
let mut candidates = [
codex_account_login_for_sort("acct_primary_rich", Some(100), Some(40), None),
codex_account_login_for_sort("acct_balanced", Some(80), Some(80), None),
];

candidates.sort_by(compare_account_candidates);

assert_eq!(candidates[0].account_id(), "acct_balanced");
}

#[test]
fn account_candidate_sort_does_not_penalize_zero_credits_when_windows_available() {
let mut candidates = [
Expand Down Expand Up @@ -1630,6 +1696,66 @@ mod tests {
assert_eq!(candidates[0].account_id(), "acct_b");
}

#[test]
fn account_pool_rotates_equal_full_usage_across_dispatches() {
const FULL_USAGE: &str = r#"{"plan_type":"pro","rate_limit":{"primary_window":{"used_percent":0},"secondary_window":{"used_percent":0}}}"#;

let temp_dir = TempDir::new().expect("temp dir should exist");
let accounts_path = temp_dir.path().join("accounts.jsonl");
let usage_endpoint = start_codex_usage_fixture_server(vec![
FULL_USAGE, FULL_USAGE, FULL_USAGE, FULL_USAGE, FULL_USAGE, FULL_USAGE, FULL_USAGE,
FULL_USAGE, FULL_USAGE,
]);

fs::write(
&accounts_path,
r#"{"email":"a@example.com","auth_mode":"chatgpt","tokens":{"access_token":"access-a","refresh_token":"refresh-a","account_id":"acct_a"}}
{"email":"b@example.com","auth_mode":"chatgpt","tokens":{"access_token":"access-b","refresh_token":"refresh-b","account_id":"acct_b"}}
{"email":"c@example.com","auth_mode":"chatgpt","tokens":{"access_token":"access-c","refresh_token":"refresh-c","account_id":"acct_c"}}
"#,
)
.expect("accounts fixture should write");

let pool = CodexAccountPool::new_with_fixed_account(
&accounts_path,
usage_endpoint,
DEFAULT_REFRESH_ENDPOINT,
None,
)
.expect("account pool should initialize");
let selected = (0..3)
.map(|_| {
pool.select_account()
.expect("full usage account should select")
.account_id()
.to_owned()
})
.collect::<Vec<_>>();

assert_eq!(selected, vec!["acct_a", "acct_b", "acct_c"]);
}

fn codex_account_login_for_sort(
account_id: &str,
primary_remaining_percent: Option<i64>,
secondary_remaining_percent: Option<i64>,
last_selected_at_unix_epoch: Option<i64>,
) -> CodexAccountLogin {
CodexAccountLogin {
access_token: String::from("access"),
account_id: account_id.to_owned(),
plan_type: Some(String::from("pro")),
last_selected_at_unix_epoch,
summary: CodexAccountActivitySummary {
account_fingerprint: format!("...{account_id}"),
primary_remaining_percent,
secondary_remaining_percent,
..CodexAccountActivitySummary::default()
},
account_summaries: Vec::new(),
}
}

fn start_codex_usage_fixture_server(responses: Vec<&'static str>) -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("usage fixture server should bind");
let address = listener.local_addr().expect("usage fixture address should resolve");
Expand Down
Loading