Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions apps/decodex/src/agent/codex_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,30 @@ impl CodexAccountPool {
Ok(summaries)
}

pub(crate) fn account_activity_summaries_snapshot(
&self,
) -> crate::prelude::Result<Vec<CodexAccountActivitySummary>> {
let now = OffsetDateTime::now_utc().unix_timestamp();
let cache_key = self.cache_key();
let cache = ACCOUNT_ACTIVITY_CACHE.get_or_init(|| Mutex::new(None));
let cached = cache
.lock()
.map_err(|error| eyre::eyre!("Codex account usage cache is poisoned: {error}"))?;

if let Some(entry) = cached.as_ref()
&& entry.key == cache_key
{
return Ok(entry.summaries.clone());
}

drop(cached);

let _guard = self.lock_records()?;
let records = self.load_records()?;

Ok(records.iter().filter_map(|record| record.configured_activity_summary(now)).collect())
}

fn cache_key(&self) -> AccountActivityCacheKey {
AccountActivityCacheKey {
path: self.path.clone(),
Expand Down Expand Up @@ -1457,6 +1481,34 @@ mod tests {
assert_eq!(account.account_summaries().len(), 1);
}

#[test]
fn account_activity_snapshot_uses_configured_records_without_usage_probe() {
let temp_dir = TempDir::new().expect("temp dir should exist");
let accounts_path = temp_dir.path().join("accounts.jsonl");

fs::write(
&accounts_path,
r#"{"email":"snapshot@example.com","auth_mode":"chatgpt","tokens":{"access_token":"access-snapshot","refresh_token":"refresh-snapshot","account_id":"acct_snapshot"}}"#,
)
.expect("accounts fixture should write");

let pool = CodexAccountPool::new_with_fixed_account(
&accounts_path,
"http://127.0.0.1:9/usage",
DEFAULT_REFRESH_ENDPOINT,
None,
)
.expect("account pool should initialize");
let summaries = pool.account_activity_summaries_snapshot().expect("snapshot should load");

assert_eq!(summaries.len(), 1);
assert_eq!(summaries[0].email.as_deref(), Some("snapshot@example.com"));
assert_eq!(summaries[0].status, "available");
assert_eq!(summaries[0].refresh_status, "not_checked");
assert_eq!(summaries[0].primary_remaining_percent, None);
assert_eq!(summaries[0].note.as_deref(), Some("configured account"));
}

#[test]
fn usage_summary_parses_codex_rate_limit_payload() {
let payload = serde_json::json!({
Expand Down
7 changes: 6 additions & 1 deletion apps/decodex/src/orchestrator/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,12 @@ where
let mut snapshot = if warnings.is_empty() {
build_control_plane_operator_status_snapshot(tracker, project, workflow, state_store, limit)?
} else {
build_operator_status_snapshot(project, state_store, limit)?
build_operator_status_snapshot_with_account_mode(
project,
state_store,
limit,
AccountActivityMode::Snapshot,
)?
};

if !warnings.is_empty() {
Expand Down
6 changes: 5 additions & 1 deletion apps/decodex/src/orchestrator/operator_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,11 @@ fn build_operator_run_activity_event(state_store: &StateStore) -> Result<Dashboa

let mut account_warnings = Vec::new();

accounts.extend(codex_account_activity_summaries(&project, &mut account_warnings));
accounts.extend(codex_account_activity_summaries(
&project,
&mut account_warnings,
AccountActivityMode::Snapshot,
));
active_runs.extend(project_active_runs);
}

Expand Down
41 changes: 36 additions & 5 deletions apps/decodex/src/orchestrator/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ struct WorktreeOwnership {
reason: String,
}

#[derive(Clone, Copy)]
enum AccountActivityMode {
Probe,
Snapshot,
}

pub(crate) fn ensure_project_has_no_merged_worktree_cleanup_debt(
project: &ServiceConfig,
) -> crate::prelude::Result<()> {
Expand All @@ -143,6 +149,20 @@ fn build_operator_status_snapshot(
project: &ServiceConfig,
state_store: &StateStore,
limit: usize,
) -> crate::prelude::Result<OperatorStatusSnapshot> {
build_operator_status_snapshot_with_account_mode(
project,
state_store,
limit,
AccountActivityMode::Probe,
)
}

fn build_operator_status_snapshot_with_account_mode(
project: &ServiceConfig,
state_store: &StateStore,
limit: usize,
account_activity_mode: AccountActivityMode,
) -> crate::prelude::Result<OperatorStatusSnapshot> {
let now_unix_epoch = OffsetDateTime::now_utc().unix_timestamp();
let (active_runs, recent_runs) = state_store.list_project_runs(project.service_id(), limit)?;
Expand Down Expand Up @@ -170,7 +190,7 @@ fn build_operator_status_snapshot(

let history_lanes = operator_history_lanes(&active_runs, &recent_runs);
let (worktrees, mut warnings) = operator_status_worktrees(project, state_store)?;
let accounts = codex_account_activity_summaries(project, &mut warnings);
let accounts = codex_account_activity_summaries(project, &mut warnings, account_activity_mode);
let mut snapshot = OperatorStatusSnapshot {
project_id: project.service_id().to_owned(),
run_limit: limit,
Expand Down Expand Up @@ -238,6 +258,7 @@ where
state_store,
limit,
true,
AccountActivityMode::Probe,
)
}

Expand All @@ -258,6 +279,7 @@ where
state_store,
limit,
false,
AccountActivityMode::Snapshot,
)
}

Expand All @@ -268,6 +290,7 @@ fn build_live_operator_status_snapshot_with_history_ledger<T>(
state_store: &StateStore,
limit: usize,
hydrate_history_ledger: bool,
account_activity_mode: AccountActivityMode,
) -> crate::prelude::Result<OperatorStatusSnapshot>
where
T: IssueTracker,
Expand All @@ -281,7 +304,12 @@ where
let review_state_inspector = GhPullRequestReviewStateInspector {
github_token_env_var: Some(project.github().token_env_var().to_owned()),
};
let mut snapshot = build_operator_status_snapshot(project, state_store, limit)?;
let mut snapshot = build_operator_status_snapshot_with_account_mode(
project,
state_store,
limit,
account_activity_mode,
)?;

hydrate_history_lanes_from_local_ledger(project, state_store, &mut snapshot)?;

Expand Down Expand Up @@ -898,14 +926,17 @@ fn format_merged_worktree_cleanup_debts(
fn codex_account_activity_summaries(
project: &ServiceConfig,
warnings: &mut Vec<String>,
mode: AccountActivityMode,
) -> Vec<CodexAccountActivitySummary> {
let Some(accounts_config) = project.codex().accounts() else {
return Vec::new();
};
let accounts = CodexAccountPool::from_config(accounts_config).and_then(|pool| match mode {
AccountActivityMode::Probe => pool.account_activity_summaries_cached(false),
AccountActivityMode::Snapshot => pool.account_activity_summaries_snapshot(),
});

match CodexAccountPool::from_config(accounts_config)
.and_then(|pool| pool.account_activity_summaries_cached(false))
{
match accounts {
Ok(accounts) => accounts,
Err(error) => {
tracing::warn!(
Expand Down
29 changes: 29 additions & 0 deletions apps/decodex/src/state/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,35 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value;
Ok(())
}

fn retry_budget_attempt_count(&self, issue_id: &str) -> Result<i64> {
self.connection
.query_row(
"SELECT COUNT(*) FROM run_attempts \
WHERE issue_id = ?1 AND status IN ('failed', 'interrupted', 'terminal_guarded')",
params![issue_id],
|row| row.get(0),
)
.map_err(Into::into)
}

fn issue_has_retry_budget_attempt_after(
&self,
issue_id: &str,
attempt_number: i64,
) -> Result<bool> {
let count = self.connection.query_row(
"SELECT COUNT(*) FROM run_attempts \
WHERE issue_id = ?1 \
AND attempt_number > ?2 \
AND status IN ('failed', 'interrupted', 'terminal_guarded') \
LIMIT 1",
params![issue_id, attempt_number],
|row| row.get::<_, i64>(0),
)?;

Ok(count > 0)
}

fn load_protocol_event_summaries(&self, state: &mut StateData) -> Result<()> {
self.load_compacted_protocol_event_summaries(state)?;

Expand Down
22 changes: 19 additions & 3 deletions apps/decodex/src/state/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ impl StateStore {

/// Report whether one issue is actively claimed by this or another process.
pub fn issue_has_active_shared_claim(&self, project_id: &str, issue_id: &str) -> Result<bool> {
let state = self.lock()?;
let state = self.lock_without_refresh()?;

if state.leases.contains_key(issue_id) {
return Ok(true);
Expand Down Expand Up @@ -677,7 +677,15 @@ impl StateStore {

/// Count attempts that consume the retry budget for one issue.
pub fn retry_budget_attempt_count(&self, issue_id: &str) -> Result<i64> {
let state = self.lock()?;
if let Some(sqlite) = self.sqlite.as_ref() {
let sqlite = sqlite
.lock()
.map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?;

return sqlite.retry_budget_attempt_count(issue_id);
}

let state = self.lock_without_refresh()?;
let retry_budget_attempts = state
.run_attempts
.values()
Expand All @@ -699,7 +707,15 @@ impl StateStore {
issue_id: &str,
attempt_number: i64,
) -> Result<bool> {
let state = self.lock()?;
if let Some(sqlite) = self.sqlite.as_ref() {
let sqlite = sqlite
.lock()
.map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?;

return sqlite.issue_has_retry_budget_attempt_after(issue_id, attempt_number);
}

let state = self.lock_without_refresh()?;

Ok(state.run_attempts.values().any(|attempt| {
attempt.issue_id == issue_id
Expand Down
86 changes: 86 additions & 0 deletions apps/decodex/src/state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,92 @@ fn persistent_project_listing_does_not_refresh_full_event_journal() {
);
}

#[test]
fn persistent_retry_budget_queries_do_not_refresh_full_event_journal() {
let temp_dir = TempDir::new().expect("tempdir should create");
let state_path = temp_dir.path().join("runtime.sqlite3");
let observer = StateStore::open(&state_path).expect("observer state store should open");
let writer = StateStore::open(&state_path).expect("writer state store should open");

writer
.record_run_attempt("run-a", "PUB-101", 1, "interrupted")
.expect("writer retry attempt should record");
writer.record_run_attempt("run-b", "PUB-102", 1, "running").expect("writer run should record");
writer
.append_event("run-b", 1, "item/agentMessage/delta", "{}")
.expect("writer event should append");

assert_eq!(observer.retry_budget_attempt_count("PUB-101").expect("retry count should read"), 1);
assert!(
observer
.issue_has_retry_budget_attempt_after("PUB-101", 0)
.expect("retry after query should read")
);
assert!(
!observer
.issue_has_retry_budget_attempt_after("PUB-101", 1)
.expect("retry after query should read")
);

let state = observer.inner.lock().expect("test should inspect the local cache");

assert!(
!state.events.contains_key("run-b"),
"retry-budget queries should not refresh the full persistent event journal into the local cache"
);
assert!(
!state.event_summaries.contains_key("run-b"),
"retry-budget queries should not refresh protocol summaries unrelated to the issue"
);
assert!(
!state.run_attempts.contains_key("run-a"),
"retry-budget queries should use issue-scoped persistent reads instead of a full runtime refresh"
);
}

#[test]
fn persistent_shared_claim_check_does_not_refresh_full_event_journal() {
let temp_dir = TempDir::new().expect("tempdir should create");
let state_path = temp_dir.path().join("runtime.sqlite3");
let observer = StateStore::open(&state_path).expect("observer state store should open");
let holder = StateStore::open(&state_path).expect("holder state store should open");
let writer = StateStore::open(&state_path).expect("writer state store should open");
let slot_root = temp_dir.path().join("slots");

observer
.configure_dispatch_slot_root("pubfi", &slot_root, 2)
.expect("observer slot root should configure");
holder
.configure_dispatch_slot_root("pubfi", &slot_root, 2)
.expect("holder slot root should configure");
writer.record_run_attempt("run-b", "PUB-102", 1, "running").expect("writer run should record");
writer
.append_event("run-b", 1, "item/agentMessage/delta", "{}")
.expect("writer event should append");

assert!(
holder
.try_acquire_lease("pubfi", "PUB-101", "run-a", IN_PROGRESS_STATE)
.expect("holder should acquire the shared issue claim")
);
assert!(
observer
.issue_has_active_shared_claim("pubfi", "PUB-101")
.expect("shared claim check should read")
);

let state = observer.inner.lock().expect("test should inspect the local cache");

assert!(
!state.events.contains_key("run-b"),
"shared claim checks should not refresh the full persistent event journal into the local cache"
);
assert!(
!state.event_summaries.contains_key("run-b"),
"shared claim checks should not refresh protocol summaries unrelated to the issue"
);
}

#[test]
fn persistent_linear_execution_event_listing_does_not_refresh_full_ledger() {
let temp_dir = TempDir::new().expect("tempdir should create");
Expand Down