From 2ff146ee9cece3d95c3bc89a10d386afc9594055 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Wed, 27 May 2026 22:28:43 +0800 Subject: [PATCH] Avoid full ledger reloads in operator snapshots --- apps/decodex/src/state/internal.rs | 35 +++++++++++++++++++++++++ apps/decodex/src/state/store.rs | 39 +++++++++++++++++++++------- apps/decodex/src/state/tests.rs | 41 ++++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 9 deletions(-) diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index d3f13191..ee1aa438 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -895,6 +895,41 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; Ok(()) } + fn list_linear_execution_events( + &self, + service_id: &str, + issue_id: &str, + ) -> Result> { + let mut statement = self.connection.prepare( + "SELECT payload_json, event_unix, recorded_at, recorded_at_unix \ + FROM linear_execution_events \ + WHERE service_id = ?1 AND issue_id = ?2", + )?; + let rows = statement.query_map(params![service_id, issue_id], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, Option>(1)?, + row.get::<_, String>(2)?, + row.get::<_, i64>(3)?, + )) + })?; + let mut records = Vec::new(); + + for row in rows { + let (payload_json, event_unix, recorded_at, recorded_at_unix) = row?; + let record = serde_json::from_str::(&payload_json)?; + + records.push(LinearExecutionEventRuntimeRecord { + record, + event_unix, + recorded_at, + recorded_at_unix, + }); + } + + Ok(records) + } + fn load_private_execution_events(&self, state: &mut StateData) -> Result<()> { let mut statement = self.connection.prepare( "SELECT record_id, project_id, issue_id, run_id, attempt_number, event_type, \ diff --git a/apps/decodex/src/state/store.rs b/apps/decodex/src/state/store.rs index ed1e3f6c..2bffc3f8 100644 --- a/apps/decodex/src/state/store.rs +++ b/apps/decodex/src/state/store.rs @@ -1002,15 +1002,21 @@ impl StateStore { service_id: &str, issue_id: &str, ) -> Result> { - let state = self.lock()?; - let mut records = state - .linear_execution_events - .values() - .filter(|record| { - record.record.service_id == service_id && record.record.issue_id == issue_id - }) - .cloned() - .collect::>(); + let mut records = match self.list_persisted_linear_execution_events(service_id, issue_id)? { + Some(records) => records, + None => { + let state = self.lock_without_refresh()?; + + state + .linear_execution_events + .values() + .filter(|record| { + record.record.service_id == service_id && record.record.issue_id == issue_id + }) + .cloned() + .collect::>() + }, + }; records.sort_by(compare_linear_execution_event_runtime_records); @@ -1455,6 +1461,21 @@ impl StateStore { sqlite.delete_linear_execution_event(idempotency_key) } + fn list_persisted_linear_execution_events( + &self, + service_id: &str, + issue_id: &str, + ) -> Result>> { + let Some(sqlite) = self.sqlite.as_ref() else { + return Ok(None); + }; + let sqlite = sqlite + .lock() + .map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?; + + sqlite.list_linear_execution_events(service_id, issue_id).map(Some) + } + fn insert_private_execution_event_locked( &self, record: &PrivateExecutionEventRuntimeRecord, diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index a032500d..a0580338 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -454,6 +454,47 @@ fn persistent_project_run_listing_does_not_refresh_full_event_journal() { ); } +#[test] +fn persistent_linear_execution_event_listing_does_not_refresh_full_ledger() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let state_path = temp_dir.path().join("runtime.sqlite3"); + let observer = StateStore::open(&state_path).expect("observer state store should open"); + let writer = StateStore::open(&state_path).expect("writer state store should open"); + let mut writer_record = LinearExecutionEventRecord::new( + LinearExecutionEventIdentity { + service_id: "pubfi", + issue_id: "PUB-102", + issue_identifier: "PUB-102", + run_id: "run-b", + attempt_number: 1, + }, + "closeout", + String::from("2026-04-29T10:12:00Z"), + "closeout", + ); + + writer_record.summary = Some(String::from("Writer closeout.")); + writer_record.pr_url = Some(String::from("https://github.com/hack-ink/decodex/pull/102")); + writer_record.commit_sha = Some(String::from("2222222222222222222222222222222222222222")); + + writer + .record_linear_execution_event(&writer_record) + .expect("writer ledger event should persist"); + + let observed = observer + .list_linear_execution_events("pubfi", "PUB-102") + .expect("observer should read issue-scoped ledger events"); + + assert_eq!(observed, vec![writer_record.clone()]); + + let state = observer.inner.lock().expect("test should inspect the local cache"); + + assert!( + !state.linear_execution_events.contains_key(&writer_record.idempotency_key), + "issue-scoped ledger listing should not refresh the full persistent ledger into the local cache" + ); +} + #[test] fn manages_issue_leases() { let store = StateStore::open_in_memory().expect("in-memory state store should open");