Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/decodex/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
time::Duration,
};

use rusqlite::{Connection, Transaction, params};
use rusqlite::{Connection, OptionalExtension, Transaction, params};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
Expand Down
102 changes: 100 additions & 2 deletions apps/decodex/src/state/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ impl StateData {
self.worktrees = loaded.worktrees;
}

fn replace_project_registry_state(&mut self, loaded: Self) {
self.projects = loaded.projects;
}

fn project_run_status(
&self,
project_id: &str,
Expand Down Expand Up @@ -394,13 +398,21 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value;
Ok(state)
}

fn load_project_run_state(&self) -> Result<StateData> {
fn load_project_run_state_for_project(&self, project_id: &str) -> Result<StateData> {
let mut state = StateData::default();

self.load_leases(&mut state)?;
self.load_run_attempts(&mut state)?;
self.load_protocol_event_summaries(&mut state)?;
self.load_worktrees(&mut state)?;
self.load_protocol_event_summaries_for_project_runs(&mut state, project_id)?;

Ok(state)
}

fn load_project_registry_state(&self) -> Result<StateData> {
let mut state = StateData::default();

self.load_projects(&mut state)?;

Ok(state)
}
Expand Down Expand Up @@ -827,6 +839,64 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value;
Ok(())
}

fn load_protocol_event_summaries_for_project_runs(
&self,
state: &mut StateData,
project_id: &str,
) -> Result<()> {
let mut run_ids = state
.run_attempts
.values()
.filter(|attempt| state.project_run_status(project_id, attempt).is_some())
.map(|attempt| attempt.run_id.clone())
.collect::<Vec<_>>();

run_ids.sort();
run_ids.dedup();

for run_id in run_ids {
self.load_compacted_protocol_event_summary_for_run(state, &run_id)?;
self.load_protocol_event_summary_for_run(state, &run_id)?;
}

Ok(())
}

fn load_protocol_event_summary_for_run(
&self,
state: &mut StateData,
run_id: &str,
) -> Result<()> {
let mut statement = self.connection.prepare(
"SELECT totals.event_count, totals.last_sequence_number, last.event_type, \
last.created_at, last.created_at_unix \
FROM (
SELECT COUNT(*) AS event_count, MAX(sequence_number) AS last_sequence_number \
FROM protocol_events WHERE run_id = ?1
) totals \
JOIN protocol_events last \
ON last.run_id = ?1 \
AND last.sequence_number = totals.last_sequence_number",
)?;
let summary = statement
.query_row(params![run_id], |row| {
Ok(ProtocolEventSummaryRecord {
event_count: row.get(0)?,
last_sequence_number: Some(row.get(1)?),
last_event_type: Some(row.get(2)?),
last_event_at: Some(row.get(3)?),
last_event_at_unix: Some(row.get(4)?),
})
})
.optional()?;

if let Some(summary) = summary {
state.event_summaries.insert(run_id.to_owned(), summary);
}

Ok(())
}

fn load_compacted_protocol_event_summaries(&self, state: &mut StateData) -> Result<()> {
let mut statement = self.connection.prepare(
"SELECT run_id, event_count, last_sequence_number, last_event_type, last_event_at, \
Expand Down Expand Up @@ -854,6 +924,34 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value;
Ok(())
}

fn load_compacted_protocol_event_summary_for_run(
&self,
state: &mut StateData,
run_id: &str,
) -> Result<()> {
let mut statement = self.connection.prepare(
"SELECT event_count, last_sequence_number, last_event_type, last_event_at, \
last_event_at_unix FROM protocol_event_summaries WHERE run_id = ?1",
)?;
let summary = statement
.query_row(params![run_id], |row| {
Ok(ProtocolEventSummaryRecord {
event_count: row.get(0)?,
last_sequence_number: row.get(1)?,
last_event_type: row.get(2)?,
last_event_at: row.get(3)?,
last_event_at_unix: row.get(4)?,
})
})
.optional()?;

if let Some(summary) = summary {
state.event_summaries.insert(run_id.to_owned(), summary);
}

Ok(())
}

fn load_worktrees(&self, state: &mut StateData) -> Result<()> {
let mut statement = self
.connection
Expand Down
45 changes: 36 additions & 9 deletions apps/decodex/src/state/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ impl StateStore {

/// List all registered projects known to this local Decodex installation.
pub(crate) fn list_projects(&self) -> Result<Vec<ProjectRegistration>> {
let state = self.lock()?;
let mut state = self.lock_without_refresh()?;

self.refresh_project_registry_state_locked(&mut state)?;

let mut projects = state.projects.values().cloned().collect::<Vec<_>>();

projects.sort_by(|left, right| left.service_id().cmp(right.service_id()));
Expand Down Expand Up @@ -350,7 +353,10 @@ impl StateStore {

/// List all active leases.
pub fn list_leases(&self, project_id: &str) -> Result<Vec<IssueLease>> {
let state = self.lock()?;
let mut state = self.lock_without_refresh()?;

self.refresh_project_run_state_locked(&mut state, project_id)?;

let mut leases = state
.leases
.values()
Expand All @@ -366,7 +372,10 @@ impl StateStore {
/// List all active shared leases by combining local claims with other processes' issue claims.
pub fn list_active_shared_leases(&self, project_id: &str) -> Result<Vec<IssueLease>> {
let (mut leases_by_issue, dispatch_slot_config) = {
let state = self.lock()?;
let mut state = self.lock_without_refresh()?;

self.refresh_project_run_state_locked(&mut state, project_id)?;

let leases = state
.leases
.values()
Expand Down Expand Up @@ -856,7 +865,7 @@ impl StateStore {
) -> Result<Vec<ProjectRunStatus>> {
let mut state = self.lock_without_refresh()?;

self.refresh_project_run_state_locked(&mut state)?;
self.refresh_project_run_state_locked(&mut state, project_id)?;

let mut runs = state
.run_attempts
Expand All @@ -878,7 +887,7 @@ impl StateStore {
) -> Result<(Vec<ProjectRunStatus>, Vec<ProjectRunStatus>)> {
let mut state = self.lock_without_refresh()?;

self.refresh_project_run_state_locked(&mut state)?;
self.refresh_project_run_state_locked(&mut state, project_id)?;

let mut runs = state
.run_attempts
Expand All @@ -905,7 +914,7 @@ impl StateStore {
pub fn list_active_runs(&self, project_id: &str) -> Result<Vec<ProjectRunStatus>> {
let mut state = self.lock_without_refresh()?;

self.refresh_project_run_state_locked(&mut state)?;
self.refresh_project_run_state_locked(&mut state, project_id)?;

let mut runs = state
.run_attempts
Expand Down Expand Up @@ -1321,7 +1330,7 @@ impl StateStore {
pub fn list_worktrees(&self, project_id: &str) -> Result<Vec<WorktreeMapping>> {
let mut state = self.lock_without_refresh()?;

self.refresh_project_run_state_locked(&mut state)?;
self.refresh_project_run_state_locked(&mut state, project_id)?;

let mut mappings = state
.worktrees
Expand Down Expand Up @@ -1375,20 +1384,38 @@ impl StateStore {
Ok(())
}

fn refresh_project_run_state_locked(&self, state: &mut StateData) -> Result<()> {
fn refresh_project_run_state_locked(
&self,
state: &mut StateData,
project_id: &str,
) -> Result<()> {
let Some(sqlite) = self.sqlite.as_ref() else {
return Ok(());
};
let sqlite = sqlite
.lock()
.map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?;
let loaded = sqlite.load_project_run_state()?;
let loaded = sqlite.load_project_run_state_for_project(project_id)?;

state.replace_project_run_state(loaded);

Ok(())
}

fn refresh_project_registry_state_locked(&self, state: &mut StateData) -> Result<()> {
let Some(sqlite) = self.sqlite.as_ref() else {
return Ok(());
};
let sqlite = sqlite
.lock()
.map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?;
let loaded = sqlite.load_project_registry_state()?;

state.replace_project_registry_state(loaded);

Ok(())
}

fn persist_runtime_state_locked(&self, state: &StateData) -> Result<()> {
let Some(sqlite) = self.sqlite.as_ref() else {
return Ok(());
Expand Down
78 changes: 70 additions & 8 deletions apps/decodex/src/state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,15 @@ fn persistent_project_run_listing_does_not_refresh_full_event_journal() {
writer
.append_event("run-b", 1, "item/agentMessage/delta", "{}")
.expect("writer event should append");
writer
.record_run_attempt("run-c", "PUB-103", 1, "succeeded")
.expect("writer project run should record");
writer
.upsert_worktree("pubfi", "PUB-103", "x/pubfi-pub-103", "/tmp/worktrees/pub-103")
.expect("writer project worktree should persist");
writer
.append_event("run-c", 1, "thread/archive", "{}")
.expect("writer project event should append");

let mut writer_record = LinearExecutionEventRecord::new(
LinearExecutionEventIdentity {
Expand All @@ -453,13 +462,23 @@ fn persistent_project_run_listing_does_not_refresh_full_event_journal() {
.expect("writer ledger event should persist");

let runs = observer.list_active_runs("pubfi").expect("active runs should load");
let recent_runs = observer.list_recent_runs("pubfi", 10).expect("recent runs should load");
let leases = observer.list_active_shared_leases("pubfi").expect("shared leases should load");
let worktrees = observer.list_worktrees("pubfi").expect("worktrees should load");

assert_eq!(runs.len(), 1);
assert_eq!(runs[0].run_id(), "run-a");
assert_eq!(runs[0].event_count(), 1);
assert_eq!(runs[0].last_event_type(), Some("item/started"));
assert_eq!(worktrees.len(), 1);
assert!(
recent_runs.iter().any(|run| run.run_id() == "run-c"
&& run.event_count() == 1
&& run.last_event_type() == Some("thread/archive")),
"project-scoped persistent event summaries should still load for matching runs"
);
assert_eq!(leases.len(), 1);
assert_eq!(leases[0].issue_id(), "PUB-101");
assert_eq!(worktrees.len(), 2);
assert_eq!(worktrees[0].issue_id(), "PUB-101");

let state = observer.inner.lock().expect("test should inspect the local cache");
Expand All @@ -468,20 +487,63 @@ fn persistent_project_run_listing_does_not_refresh_full_event_journal() {
!state.events.contains_key("run-b"),
"operator run listing should refresh event summaries without materializing unrelated event rows"
);
assert_eq!(
state
.event_summaries
.get("run-b")
.expect("unrelated persistent run should have a summary")
.event_count,
1
assert!(
!state.events.contains_key("run-c"),
"operator run listing should refresh project summaries without materializing project event rows"
);
assert!(
!state.event_summaries.contains_key("run-b"),
"operator run listing should not refresh summaries for runs outside the requested project"
);
assert!(
!state.linear_execution_events.contains_key(&writer_record.idempotency_key),
"operator run and worktree listing should not refresh the full persistent ledger into the local cache"
);
}

#[test]
fn persistent_project_listing_does_not_refresh_full_event_journal() {
let temp_dir = TempDir::new().expect("tempdir should create");
let state_path = temp_dir.path().join("runtime.sqlite3");
let observer = StateStore::open(&state_path).expect("observer state store should open");
let writer = StateStore::open(&state_path).expect("writer state store should open");
let registration = ProjectRegistration {
service_id: String::from("pubfi"),
config_path: temp_dir.path().join("project.toml"),
repo_root: temp_dir.path().join("repo"),
worktree_root: temp_dir.path().join("repo/.worktrees"),
workflow_path: temp_dir.path().join("repo/WORKFLOW.md"),
tracker_api_key_env_var: String::from("LINEAR_API_KEY_HACKINK"),
github_token_env_var: String::from("GITHUB_PAT_Y"),
enabled: true,
config_fingerprint: String::from("abc123"),
updated_at: String::from("2026-05-25T00:00:00Z"),
updated_at_unix: 1_779_667_200,
};

observer.upsert_project(&registration).expect("project should persist");
writer.record_run_attempt("run-b", "PUB-102", 1, "running").expect("writer run should record");
writer
.append_event("run-b", 1, "item/agentMessage/delta", "{}")
.expect("writer event should append");

let projects = observer.list_projects().expect("projects should load");

assert_eq!(projects.len(), 1);
assert_eq!(projects[0].service_id(), "pubfi");

let state = observer.inner.lock().expect("test should inspect the local cache");

assert!(
!state.events.contains_key("run-b"),
"project listing should not refresh the full persistent event journal into the local cache"
);
assert!(
!state.event_summaries.contains_key("run-b"),
"project listing should not refresh protocol summaries unrelated to the registry"
);
}

#[test]
fn persistent_linear_execution_event_listing_does_not_refresh_full_ledger() {
let temp_dir = TempDir::new().expect("tempdir should create");
Expand Down