Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/decodex/src/orchestrator/tests/operator/status/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,9 @@ fn read_websocket_json_until(

#[test]
fn operator_dashboard_run_activity_event_summarizes_active_runs() {
let temp_dir = TempDir::new().expect("temp dir should exist");
let _home_guard =
TestEnvVarGuard::set("HOME", temp_dir.path().to_str().expect("temp path should be UTF-8"));
let (_temp_dir, config, _workflow) = temp_project_layout();
let state_store = StateStore::open_in_memory().expect("state store should open");
let registration = ProjectRegistration::from_config(
Expand Down
59 changes: 58 additions & 1 deletion apps/decodex/src/state/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2009,12 +2009,69 @@ fn write_run_activity_marker_record(
marker: &RunActivityMarkerRecord,
) -> Result<()> {
let marker_path = worktree_path.join(RUN_ACTIVITY_MARKER_FILE);
let mut marker = marker.clone();

write_run_activity_marker_body_atomic(&marker_path, &serialize_run_activity_marker_record(marker))?;
if let Some(current_marker) = read_run_activity_marker_record(worktree_path)? {
preserve_current_run_account_marker_fields(&current_marker, &mut marker);
}

write_run_activity_marker_body_atomic(&marker_path, &serialize_run_activity_marker_record(&marker))?;

Ok(())
}

fn preserve_current_run_account_marker_fields(
current: &RunActivityMarkerRecord,
next: &mut RunActivityMarkerRecord,
) {
if current.run_id != next.run_id || current.attempt_number != next.attempt_number {
return;
}

let Some(current_account) = selected_marker_account(current).cloned() else {
return;
};
let keep_current_account = match next.account.as_ref() {
Some(next_account) =>
account_marker_observed_unix_epoch(&current_account)
> account_marker_observed_unix_epoch(next_account),
None => true,
};

if keep_current_account {
next.account = Some(current_account.clone());
next.accounts = if current.accounts.is_empty() {
vec![current_account]
} else {
current.accounts.clone()
};
} else if next.accounts.is_empty() && !current.accounts.is_empty() {
next.accounts = current.accounts.clone();
}
}

fn selected_marker_account(
marker: &RunActivityMarkerRecord,
) -> Option<&CodexAccountActivitySummary> {
marker
.account
.as_ref()
.or_else(|| {
marker.accounts.iter().find(|account| {
account.status.eq_ignore_ascii_case("selected")
})
})
.or_else(|| marker.accounts.first())
}

fn account_marker_observed_unix_epoch(account: &CodexAccountActivitySummary) -> i64 {
[account.selected_at_unix_epoch, account.checked_at_unix_epoch]
.into_iter()
.flatten()
.max()
.unwrap_or(0)
}

fn write_run_activity_marker_body_atomic(marker_path: &Path, body: &str) -> Result<()> {
let parent = marker_path.parent().ok_or_else(|| {
eyre::eyre!("activity marker path `{}` has no parent directory", marker_path.display())
Expand Down
33 changes: 33 additions & 0 deletions apps/decodex/src/state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,7 @@ fn run_activity_marker_round_trips_marker_surfaces() {
assert_run_activity_marker_round_trips_child_agent_activity_summary();
assert_run_activity_marker_round_trips_account_summary();
assert_run_activity_marker_preserves_account_summary_after_activity_refresh();
assert_run_activity_marker_preserves_account_summary_after_stale_rewrite();
}

fn assert_run_activity_marker_round_trips_clearable_auxiliary_fields() {
Expand Down Expand Up @@ -1333,6 +1334,38 @@ fn assert_run_activity_marker_preserves_account_summary_after_activity_refresh()
assert!(!leftover_temp_marker, "atomic marker rewrites should not leave temp files");
}

fn assert_run_activity_marker_preserves_account_summary_after_stale_rewrite() {
let temp_dir = TempDir::new().expect("tempdir should create");
let summary = sample_codex_account_activity_summary();

state::write_run_activity_marker_for_process(temp_dir.path(), "run-1", 1, process::id())
.expect("initial activity marker should write");

let stale_activity_marker = state::read_run_activity_marker_record(temp_dir.path())
.expect("activity marker should read")
.expect("activity marker should exist");

state::write_run_account_marker(
temp_dir.path(),
&CodexAccountMarker {
run_id: "run-1",
attempt_number: 1,
account: &summary,
accounts: slice::from_ref(&summary),
},
)
.expect("account summary should write");
state::write_run_activity_marker_record(temp_dir.path(), &stale_activity_marker)
.expect("stale activity marker rewrite should preserve current account");

let marker = state::read_run_activity_marker_snapshot(temp_dir.path())
.expect("marker snapshot should load")
.expect("marker snapshot should exist");

assert_eq!(marker.account(), Some(&summary));
assert_eq!(marker.accounts(), slice::from_ref(&summary));
}

fn sample_codex_account_activity_summary() -> CodexAccountActivitySummary {
CodexAccountActivitySummary {
account_fingerprint: String::from("acct_...cdef"),
Expand Down