From ecc1d2c9f641d40dc8efcb750785d9fc7771c0cd Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 2 Jun 2026 16:20:19 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Clean up shared worktree lock anchors","authority":"manual"} --- apps/decodex/src/state.rs | 5 +- apps/decodex/src/state/internal.rs | 130 ++++++++++++++++++++++++++++- apps/decodex/src/state/store.rs | 76 ++++++++++++++--- apps/decodex/src/state/tests.rs | 82 ++++++++++++++++++ 4 files changed, 277 insertions(+), 16 deletions(-) diff --git a/apps/decodex/src/state.rs b/apps/decodex/src/state.rs index 8f6d1b4..df6ea26 100644 --- a/apps/decodex/src/state.rs +++ b/apps/decodex/src/state.rs @@ -1,6 +1,9 @@ //! Persistent single-machine runtime state for active Decodex execution. -#[cfg(unix)] use std::os::fd::{AsRawFd, FromRawFd}; +#[cfg(unix)] use std::os::{ + fd::{AsRawFd, FromRawFd}, + unix::ffi::OsStrExt, +}; use std::{ cmp, collections::{HashMap, HashSet}, diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index f002e93..095ba54 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -1,6 +1,7 @@ #[cfg(target_os = "macos")] use std::mem::{self, MaybeUninit}; use std::sync::atomic::AtomicU64; +use std::env; use libc::FD_CLOEXEC; use libc::F_GETFD; @@ -52,12 +53,22 @@ struct DispatchSlotConfig { } struct IssueClaimGuard { + lock_path: PathBuf, lock_file: File, retention: GuardRetention, } impl IssueClaimGuard { + fn lock_root(&self) -> Result<&Path> { + lock_root_from_lock_path(&self.lock_path) + } + fn unlock(self) -> Result<()> { - self.lock_file.unlock()?; + let Self { lock_path, lock_file, retention: _ } = self; + + lock_file.unlock()?; + + drop(lock_file); + remove_lock_file_if_exists(&lock_path)?; Ok(()) } @@ -73,15 +84,31 @@ impl IssueClaimGuard { struct DispatchSlotGuard { project_id: String, slot_index: usize, + lock_path: PathBuf, lock_file: File, retention: GuardRetention, } impl DispatchSlotGuard { + fn lock_root(&self) -> Result<&Path> { + lock_root_from_lock_path(&self.lock_path) + } + fn release_for_clear(self) -> Result<()> { match self.retention { GuardRetention::ParentAfterHandoff => Ok(()), GuardRetention::Local | GuardRetention::AdoptingChild => { - self.lock_file.unlock()?; + let Self { + project_id: _, + slot_index: _, + lock_path, + lock_file, + retention: _, + } = self; + + lock_file.unlock()?; + + drop(lock_file); + remove_lock_file_if_exists(&lock_path)?; Ok(()) }, @@ -2402,6 +2429,105 @@ fn issue_claim_id_from_path(path: &Path) -> Option { .map(str::to_owned) } +fn shared_lock_coordinator_path(root: &Path) -> PathBuf { + let mut hash = 0xcbf2_9ce4_8422_2325_u64; + + for byte in root.as_os_str().as_bytes() { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(0x0000_0100_0000_01b3); + } + + env::temp_dir() + .join("decodex-shared-lock-coordinators") + .join(format!("{hash:016x}.lock")) +} + +fn acquire_shared_lock_coordinator(root: &Path) -> Result { + fs::create_dir_all(root)?; + + let coordinator_path = shared_lock_coordinator_path(root); + + if let Some(parent) = coordinator_path.parent() { + fs::create_dir_all(parent)?; + } + + let coordinator = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(coordinator_path)?; + + coordinator.lock()?; + + Ok(coordinator) +} + +fn lock_root_from_lock_path(lock_path: &Path) -> Result<&Path> { + lock_path + .parent() + .ok_or_else(|| eyre::eyre!("shared lock path `{}` has no parent root", lock_path.display())) +} + +fn remove_lock_file_if_exists(path: &Path) -> Result<()> { + match fs::remove_file(path) { + Ok(()) => Ok(()), + Err(error) if error.kind() == ErrorKind::NotFound => Ok(()), + Err(error) => Err(error.into()), + } +} + +fn shared_lock_file_is_cleanup_candidate(path: &Path) -> bool { + let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else { + return false; + }; + + file_name.starts_with(&format!("{ISSUE_CLAIM_LOCK_FILE_PREFIX}.")) + || file_name.starts_with(&format!("{DISPATCH_SLOT_LOCK_FILE_PREFIX}.")) +} + +fn prune_unlocked_shared_lock_files(root: &Path) -> Result<()> { + let _coordinator = acquire_shared_lock_coordinator(root)?; + let read_dir = match fs::read_dir(root) { + Ok(read_dir) => read_dir, + Err(error) if error.kind() == ErrorKind::NotFound => return Ok(()), + Err(error) => return Err(error.into()), + }; + + for entry in read_dir { + let path = entry?.path(); + + if !shared_lock_file_is_cleanup_candidate(&path) { + continue; + } + + let lock_file = match OpenOptions::new() + .read(true) + .write(true) + .create(false) + .truncate(false) + .open(&path) + { + Ok(file) => file, + Err(error) if error.kind() == ErrorKind::NotFound => continue, + Err(error) => return Err(error.into()), + }; + + match lock_file.try_lock() { + Ok(()) => { + lock_file.unlock()?; + + drop(lock_file); + remove_lock_file_if_exists(&path)?; + }, + Err(TryLockError::WouldBlock) => {}, + Err(TryLockError::Error(error)) => return Err(error.into()), + } + } + + Ok(()) +} + fn write_issue_claim_record( lock_file: &mut File, project_id: &str, diff --git a/apps/decodex/src/state/store.rs b/apps/decodex/src/state/store.rs index 72a5c2c..6964244 100644 --- a/apps/decodex/src/state/store.rs +++ b/apps/decodex/src/state/store.rs @@ -204,15 +204,18 @@ impl StateStore { slot_limit: impl Into, ) -> Result<()> { let slot_limit = slot_limit.into(); + let worktree_root = worktree_root.as_ref().to_path_buf(); let mut state = self.lock()?; slot_limit.validate()?; + + if state.issue_claim_guards.is_empty() && state.dispatch_slot_guards.is_empty() { + prune_unlocked_shared_lock_files(&worktree_root)?; + } + state.dispatch_slot_configs.insert( project_id.to_owned(), - DispatchSlotConfig { - root: worktree_root.as_ref().to_path_buf(), - slot_limit, - }, + DispatchSlotConfig { root: worktree_root, slot_limit }, ); Ok(()) @@ -316,12 +319,15 @@ impl StateStore { if let Some(dispatch_slot_config) = state.dispatch_slot_configs.get(project_id).cloned() { fs::create_dir_all(&dispatch_slot_config.root)?; + let _coordinator = acquire_shared_lock_coordinator(&dispatch_slot_config.root)?; + let issue_claim_lock_path = + issue_claim_lock_path(&dispatch_slot_config.root, issue_id); let issue_claim_lock_file = OpenOptions::new() .read(true) .write(true) .create(true) .truncate(false) - .open(issue_claim_lock_path(&dispatch_slot_config.root, issue_id))?; + .open(&issue_claim_lock_path)?; match issue_claim_lock_file.try_lock() { Ok(()) => {}, @@ -329,8 +335,11 @@ impl StateStore { Err(TryLockError::Error(error)) => return Err(error.into()), } - let mut issue_claim_guard = - IssueClaimGuard { lock_file: issue_claim_lock_file, retention: GuardRetention::Local }; + let mut issue_claim_guard = IssueClaimGuard { + lock_path: issue_claim_lock_path, + lock_file: issue_claim_lock_file, + retention: GuardRetention::Local, + }; write_issue_claim_record( &mut issue_claim_guard.lock_file, @@ -358,18 +367,21 @@ impl StateStore { continue; } + let dispatch_slot_lock_path = + dispatch_slot_lock_path(&dispatch_slot_config.root, slot_index); let lock_file = OpenOptions::new() .read(true) .write(true) .create(true) .truncate(false) - .open(dispatch_slot_lock_path(&dispatch_slot_config.root, slot_index))?; + .open(&dispatch_slot_lock_path)?; match lock_file.try_lock() { Ok(()) => { acquired_guard = Some(DispatchSlotGuard { project_id: project_id.to_owned(), slot_index, + lock_path: dispatch_slot_lock_path, lock_file, retention: GuardRetention::Local, }); @@ -459,6 +471,7 @@ impl StateStore { return Ok(leases); }; + let _coordinator = acquire_shared_lock_coordinator(&dispatch_slot_config.root)?; let read_dir = match fs::read_dir(&dispatch_slot_config.root) { Ok(read_dir) => read_dir, Err(error) if error.kind() == ErrorKind::NotFound => { @@ -495,7 +508,12 @@ impl StateStore { }; match claim_lock_file.try_lock() { - Ok(()) => claim_lock_file.unlock()?, + Ok(()) => { + claim_lock_file.unlock()?; + + drop(claim_lock_file); + remove_lock_file_if_exists(&path)?; + }, Err(TryLockError::WouldBlock) => { if let Some(lease) = read_issue_claim_record(&path)? && lease.project_id == project_id @@ -530,12 +548,13 @@ impl StateStore { drop(state); let path = issue_claim_lock_path(&dispatch_slot_config.root, issue_id); + let _coordinator = acquire_shared_lock_coordinator(&dispatch_slot_config.root)?; let claim_lock_file = match OpenOptions::new() .read(true) .write(true) .create(false) .truncate(false) - .open(path) + .open(&path) { Ok(file) => file, Err(error) if error.kind() == ErrorKind::NotFound => return Ok(false), @@ -546,6 +565,9 @@ impl StateStore { Ok(()) => { claim_lock_file.unlock()?; + drop(claim_lock_file); + remove_lock_file_if_exists(&path)?; + Ok(false) }, Err(TryLockError::WouldBlock) => Ok(true), @@ -556,12 +578,22 @@ impl StateStore { /// Remove the active lease for one issue. pub fn clear_lease(&self, issue_id: &str) -> Result<()> { let mut state = self.lock()?; + let _coordinator = match ( + state.issue_claim_guards.get(issue_id), + state.dispatch_slot_guards.get(issue_id), + ) { + (Some(guard), _) => Some(acquire_shared_lock_coordinator(guard.lock_root()?)?), + (None, Some(guard)) => Some(acquire_shared_lock_coordinator(guard.lock_root()?)?), + (None, None) => None, + }; let removed_lease = state.leases.remove(issue_id).is_some(); + let issue_claim_guard = state.issue_claim_guards.remove(issue_id); + let dispatch_slot_guard = state.dispatch_slot_guards.remove(issue_id); - if let Some(guard) = state.issue_claim_guards.remove(issue_id) { + if let Some(guard) = issue_claim_guard { guard.release_for_clear()?; } - if let Some(guard) = state.dispatch_slot_guards.remove(issue_id) { + if let Some(guard) = dispatch_slot_guard { guard.release_for_clear()?; } @@ -576,7 +608,15 @@ impl StateStore { pub fn release_dispatch_slot(&self, issue_id: &str) -> Result<()> { let mut state = self.lock()?; - state.dispatch_slot_guards.remove(issue_id); + if let Some(guard) = state.dispatch_slot_guards.get(issue_id) { + let _coordinator = acquire_shared_lock_coordinator(guard.lock_root()?)?; + let guard = state + .dispatch_slot_guards + .remove(issue_id) + .ok_or_else(|| eyre::eyre!("issue `{issue_id}` lost its dispatch-slot guard"))?; + + guard.release_for_clear()?; + } Ok(()) } @@ -644,10 +684,16 @@ impl StateStore { set_close_on_exec(&lock_file)?; let mut state = self.lock()?; + let dispatch_slot_config = state + .dispatch_slot_configs + .get(project_id) + .cloned() + .ok_or_else(|| eyre::eyre!("project `{project_id}` has no shared dispatch-slot root"))?; state.issue_claim_guards.insert( issue_id.to_owned(), IssueClaimGuard { + lock_path: issue_claim_lock_path(&dispatch_slot_config.root, issue_id), lock_file: issue_claim_lock_file, retention: GuardRetention::AdoptingChild, }, @@ -657,6 +703,10 @@ impl StateStore { DispatchSlotGuard { project_id: project_id.to_owned(), slot_index: guards.dispatch_slot_index, + lock_path: dispatch_slot_lock_path( + &dispatch_slot_config.root, + guards.dispatch_slot_index, + ), lock_file, retention: GuardRetention::AdoptingChild, }, diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index d227efd..f9da3f8 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -807,6 +807,65 @@ fn shared_dispatch_slots_honor_configured_limit_across_process_local_stores() { ); } +#[test] +fn cleared_shared_lease_removes_lock_anchor_files() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let issue_claim_path = temp_dir.path().join(".decodex-issue-claim.PUB-101.lock"); + let dispatch_slot_path = temp_dir.path().join(".decodex-dispatch-slot.0.lock"); + let store = StateStore::open_in_memory().expect("state store should open"); + + store + .configure_dispatch_slot_root("pubfi", temp_dir.path(), 1) + .expect("store should configure dispatch slot root"); + + assert!( + store + .try_acquire_lease("pubfi", "PUB-101", "run-1", IN_PROGRESS_STATE) + .expect("shared lease acquisition should succeed") + ); + assert!(issue_claim_path.exists(), "active issue claim should create a lock anchor"); + assert!(dispatch_slot_path.exists(), "active dispatch slot should create a lock anchor"); + + store.clear_lease("PUB-101").expect("shared lease should clear"); + + assert!( + !issue_claim_path.exists(), + "clearing the shared lease should remove its issue-claim anchor" + ); + assert!( + !dispatch_slot_path.exists(), + "clearing the shared lease should remove its dispatch-slot anchor" + ); +} + +#[test] +fn configure_dispatch_slot_root_prunes_unlocked_shared_lock_files() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let stale_issue_claim_path = temp_dir.path().join(".decodex-issue-claim.PUB-999.lock"); + let stale_dispatch_slot_path = temp_dir.path().join(".decodex-dispatch-slot.0.lock"); + let store = StateStore::open_in_memory().expect("state store should open"); + + fs::write( + &stale_issue_claim_path, + "project_id=pubfi\nissue_id=PUB-999\nrun_id=run-stale\nissue_state=In Progress\n", + ) + .expect("stale issue-claim anchor should write"); + fs::write(&stale_dispatch_slot_path, "").expect("stale dispatch-slot anchor should write"); + + store + .configure_dispatch_slot_root("pubfi", temp_dir.path(), 1) + .expect("configuration should prune unlocked shared lock anchors"); + + assert!( + !stale_issue_claim_path.exists(), + "configuration should remove unlocked stale issue-claim anchors" + ); + assert!( + !stale_dispatch_slot_path.exists(), + "configuration should remove unlocked stale dispatch-slot anchors" + ); +} + #[test] fn shared_dispatch_slots_support_unlimited_across_process_local_stores() { let temp_dir = TempDir::new().expect("tempdir should create"); @@ -864,6 +923,10 @@ fn failed_shared_slot_attempt_releases_issue_claim_before_retry() { .try_acquire_lease("pubfi", "PUB-102", "run-2", IN_PROGRESS_STATE) .expect("second store should fail while the only slot is busy") ); + assert!( + !temp_dir.path().join(".decodex-issue-claim.PUB-102.lock").exists(), + "failed slot acquisition should remove its temporary issue-claim anchor" + ); store_one.clear_lease("PUB-101").expect("shared lease should clear"); @@ -1027,6 +1090,8 @@ fn adopted_dispatch_slot_blocks_after_parent_releases_local_guard() { #[test] fn adopted_issue_claim_blocks_same_issue_after_parent_clears_local_guard() { let temp_dir = TempDir::new().expect("tempdir should create"); + let issue_claim_path = temp_dir.path().join(".decodex-issue-claim.PUB-101.lock"); + let dispatch_slot_path = temp_dir.path().join(".decodex-dispatch-slot.0.lock"); let parent_store = StateStore::open_in_memory().expect("parent store should open"); let child_store = StateStore::open_in_memory().expect("child store should open"); let contender_store = StateStore::open_in_memory().expect("contender store should open"); @@ -1071,6 +1136,14 @@ fn adopted_issue_claim_blocks_same_issue_after_parent_clears_local_guard() { .clear_lease("PUB-101") .expect("parent should drop its local lease without unlocking the child handoff"); + assert!( + issue_claim_path.exists(), + "parent-side handoff cleanup must not remove the child-held issue-claim anchor" + ); + assert!( + dispatch_slot_path.exists(), + "parent-side handoff cleanup must not remove the child-held dispatch-slot anchor" + ); assert!( !contender_store .try_acquire_lease("pubfi", "PUB-101", "run-2", IN_PROGRESS_STATE) @@ -1078,6 +1151,15 @@ fn adopted_issue_claim_blocks_same_issue_after_parent_clears_local_guard() { ); child_store.clear_lease("PUB-101").expect("child lease should clear"); + + assert!( + !issue_claim_path.exists(), + "child terminal cleanup should remove the inherited issue-claim anchor" + ); + assert!( + !dispatch_slot_path.exists(), + "child terminal cleanup should remove the inherited dispatch-slot anchor" + ); } #[cfg(unix)]