From 680987a448d07750449e296ec47afac87fbb27d4 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 19 May 2026 22:44:13 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Make run activity marker writes atomic","authority":"manual"} --- apps/decodex/src/state.rs | 2 +- apps/decodex/src/state/internal.rs | 55 +++++++++++++++---- apps/decodex/src/state/tests.rs | 86 ++++++++++++++++++++++-------- 3 files changed, 112 insertions(+), 31 deletions(-) diff --git a/apps/decodex/src/state.rs b/apps/decodex/src/state.rs index 6e2c5f47..490df026 100644 --- a/apps/decodex/src/state.rs +++ b/apps/decodex/src/state.rs @@ -2,7 +2,7 @@ #[cfg(unix)] use std::os::fd::{AsRawFd, FromRawFd}; use std::{ - cmp::Ordering, + cmp, collections::{HashMap, HashSet}, fs::{self, File, OpenOptions, TryLockError}, io::{Error, ErrorKind, Read, Seek, SeekFrom, Write}, diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index b60da18f..2794ce79 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -1,5 +1,6 @@ #[cfg(target_os = "macos")] use std::mem::MaybeUninit; +use std::sync::atomic::AtomicU64; use libc::FD_CLOEXEC; use libc::F_GETFD; @@ -13,6 +14,8 @@ use libc::{ #[cfg(target_os = "macos")] use process::Command; +static RUN_ACTIVITY_MARKER_WRITE_SEQUENCE: AtomicU64 = AtomicU64::new(0); + pub(crate) struct EffectiveRuntimeMarker<'a> { pub(crate) thread_id: Option<&'a str>, pub(crate) turn_id: Option<&'a str>, @@ -1855,7 +1858,6 @@ fn write_run_activity_marker_at( last_activity_unix_epoch: i64, last_protocol_activity_unix_epoch: Option, ) -> Result<()> { - let marker_path = worktree_path.join(RUN_ACTIVITY_MARKER_FILE); let existing_marker = read_run_activity_marker_record(worktree_path)?; let same_run_marker = existing_marker .as_ref() @@ -1873,7 +1875,7 @@ fn write_run_activity_marker_at( marker.retry_ready_at_unix_epoch = same_run_marker.retry_ready_at_unix_epoch; } - fs::write(marker_path, serialize_run_activity_marker_record(&marker))?; + write_run_activity_marker_record(worktree_path, &marker)?; Ok(()) } @@ -2006,14 +2008,49 @@ fn write_run_activity_marker_record( worktree_path: &Path, marker: &RunActivityMarkerRecord, ) -> Result<()> { - fs::write( - worktree_path.join(RUN_ACTIVITY_MARKER_FILE), - serialize_run_activity_marker_record(marker), - )?; + let marker_path = worktree_path.join(RUN_ACTIVITY_MARKER_FILE); + + write_run_activity_marker_body_atomic(&marker_path, &serialize_run_activity_marker_record(marker))?; Ok(()) } +fn write_run_activity_marker_body_atomic(marker_path: &Path, body: &str) -> Result<()> { + let parent = marker_path.parent().ok_or_else(|| { + eyre::eyre!("activity marker path `{}` has no parent directory", marker_path.display()) + })?; + let sequence = RUN_ACTIVITY_MARKER_WRITE_SEQUENCE.fetch_add( + 1, + std::sync::atomic::Ordering::Relaxed, + ); + let temp_path = parent.join(format!( + ".{RUN_ACTIVITY_MARKER_FILE}.{}.{}.tmp", + process::id(), + sequence, + )); + let result = (|| -> Result<()> { + let mut temp_file = OpenOptions::new() + .write(true) + .create_new(true) + .open(&temp_path)?; + + temp_file.write_all(body.as_bytes())?; + temp_file.flush()?; + + drop(temp_file); + + fs::rename(&temp_path, marker_path)?; + + Ok(()) + })(); + + if result.is_err() { + let _ = fs::remove_file(&temp_path); + } + + result +} + fn serialize_run_activity_marker_record(marker: &RunActivityMarkerRecord) -> String { let mut body = String::new(); @@ -2189,7 +2226,7 @@ fn protocol_event_summary_from_events(events: &[ProtocolEventRecord]) -> Protoco summary } -fn compare_attempt_records(left: &RunAttemptRecord, right: &RunAttemptRecord) -> Ordering { +fn compare_attempt_records(left: &RunAttemptRecord, right: &RunAttemptRecord) -> cmp::Ordering { left.attempt_number .cmp(&right.attempt_number) .then_with(|| left.updated_at_unix.cmp(&right.updated_at_unix)) @@ -2199,14 +2236,14 @@ fn compare_attempt_records(left: &RunAttemptRecord, right: &RunAttemptRecord) -> fn compare_linear_execution_event_runtime_records( left: &LinearExecutionEventRuntimeRecord, right: &LinearExecutionEventRuntimeRecord, -) -> Ordering { +) -> cmp::Ordering { left.event_unix .cmp(&right.event_unix) .then_with(|| left.recorded_at_unix.cmp(&right.recorded_at_unix)) .then_with(|| left.record.idempotency_key.cmp(&right.record.idempotency_key)) } -fn compare_project_run_status(left: &ProjectRunStatus, right: &ProjectRunStatus) -> Ordering { +fn compare_project_run_status(left: &ProjectRunStatus, right: &ProjectRunStatus) -> cmp::Ordering { right .active_lease .cmp(&left.active_lease) diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index 474088fd..50ca35bc 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -1032,6 +1032,7 @@ fn run_activity_marker_round_trips_marker_surfaces() { assert_run_activity_marker_round_trips_thread_and_protocol_summary_fields(); assert_run_activity_marker_round_trips_child_agent_activity_summary(); assert_run_activity_marker_round_trips_account_summary(); + assert_run_activity_marker_preserves_account_summary_after_activity_refresh(); } fn assert_run_activity_marker_round_trips_clearable_auxiliary_fields() { @@ -1264,27 +1265,7 @@ fn assert_run_activity_marker_round_trips_child_agent_activity_summary() { fn assert_run_activity_marker_round_trips_account_summary() { let temp_dir = TempDir::new().expect("tempdir should create"); - let summary = CodexAccountActivitySummary { - account_fingerprint: String::from("acct_...cdef"), - email: Some(String::from("account@example.com")), - plan_type: Some(String::from("pro")), - status: String::from("selected"), - refresh_status: String::from("not_needed"), - checked_at_unix_epoch: Some(1_800_000_010), - selected_at_unix_epoch: Some(1_800_000_011), - primary_window_seconds: Some(18_000), - primary_remaining_percent: Some(72), - primary_resets_at_unix_epoch: Some(1_800_018_000), - secondary_window_seconds: Some(604_800), - secondary_remaining_percent: Some(91), - secondary_resets_at_unix_epoch: Some(1_800_604_800), - credits_has_credits: Some(true), - credits_unlimited: Some(false), - credits_balance: Some(String::from("9.99")), - rate_limit_reached_type: None, - cooldown_until_unix_epoch: None, - note: Some(String::from("usage probe ok")), - }; + let summary = sample_codex_account_activity_summary(); state::write_run_account_marker( temp_dir.path(), @@ -1313,6 +1294,69 @@ fn assert_run_activity_marker_round_trips_account_summary() { assert!(!body.contains("codex_accounts=")); } +fn assert_run_activity_marker_preserves_account_summary_after_activity_refresh() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let summary = sample_codex_account_activity_summary(); + + state::write_run_account_marker( + temp_dir.path(), + &CodexAccountMarker { + run_id: "run-1", + attempt_number: 1, + account: &summary, + accounts: slice::from_ref(&summary), + }, + ) + .expect("account summary should write"); + state::write_run_activity_marker_at( + temp_dir.path(), + "run-1", + 1, + process::id(), + 1_800_000_020, + Some(1_800_000_019), + ) + .expect("activity refresh should write"); + + let marker = state::read_run_activity_marker_snapshot(temp_dir.path()) + .expect("marker snapshot should load") + .expect("marker snapshot should exist"); + + assert_eq!(marker.account(), Some(&summary)); + assert_eq!(marker.accounts(), slice::from_ref(&summary)); + + let leftover_temp_marker = fs::read_dir(temp_dir.path()) + .expect("tempdir should be readable") + .filter_map(|entry| entry.ok()) + .any(|entry| entry.file_name().to_string_lossy().contains(".decodex-run-activity.")); + + assert!(!leftover_temp_marker, "atomic marker rewrites should not leave temp files"); +} + +fn sample_codex_account_activity_summary() -> CodexAccountActivitySummary { + CodexAccountActivitySummary { + account_fingerprint: String::from("acct_...cdef"), + email: Some(String::from("account@example.com")), + plan_type: Some(String::from("pro")), + status: String::from("selected"), + refresh_status: String::from("not_needed"), + checked_at_unix_epoch: Some(1_800_000_010), + selected_at_unix_epoch: Some(1_800_000_011), + primary_window_seconds: Some(18_000), + primary_remaining_percent: Some(72), + primary_resets_at_unix_epoch: Some(1_800_018_000), + secondary_window_seconds: Some(604_800), + secondary_remaining_percent: Some(91), + secondary_resets_at_unix_epoch: Some(1_800_604_800), + credits_has_credits: Some(true), + credits_unlimited: Some(false), + credits_balance: Some(String::from("9.99")), + rate_limit_reached_type: None, + cooldown_until_unix_epoch: None, + note: Some(String::from("usage probe ok")), + } +} + #[test] fn run_operation_marker_resets_stale_per_attempt_fields_on_new_attempt() { let temp_dir = TempDir::new().expect("tempdir should create");