diff --git a/apps/decodex/src/state.rs b/apps/decodex/src/state.rs index 34d2ee90..8f6d1b47 100644 --- a/apps/decodex/src/state.rs +++ b/apps/decodex/src/state.rs @@ -12,7 +12,7 @@ use std::{ time::Duration, }; -use rusqlite::{Connection, Transaction, params}; +use rusqlite::{Connection, OptionalExtension, Transaction, params}; use serde::{Deserialize, Serialize}; use serde_json::Value; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index f8a4885e..fadb56f1 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -126,6 +126,10 @@ impl StateData { self.worktrees = loaded.worktrees; } + fn replace_project_registry_state(&mut self, loaded: Self) { + self.projects = loaded.projects; + } + fn project_run_status( &self, project_id: &str, @@ -394,13 +398,21 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; Ok(state) } - fn load_project_run_state(&self) -> Result { + fn load_project_run_state_for_project(&self, project_id: &str) -> Result { let mut state = StateData::default(); self.load_leases(&mut state)?; self.load_run_attempts(&mut state)?; - self.load_protocol_event_summaries(&mut state)?; self.load_worktrees(&mut state)?; + self.load_protocol_event_summaries_for_project_runs(&mut state, project_id)?; + + Ok(state) + } + + fn load_project_registry_state(&self) -> Result { + let mut state = StateData::default(); + + self.load_projects(&mut state)?; Ok(state) } @@ -827,6 +839,64 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; Ok(()) } + fn load_protocol_event_summaries_for_project_runs( + &self, + state: &mut StateData, + project_id: &str, + ) -> Result<()> { + let mut run_ids = state + .run_attempts + .values() + .filter(|attempt| state.project_run_status(project_id, attempt).is_some()) + .map(|attempt| attempt.run_id.clone()) + .collect::>(); + + run_ids.sort(); + run_ids.dedup(); + + for run_id in run_ids { + self.load_compacted_protocol_event_summary_for_run(state, &run_id)?; + self.load_protocol_event_summary_for_run(state, &run_id)?; + } + + Ok(()) + } + + fn load_protocol_event_summary_for_run( + &self, + state: &mut StateData, + run_id: &str, + ) -> Result<()> { + let mut statement = self.connection.prepare( + "SELECT totals.event_count, totals.last_sequence_number, last.event_type, \ + last.created_at, last.created_at_unix \ + FROM ( + SELECT COUNT(*) AS event_count, MAX(sequence_number) AS last_sequence_number \ + FROM protocol_events WHERE run_id = ?1 + ) totals \ + JOIN protocol_events last \ + ON last.run_id = ?1 \ + AND last.sequence_number = totals.last_sequence_number", + )?; + let summary = statement + .query_row(params![run_id], |row| { + Ok(ProtocolEventSummaryRecord { + event_count: row.get(0)?, + last_sequence_number: Some(row.get(1)?), + last_event_type: Some(row.get(2)?), + last_event_at: Some(row.get(3)?), + last_event_at_unix: Some(row.get(4)?), + }) + }) + .optional()?; + + if let Some(summary) = summary { + state.event_summaries.insert(run_id.to_owned(), summary); + } + + Ok(()) + } + fn load_compacted_protocol_event_summaries(&self, state: &mut StateData) -> Result<()> { let mut statement = self.connection.prepare( "SELECT run_id, event_count, last_sequence_number, last_event_type, last_event_at, \ @@ -854,6 +924,34 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; Ok(()) } + fn load_compacted_protocol_event_summary_for_run( + &self, + state: &mut StateData, + run_id: &str, + ) -> Result<()> { + let mut statement = self.connection.prepare( + "SELECT event_count, last_sequence_number, last_event_type, last_event_at, \ + last_event_at_unix FROM protocol_event_summaries WHERE run_id = ?1", + )?; + let summary = statement + .query_row(params![run_id], |row| { + Ok(ProtocolEventSummaryRecord { + event_count: row.get(0)?, + last_sequence_number: row.get(1)?, + last_event_type: row.get(2)?, + last_event_at: row.get(3)?, + last_event_at_unix: row.get(4)?, + }) + }) + .optional()?; + + if let Some(summary) = summary { + state.event_summaries.insert(run_id.to_owned(), summary); + } + + Ok(()) + } + fn load_worktrees(&self, state: &mut StateData) -> Result<()> { let mut statement = self .connection diff --git a/apps/decodex/src/state/store.rs b/apps/decodex/src/state/store.rs index 4d9a3cbd..ad11550d 100644 --- a/apps/decodex/src/state/store.rs +++ b/apps/decodex/src/state/store.rs @@ -92,7 +92,10 @@ impl StateStore { /// List all registered projects known to this local Decodex installation. pub(crate) fn list_projects(&self) -> Result> { - let state = self.lock()?; + let mut state = self.lock_without_refresh()?; + + self.refresh_project_registry_state_locked(&mut state)?; + let mut projects = state.projects.values().cloned().collect::>(); projects.sort_by(|left, right| left.service_id().cmp(right.service_id())); @@ -350,7 +353,10 @@ impl StateStore { /// List all active leases. pub fn list_leases(&self, project_id: &str) -> Result> { - let state = self.lock()?; + let mut state = self.lock_without_refresh()?; + + self.refresh_project_run_state_locked(&mut state, project_id)?; + let mut leases = state .leases .values() @@ -366,7 +372,10 @@ impl StateStore { /// List all active shared leases by combining local claims with other processes' issue claims. pub fn list_active_shared_leases(&self, project_id: &str) -> Result> { let (mut leases_by_issue, dispatch_slot_config) = { - let state = self.lock()?; + let mut state = self.lock_without_refresh()?; + + self.refresh_project_run_state_locked(&mut state, project_id)?; + let leases = state .leases .values() @@ -856,7 +865,7 @@ impl StateStore { ) -> Result> { let mut state = self.lock_without_refresh()?; - self.refresh_project_run_state_locked(&mut state)?; + self.refresh_project_run_state_locked(&mut state, project_id)?; let mut runs = state .run_attempts @@ -878,7 +887,7 @@ impl StateStore { ) -> Result<(Vec, Vec)> { let mut state = self.lock_without_refresh()?; - self.refresh_project_run_state_locked(&mut state)?; + self.refresh_project_run_state_locked(&mut state, project_id)?; let mut runs = state .run_attempts @@ -905,7 +914,7 @@ impl StateStore { pub fn list_active_runs(&self, project_id: &str) -> Result> { let mut state = self.lock_without_refresh()?; - self.refresh_project_run_state_locked(&mut state)?; + self.refresh_project_run_state_locked(&mut state, project_id)?; let mut runs = state .run_attempts @@ -1321,7 +1330,7 @@ impl StateStore { pub fn list_worktrees(&self, project_id: &str) -> Result> { let mut state = self.lock_without_refresh()?; - self.refresh_project_run_state_locked(&mut state)?; + self.refresh_project_run_state_locked(&mut state, project_id)?; let mut mappings = state .worktrees @@ -1375,20 +1384,38 @@ impl StateStore { Ok(()) } - fn refresh_project_run_state_locked(&self, state: &mut StateData) -> Result<()> { + fn refresh_project_run_state_locked( + &self, + state: &mut StateData, + project_id: &str, + ) -> Result<()> { let Some(sqlite) = self.sqlite.as_ref() else { return Ok(()); }; let sqlite = sqlite .lock() .map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?; - let loaded = sqlite.load_project_run_state()?; + let loaded = sqlite.load_project_run_state_for_project(project_id)?; state.replace_project_run_state(loaded); Ok(()) } + fn refresh_project_registry_state_locked(&self, state: &mut StateData) -> Result<()> { + let Some(sqlite) = self.sqlite.as_ref() else { + return Ok(()); + }; + let sqlite = sqlite + .lock() + .map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?; + let loaded = sqlite.load_project_registry_state()?; + + state.replace_project_registry_state(loaded); + + Ok(()) + } + fn persist_runtime_state_locked(&self, state: &StateData) -> Result<()> { let Some(sqlite) = self.sqlite.as_ref() else { return Ok(()); diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index e01e0820..6f129e56 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -430,6 +430,15 @@ fn persistent_project_run_listing_does_not_refresh_full_event_journal() { writer .append_event("run-b", 1, "item/agentMessage/delta", "{}") .expect("writer event should append"); + writer + .record_run_attempt("run-c", "PUB-103", 1, "succeeded") + .expect("writer project run should record"); + writer + .upsert_worktree("pubfi", "PUB-103", "x/pubfi-pub-103", "/tmp/worktrees/pub-103") + .expect("writer project worktree should persist"); + writer + .append_event("run-c", 1, "thread/archive", "{}") + .expect("writer project event should append"); let mut writer_record = LinearExecutionEventRecord::new( LinearExecutionEventIdentity { @@ -453,13 +462,23 @@ fn persistent_project_run_listing_does_not_refresh_full_event_journal() { .expect("writer ledger event should persist"); let runs = observer.list_active_runs("pubfi").expect("active runs should load"); + let recent_runs = observer.list_recent_runs("pubfi", 10).expect("recent runs should load"); + let leases = observer.list_active_shared_leases("pubfi").expect("shared leases should load"); let worktrees = observer.list_worktrees("pubfi").expect("worktrees should load"); assert_eq!(runs.len(), 1); assert_eq!(runs[0].run_id(), "run-a"); assert_eq!(runs[0].event_count(), 1); assert_eq!(runs[0].last_event_type(), Some("item/started")); - assert_eq!(worktrees.len(), 1); + assert!( + recent_runs.iter().any(|run| run.run_id() == "run-c" + && run.event_count() == 1 + && run.last_event_type() == Some("thread/archive")), + "project-scoped persistent event summaries should still load for matching runs" + ); + assert_eq!(leases.len(), 1); + assert_eq!(leases[0].issue_id(), "PUB-101"); + assert_eq!(worktrees.len(), 2); assert_eq!(worktrees[0].issue_id(), "PUB-101"); let state = observer.inner.lock().expect("test should inspect the local cache"); @@ -468,13 +487,13 @@ fn persistent_project_run_listing_does_not_refresh_full_event_journal() { !state.events.contains_key("run-b"), "operator run listing should refresh event summaries without materializing unrelated event rows" ); - assert_eq!( - state - .event_summaries - .get("run-b") - .expect("unrelated persistent run should have a summary") - .event_count, - 1 + assert!( + !state.events.contains_key("run-c"), + "operator run listing should refresh project summaries without materializing project event rows" + ); + assert!( + !state.event_summaries.contains_key("run-b"), + "operator run listing should not refresh summaries for runs outside the requested project" ); assert!( !state.linear_execution_events.contains_key(&writer_record.idempotency_key), @@ -482,6 +501,49 @@ fn persistent_project_run_listing_does_not_refresh_full_event_journal() { ); } +#[test] +fn persistent_project_listing_does_not_refresh_full_event_journal() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let state_path = temp_dir.path().join("runtime.sqlite3"); + let observer = StateStore::open(&state_path).expect("observer state store should open"); + let writer = StateStore::open(&state_path).expect("writer state store should open"); + let registration = ProjectRegistration { + service_id: String::from("pubfi"), + config_path: temp_dir.path().join("project.toml"), + repo_root: temp_dir.path().join("repo"), + worktree_root: temp_dir.path().join("repo/.worktrees"), + workflow_path: temp_dir.path().join("repo/WORKFLOW.md"), + tracker_api_key_env_var: String::from("LINEAR_API_KEY_HACKINK"), + github_token_env_var: String::from("GITHUB_PAT_Y"), + enabled: true, + config_fingerprint: String::from("abc123"), + updated_at: String::from("2026-05-25T00:00:00Z"), + updated_at_unix: 1_779_667_200, + }; + + observer.upsert_project(®istration).expect("project should persist"); + writer.record_run_attempt("run-b", "PUB-102", 1, "running").expect("writer run should record"); + writer + .append_event("run-b", 1, "item/agentMessage/delta", "{}") + .expect("writer event should append"); + + let projects = observer.list_projects().expect("projects should load"); + + assert_eq!(projects.len(), 1); + assert_eq!(projects[0].service_id(), "pubfi"); + + let state = observer.inner.lock().expect("test should inspect the local cache"); + + assert!( + !state.events.contains_key("run-b"), + "project listing should not refresh the full persistent event journal into the local cache" + ); + assert!( + !state.event_summaries.contains_key("run-b"), + "project listing should not refresh protocol summaries unrelated to the registry" + ); +} + #[test] fn persistent_linear_execution_event_listing_does_not_refresh_full_ledger() { let temp_dir = TempDir::new().expect("tempdir should create");