Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions apps/decodex/src/state/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,41 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value;
Ok(())
}

fn list_linear_execution_events(
&self,
service_id: &str,
issue_id: &str,
) -> Result<Vec<LinearExecutionEventRuntimeRecord>> {
let mut statement = self.connection.prepare(
"SELECT payload_json, event_unix, recorded_at, recorded_at_unix \
FROM linear_execution_events \
WHERE service_id = ?1 AND issue_id = ?2",
)?;
let rows = statement.query_map(params![service_id, issue_id], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, Option<i64>>(1)?,
row.get::<_, String>(2)?,
row.get::<_, i64>(3)?,
))
})?;
let mut records = Vec::new();

for row in rows {
let (payload_json, event_unix, recorded_at, recorded_at_unix) = row?;
let record = serde_json::from_str::<LinearExecutionEventRecord>(&payload_json)?;

records.push(LinearExecutionEventRuntimeRecord {
record,
event_unix,
recorded_at,
recorded_at_unix,
});
}

Ok(records)
}

fn load_private_execution_events(&self, state: &mut StateData) -> Result<()> {
let mut statement = self.connection.prepare(
"SELECT record_id, project_id, issue_id, run_id, attempt_number, event_type, \
Expand Down
39 changes: 30 additions & 9 deletions apps/decodex/src/state/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1002,15 +1002,21 @@ impl StateStore {
service_id: &str,
issue_id: &str,
) -> Result<Vec<LinearExecutionEventRecord>> {
let state = self.lock()?;
let mut records = state
.linear_execution_events
.values()
.filter(|record| {
record.record.service_id == service_id && record.record.issue_id == issue_id
})
.cloned()
.collect::<Vec<_>>();
let mut records = match self.list_persisted_linear_execution_events(service_id, issue_id)? {
Some(records) => records,
None => {
let state = self.lock_without_refresh()?;

state
.linear_execution_events
.values()
.filter(|record| {
record.record.service_id == service_id && record.record.issue_id == issue_id
})
.cloned()
.collect::<Vec<_>>()
},
};

records.sort_by(compare_linear_execution_event_runtime_records);

Expand Down Expand Up @@ -1455,6 +1461,21 @@ impl StateStore {
sqlite.delete_linear_execution_event(idempotency_key)
}

fn list_persisted_linear_execution_events(
&self,
service_id: &str,
issue_id: &str,
) -> Result<Option<Vec<LinearExecutionEventRuntimeRecord>>> {
let Some(sqlite) = self.sqlite.as_ref() else {
return Ok(None);
};
let sqlite = sqlite
.lock()
.map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?;

sqlite.list_linear_execution_events(service_id, issue_id).map(Some)
}

fn insert_private_execution_event_locked(
&self,
record: &PrivateExecutionEventRuntimeRecord,
Expand Down
41 changes: 41 additions & 0 deletions apps/decodex/src/state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,47 @@ fn persistent_project_run_listing_does_not_refresh_full_event_journal() {
);
}

#[test]
fn persistent_linear_execution_event_listing_does_not_refresh_full_ledger() {
let temp_dir = TempDir::new().expect("tempdir should create");
let state_path = temp_dir.path().join("runtime.sqlite3");
let observer = StateStore::open(&state_path).expect("observer state store should open");
let writer = StateStore::open(&state_path).expect("writer state store should open");
let mut writer_record = LinearExecutionEventRecord::new(
LinearExecutionEventIdentity {
service_id: "pubfi",
issue_id: "PUB-102",
issue_identifier: "PUB-102",
run_id: "run-b",
attempt_number: 1,
},
"closeout",
String::from("2026-04-29T10:12:00Z"),
"closeout",
);

writer_record.summary = Some(String::from("Writer closeout."));
writer_record.pr_url = Some(String::from("https://github.com/hack-ink/decodex/pull/102"));
writer_record.commit_sha = Some(String::from("2222222222222222222222222222222222222222"));

writer
.record_linear_execution_event(&writer_record)
.expect("writer ledger event should persist");

let observed = observer
.list_linear_execution_events("pubfi", "PUB-102")
.expect("observer should read issue-scoped ledger events");

assert_eq!(observed, vec![writer_record.clone()]);

let state = observer.inner.lock().expect("test should inspect the local cache");

assert!(
!state.linear_execution_events.contains_key(&writer_record.idempotency_key),
"issue-scoped ledger listing should not refresh the full persistent ledger into the local cache"
);
}

#[test]
fn manages_issue_leases() {
let store = StateStore::open_in_memory().expect("in-memory state store should open");
Expand Down