diff --git a/apps/decodex/src/agent/codex_accounts.rs b/apps/decodex/src/agent/codex_accounts.rs index 80593775..1c025052 100644 --- a/apps/decodex/src/agent/codex_accounts.rs +++ b/apps/decodex/src/agent/codex_accounts.rs @@ -134,6 +134,30 @@ impl CodexAccountPool { Ok(summaries) } + pub(crate) fn account_activity_summaries_snapshot( + &self, + ) -> crate::prelude::Result> { + let now = OffsetDateTime::now_utc().unix_timestamp(); + let cache_key = self.cache_key(); + let cache = ACCOUNT_ACTIVITY_CACHE.get_or_init(|| Mutex::new(None)); + let cached = cache + .lock() + .map_err(|error| eyre::eyre!("Codex account usage cache is poisoned: {error}"))?; + + if let Some(entry) = cached.as_ref() + && entry.key == cache_key + { + return Ok(entry.summaries.clone()); + } + + drop(cached); + + let _guard = self.lock_records()?; + let records = self.load_records()?; + + Ok(records.iter().filter_map(|record| record.configured_activity_summary(now)).collect()) + } + fn cache_key(&self) -> AccountActivityCacheKey { AccountActivityCacheKey { path: self.path.clone(), @@ -1457,6 +1481,34 @@ mod tests { assert_eq!(account.account_summaries().len(), 1); } + #[test] + fn account_activity_snapshot_uses_configured_records_without_usage_probe() { + let temp_dir = TempDir::new().expect("temp dir should exist"); + let accounts_path = temp_dir.path().join("accounts.jsonl"); + + fs::write( + &accounts_path, + r#"{"email":"snapshot@example.com","auth_mode":"chatgpt","tokens":{"access_token":"access-snapshot","refresh_token":"refresh-snapshot","account_id":"acct_snapshot"}}"#, + ) + .expect("accounts fixture should write"); + + let pool = CodexAccountPool::new_with_fixed_account( + &accounts_path, + "http://127.0.0.1:9/usage", + DEFAULT_REFRESH_ENDPOINT, + None, + ) + .expect("account pool should initialize"); + let summaries = pool.account_activity_summaries_snapshot().expect("snapshot should load"); + + assert_eq!(summaries.len(), 1); + assert_eq!(summaries[0].email.as_deref(), Some("snapshot@example.com")); + assert_eq!(summaries[0].status, "available"); + assert_eq!(summaries[0].refresh_status, "not_checked"); + assert_eq!(summaries[0].primary_remaining_percent, None); + assert_eq!(summaries[0].note.as_deref(), Some("configured account")); + } + #[test] fn usage_summary_parses_codex_rate_limit_payload() { let payload = serde_json::json!({ diff --git a/apps/decodex/src/orchestrator/daemon.rs b/apps/decodex/src/orchestrator/daemon.rs index a9401aa8..e50a30b2 100644 --- a/apps/decodex/src/orchestrator/daemon.rs +++ b/apps/decodex/src/orchestrator/daemon.rs @@ -191,7 +191,12 @@ where let mut snapshot = if warnings.is_empty() { build_control_plane_operator_status_snapshot(tracker, project, workflow, state_store, limit)? } else { - build_operator_status_snapshot(project, state_store, limit)? + build_operator_status_snapshot_with_account_mode( + project, + state_store, + limit, + AccountActivityMode::Snapshot, + )? }; if !warnings.is_empty() { diff --git a/apps/decodex/src/orchestrator/operator_http.rs b/apps/decodex/src/orchestrator/operator_http.rs index 515eed0f..08d51d6f 100644 --- a/apps/decodex/src/orchestrator/operator_http.rs +++ b/apps/decodex/src/orchestrator/operator_http.rs @@ -481,7 +481,11 @@ fn build_operator_run_activity_event(state_store: &StateStore) -> Result crate::prelude::Result<()> { @@ -143,6 +149,20 @@ fn build_operator_status_snapshot( project: &ServiceConfig, state_store: &StateStore, limit: usize, +) -> crate::prelude::Result { + build_operator_status_snapshot_with_account_mode( + project, + state_store, + limit, + AccountActivityMode::Probe, + ) +} + +fn build_operator_status_snapshot_with_account_mode( + project: &ServiceConfig, + state_store: &StateStore, + limit: usize, + account_activity_mode: AccountActivityMode, ) -> crate::prelude::Result { let now_unix_epoch = OffsetDateTime::now_utc().unix_timestamp(); let (active_runs, recent_runs) = state_store.list_project_runs(project.service_id(), limit)?; @@ -170,7 +190,7 @@ fn build_operator_status_snapshot( let history_lanes = operator_history_lanes(&active_runs, &recent_runs); let (worktrees, mut warnings) = operator_status_worktrees(project, state_store)?; - let accounts = codex_account_activity_summaries(project, &mut warnings); + let accounts = codex_account_activity_summaries(project, &mut warnings, account_activity_mode); let mut snapshot = OperatorStatusSnapshot { project_id: project.service_id().to_owned(), run_limit: limit, @@ -238,6 +258,7 @@ where state_store, limit, true, + AccountActivityMode::Probe, ) } @@ -258,6 +279,7 @@ where state_store, limit, false, + AccountActivityMode::Snapshot, ) } @@ -268,6 +290,7 @@ fn build_live_operator_status_snapshot_with_history_ledger( state_store: &StateStore, limit: usize, hydrate_history_ledger: bool, + account_activity_mode: AccountActivityMode, ) -> crate::prelude::Result where T: IssueTracker, @@ -281,7 +304,12 @@ where let review_state_inspector = GhPullRequestReviewStateInspector { github_token_env_var: Some(project.github().token_env_var().to_owned()), }; - let mut snapshot = build_operator_status_snapshot(project, state_store, limit)?; + let mut snapshot = build_operator_status_snapshot_with_account_mode( + project, + state_store, + limit, + account_activity_mode, + )?; hydrate_history_lanes_from_local_ledger(project, state_store, &mut snapshot)?; @@ -898,14 +926,17 @@ fn format_merged_worktree_cleanup_debts( fn codex_account_activity_summaries( project: &ServiceConfig, warnings: &mut Vec, + mode: AccountActivityMode, ) -> Vec { let Some(accounts_config) = project.codex().accounts() else { return Vec::new(); }; + let accounts = CodexAccountPool::from_config(accounts_config).and_then(|pool| match mode { + AccountActivityMode::Probe => pool.account_activity_summaries_cached(false), + AccountActivityMode::Snapshot => pool.account_activity_summaries_snapshot(), + }); - match CodexAccountPool::from_config(accounts_config) - .and_then(|pool| pool.account_activity_summaries_cached(false)) - { + match accounts { Ok(accounts) => accounts, Err(error) => { tracing::warn!( diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index fadb56f1..5037a41c 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -802,6 +802,35 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; Ok(()) } + fn retry_budget_attempt_count(&self, issue_id: &str) -> Result { + self.connection + .query_row( + "SELECT COUNT(*) FROM run_attempts \ + WHERE issue_id = ?1 AND status IN ('failed', 'interrupted', 'terminal_guarded')", + params![issue_id], + |row| row.get(0), + ) + .map_err(Into::into) + } + + fn issue_has_retry_budget_attempt_after( + &self, + issue_id: &str, + attempt_number: i64, + ) -> Result { + let count = self.connection.query_row( + "SELECT COUNT(*) FROM run_attempts \ + WHERE issue_id = ?1 \ + AND attempt_number > ?2 \ + AND status IN ('failed', 'interrupted', 'terminal_guarded') \ + LIMIT 1", + params![issue_id, attempt_number], + |row| row.get::<_, i64>(0), + )?; + + Ok(count > 0) + } + fn load_protocol_event_summaries(&self, state: &mut StateData) -> Result<()> { self.load_compacted_protocol_event_summaries(state)?; diff --git a/apps/decodex/src/state/store.rs b/apps/decodex/src/state/store.rs index ad11550d..80fe3de8 100644 --- a/apps/decodex/src/state/store.rs +++ b/apps/decodex/src/state/store.rs @@ -450,7 +450,7 @@ impl StateStore { /// Report whether one issue is actively claimed by this or another process. pub fn issue_has_active_shared_claim(&self, project_id: &str, issue_id: &str) -> Result { - let state = self.lock()?; + let state = self.lock_without_refresh()?; if state.leases.contains_key(issue_id) { return Ok(true); @@ -677,7 +677,15 @@ impl StateStore { /// Count attempts that consume the retry budget for one issue. pub fn retry_budget_attempt_count(&self, issue_id: &str) -> Result { - let state = self.lock()?; + if let Some(sqlite) = self.sqlite.as_ref() { + let sqlite = sqlite + .lock() + .map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?; + + return sqlite.retry_budget_attempt_count(issue_id); + } + + let state = self.lock_without_refresh()?; let retry_budget_attempts = state .run_attempts .values() @@ -699,7 +707,15 @@ impl StateStore { issue_id: &str, attempt_number: i64, ) -> Result { - let state = self.lock()?; + if let Some(sqlite) = self.sqlite.as_ref() { + let sqlite = sqlite + .lock() + .map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?; + + return sqlite.issue_has_retry_budget_attempt_after(issue_id, attempt_number); + } + + let state = self.lock_without_refresh()?; Ok(state.run_attempts.values().any(|attempt| { attempt.issue_id == issue_id diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index 6f129e56..b3774c4f 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -544,6 +544,92 @@ fn persistent_project_listing_does_not_refresh_full_event_journal() { ); } +#[test] +fn persistent_retry_budget_queries_do_not_refresh_full_event_journal() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let state_path = temp_dir.path().join("runtime.sqlite3"); + let observer = StateStore::open(&state_path).expect("observer state store should open"); + let writer = StateStore::open(&state_path).expect("writer state store should open"); + + writer + .record_run_attempt("run-a", "PUB-101", 1, "interrupted") + .expect("writer retry attempt should record"); + writer.record_run_attempt("run-b", "PUB-102", 1, "running").expect("writer run should record"); + writer + .append_event("run-b", 1, "item/agentMessage/delta", "{}") + .expect("writer event should append"); + + assert_eq!(observer.retry_budget_attempt_count("PUB-101").expect("retry count should read"), 1); + assert!( + observer + .issue_has_retry_budget_attempt_after("PUB-101", 0) + .expect("retry after query should read") + ); + assert!( + !observer + .issue_has_retry_budget_attempt_after("PUB-101", 1) + .expect("retry after query should read") + ); + + let state = observer.inner.lock().expect("test should inspect the local cache"); + + assert!( + !state.events.contains_key("run-b"), + "retry-budget queries should not refresh the full persistent event journal into the local cache" + ); + assert!( + !state.event_summaries.contains_key("run-b"), + "retry-budget queries should not refresh protocol summaries unrelated to the issue" + ); + assert!( + !state.run_attempts.contains_key("run-a"), + "retry-budget queries should use issue-scoped persistent reads instead of a full runtime refresh" + ); +} + +#[test] +fn persistent_shared_claim_check_does_not_refresh_full_event_journal() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let state_path = temp_dir.path().join("runtime.sqlite3"); + let observer = StateStore::open(&state_path).expect("observer state store should open"); + let holder = StateStore::open(&state_path).expect("holder state store should open"); + let writer = StateStore::open(&state_path).expect("writer state store should open"); + let slot_root = temp_dir.path().join("slots"); + + observer + .configure_dispatch_slot_root("pubfi", &slot_root, 2) + .expect("observer slot root should configure"); + holder + .configure_dispatch_slot_root("pubfi", &slot_root, 2) + .expect("holder slot root should configure"); + writer.record_run_attempt("run-b", "PUB-102", 1, "running").expect("writer run should record"); + writer + .append_event("run-b", 1, "item/agentMessage/delta", "{}") + .expect("writer event should append"); + + assert!( + holder + .try_acquire_lease("pubfi", "PUB-101", "run-a", IN_PROGRESS_STATE) + .expect("holder should acquire the shared issue claim") + ); + assert!( + observer + .issue_has_active_shared_claim("pubfi", "PUB-101") + .expect("shared claim check should read") + ); + + let state = observer.inner.lock().expect("test should inspect the local cache"); + + assert!( + !state.events.contains_key("run-b"), + "shared claim checks should not refresh the full persistent event journal into the local cache" + ); + assert!( + !state.event_summaries.contains_key("run-b"), + "shared claim checks should not refresh protocol summaries unrelated to the issue" + ); +} + #[test] fn persistent_linear_execution_event_listing_does_not_refresh_full_ledger() { let temp_dir = TempDir::new().expect("tempdir should create");