Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions apps/decodex/src/state/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ impl StateData {
self.review_orchestrations = loaded.review_orchestrations;
}

fn replace_project_run_state(&mut self, loaded: Self) {
self.leases = loaded.leases;
self.run_attempts = loaded.run_attempts;
self.event_summaries = loaded.event_summaries;
self.worktrees = loaded.worktrees;
}

fn project_run_status(
&self,
project_id: &str,
Expand Down Expand Up @@ -387,6 +394,17 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value;
Ok(state)
}

fn load_project_run_state(&self) -> Result<StateData> {
let mut state = StateData::default();

self.load_leases(&mut state)?;
self.load_run_attempts(&mut state)?;
self.load_protocol_event_summaries(&mut state)?;
self.load_worktrees(&mut state)?;

Ok(state)
}

fn persist_runtime_state(&mut self, state: &StateData) -> Result<()> {
let transaction = self.connection.transaction()?;

Expand Down
34 changes: 30 additions & 4 deletions apps/decodex/src/state/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,10 @@ impl StateStore {
project_id: &str,
limit: usize,
) -> Result<Vec<ProjectRunStatus>> {
let state = self.lock()?;
let mut state = self.lock_without_refresh()?;

self.refresh_project_run_state_locked(&mut state)?;

let mut runs = state
.run_attempts
.values()
Expand All @@ -873,7 +876,10 @@ impl StateStore {
project_id: &str,
base_recent_limit: usize,
) -> Result<(Vec<ProjectRunStatus>, Vec<ProjectRunStatus>)> {
let state = self.lock()?;
let mut state = self.lock_without_refresh()?;

self.refresh_project_run_state_locked(&mut state)?;

let mut runs = state
.run_attempts
.values()
Expand All @@ -897,7 +903,10 @@ impl StateStore {

/// List all active leased runs for one project without applying the recent-run limit.
pub fn list_active_runs(&self, project_id: &str) -> Result<Vec<ProjectRunStatus>> {
let state = self.lock()?;
let mut state = self.lock_without_refresh()?;

self.refresh_project_run_state_locked(&mut state)?;

let mut runs = state
.run_attempts
.values()
Expand Down Expand Up @@ -1310,7 +1319,10 @@ impl StateStore {

/// List all known worktree mappings.
pub fn list_worktrees(&self, project_id: &str) -> Result<Vec<WorktreeMapping>> {
let state = self.lock()?;
let mut state = self.lock_without_refresh()?;

self.refresh_project_run_state_locked(&mut state)?;

let mut mappings = state
.worktrees
.values()
Expand Down Expand Up @@ -1363,6 +1375,20 @@ impl StateStore {
Ok(())
}

fn refresh_project_run_state_locked(&self, state: &mut StateData) -> Result<()> {
let Some(sqlite) = self.sqlite.as_ref() else {
return Ok(());
};
let sqlite = sqlite
.lock()
.map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?;
let loaded = sqlite.load_project_run_state()?;

state.replace_project_run_state(loaded);

Ok(())
}

fn persist_runtime_state_locked(&self, state: &StateData) -> Result<()> {
let Some(sqlite) = self.sqlite.as_ref() else {
return Ok(());
Expand Down
32 changes: 30 additions & 2 deletions apps/decodex/src/state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,18 +431,42 @@ fn persistent_project_run_listing_does_not_refresh_full_event_journal() {
.append_event("run-b", 1, "item/agentMessage/delta", "{}")
.expect("writer event should append");

let mut writer_record = LinearExecutionEventRecord::new(
LinearExecutionEventIdentity {
service_id: "pubfi",
issue_id: "PUB-102",
issue_identifier: "PUB-102",
run_id: "run-b",
attempt_number: 1,
},
"closeout",
String::from("2026-04-29T10:12:00Z"),
"closeout",
);

writer_record.summary = Some(String::from("Writer closeout."));
writer_record.pr_url = Some(String::from("https://github.com/hack-ink/decodex/pull/102"));
writer_record.commit_sha = Some(String::from("2222222222222222222222222222222222222222"));

writer
.record_linear_execution_event(&writer_record)
.expect("writer ledger event should persist");

let runs = observer.list_active_runs("pubfi").expect("active runs should load");
let worktrees = observer.list_worktrees("pubfi").expect("worktrees should load");

assert_eq!(runs.len(), 1);
assert_eq!(runs[0].run_id(), "run-a");
assert_eq!(runs[0].event_count(), 1);
assert_eq!(runs[0].last_event_type(), Some("item/started"));
assert_eq!(worktrees.len(), 1);
assert_eq!(worktrees[0].issue_id(), "PUB-101");

let state = observer.inner.lock().expect("test should inspect the local cache");

assert!(
state.events.is_empty(),
"operator run listing should refresh event summaries without materializing event rows"
!state.events.contains_key("run-b"),
"operator run listing should refresh event summaries without materializing unrelated event rows"
);
assert_eq!(
state
Expand All @@ -452,6 +476,10 @@ fn persistent_project_run_listing_does_not_refresh_full_event_journal() {
.event_count,
1
);
assert!(
!state.linear_execution_events.contains_key(&writer_record.idempotency_key),
"operator run and worktree listing should not refresh the full persistent ledger into the local cache"
);
}

#[test]
Expand Down