Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apps/decodex/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! Persistent single-machine runtime state for active Decodex execution.

#[cfg(unix)] use std::os::fd::{AsRawFd, FromRawFd};
#[cfg(unix)] use std::os::{
fd::{AsRawFd, FromRawFd},
unix::ffi::OsStrExt,
};
use std::{
cmp,
collections::{HashMap, HashSet},
Expand Down
130 changes: 128 additions & 2 deletions apps/decodex/src/state/internal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(target_os = "macos")]
use std::mem::{self, MaybeUninit};
use std::sync::atomic::AtomicU64;
use std::env;

use libc::FD_CLOEXEC;
use libc::F_GETFD;
Expand Down Expand Up @@ -52,12 +53,22 @@ struct DispatchSlotConfig {
}

/// Handle this process keeps for one issue-claim lock file.
struct IssueClaimGuard {
/// Filesystem path of the lock file; its parent directory is used as the
/// shared lock root when looking up the coordinator.
lock_path: PathBuf,
/// Open handle to the lock file at `lock_path`.
lock_file: File,
/// Retention mode for this guard (see `GuardRetention`).
retention: GuardRetention,
}
impl IssueClaimGuard {
/// Returns the directory that contains this guard's lock file.
///
/// # Errors
///
/// Fails when the stored lock path has no parent component.
fn lock_root(&self) -> Result<&Path> {
    let lock_path: &Path = self.lock_path.as_path();

    lock_root_from_lock_path(lock_path)
}

fn unlock(self) -> Result<()> {
self.lock_file.unlock()?;
let Self { lock_path, lock_file, retention: _ } = self;

lock_file.unlock()?;

drop(lock_file);
remove_lock_file_if_exists(&lock_path)?;

Ok(())
}
Expand All @@ -73,15 +84,31 @@ impl IssueClaimGuard {
/// Handle this process keeps for one dispatch-slot lock file.
struct DispatchSlotGuard {
/// Identifier of the project the slot belongs to.
project_id: String,
/// Index of the slot within the project's dispatch-slot root.
slot_index: usize,
/// Filesystem path of the lock file; its parent directory is used as the
/// shared lock root when looking up the coordinator.
lock_path: PathBuf,
/// Open handle to the lock file at `lock_path`.
lock_file: File,
/// Retention mode for this guard (see `GuardRetention`).
retention: GuardRetention,
}
impl DispatchSlotGuard {
/// Returns the directory that contains this guard's lock file.
///
/// # Errors
///
/// Fails when the stored lock path has no parent component.
fn lock_root(&self) -> Result<&Path> {
    let lock_path: &Path = self.lock_path.as_path();

    lock_root_from_lock_path(lock_path)
}

fn release_for_clear(self) -> Result<()> {
match self.retention {
GuardRetention::ParentAfterHandoff => Ok(()),
GuardRetention::Local | GuardRetention::AdoptingChild => {
self.lock_file.unlock()?;
let Self {
project_id: _,
slot_index: _,
lock_path,
lock_file,
retention: _,
} = self;

lock_file.unlock()?;

drop(lock_file);
remove_lock_file_if_exists(&lock_path)?;

Ok(())
},
Expand Down Expand Up @@ -2402,6 +2429,105 @@ fn issue_claim_id_from_path(path: &Path) -> Option<String> {
.map(str::to_owned)
}

/// Maps a shared lock root to the path of its coordinator lock file.
///
/// The raw bytes of `root` are hashed with 64-bit FNV-1a and the digest,
/// rendered as 16 lowercase hex digits, names a `.lock` file under
/// `<temp dir>/decodex-shared-lock-coordinators`. The path is hashed exactly
/// as given: no canonicalisation is performed, so two spellings of the same
/// directory map to different coordinator files.
fn shared_lock_coordinator_path(root: &Path) -> PathBuf {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = root
        .as_os_str()
        .as_bytes()
        .iter()
        .fold(FNV_OFFSET_BASIS, |acc, &byte| (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME));

    let mut coordinator_path = env::temp_dir();

    coordinator_path.push("decodex-shared-lock-coordinators");
    coordinator_path.push(format!("{digest:016x}.lock"));

    coordinator_path
}

/// Opens the coordinator lock file for `root` and blocks until it is locked.
///
/// Both `root` and the coordinator's parent directory are created if they do
/// not exist yet. The returned [`File`] is the locked coordinator; callers
/// keep it alive for as long as they need the lock.
///
/// # Errors
///
/// Propagates any I/O failure from creating the directories, opening the
/// coordinator file, or acquiring the lock.
fn acquire_shared_lock_coordinator(root: &Path) -> Result<File> {
    fs::create_dir_all(root)?;

    let lock_file_path = shared_lock_coordinator_path(root);

    if let Some(coordinator_dir) = lock_file_path.parent() {
        fs::create_dir_all(coordinator_dir)?;
    }

    // Open without truncating: other processes may already have the same file open.
    let mut open_options = OpenOptions::new();

    open_options.read(true).write(true).create(true).truncate(false);

    let coordinator_file = open_options.open(lock_file_path)?;

    coordinator_file.lock()?;

    Ok(coordinator_file)
}

/// Returns the parent directory of `lock_path`, which serves as the shared
/// lock root.
///
/// # Errors
///
/// Fails when `lock_path` has no parent component.
fn lock_root_from_lock_path(lock_path: &Path) -> Result<&Path> {
    match lock_path.parent() {
        Some(root) => Ok(root),
        None => {
            Err(eyre::eyre!("shared lock path `{}` has no parent root", lock_path.display()))
        },
    }
}

/// Deletes the file at `path`, treating an already-missing file as success.
///
/// # Errors
///
/// Propagates every removal failure other than `ErrorKind::NotFound`.
fn remove_lock_file_if_exists(path: &Path) -> Result<()> {
    let Err(error) = fs::remove_file(path) else {
        return Ok(());
    };

    if error.kind() == ErrorKind::NotFound {
        // The file is already gone, which is the state the caller wanted.
        return Ok(());
    }

    Err(error.into())
}

/// Reports whether `path` names an issue-claim or dispatch-slot lock file.
///
/// A candidate has a UTF-8 file name that begins with one of the two lock-file
/// prefixes followed by a dot. Paths without a file name, or whose name is not
/// valid UTF-8, are never candidates.
fn shared_lock_file_is_cleanup_candidate(path: &Path) -> bool {
    match path.file_name().and_then(|name| name.to_str()) {
        None => false,
        Some(name) => {
            let issue_claim_prefix = format!("{ISSUE_CLAIM_LOCK_FILE_PREFIX}.");

            if name.starts_with(issue_claim_prefix.as_str()) {
                return true;
            }

            let dispatch_slot_prefix = format!("{DISPATCH_SLOT_LOCK_FILE_PREFIX}.");

            name.starts_with(dispatch_slot_prefix.as_str())
        },
    }
}

/// Deletes issue-claim and dispatch-slot lock files under `root` that can be
/// locked right now.
///
/// The coordinator lock for `root` is acquired first and held until this
/// function returns. Each cleanup-candidate file is opened without being
/// created and probed with `try_lock`: on success it is unlocked, closed, and
/// removed; on `WouldBlock` it is left in place.
///
/// # Errors
///
/// Propagates I/O failures from acquiring the coordinator, reading the
/// directory, opening a candidate, locking or unlocking it, or removing it.
/// A missing `root` directory or a candidate that vanishes before it can be
/// opened is not an error.
fn prune_unlocked_shared_lock_files(root: &Path) -> Result<()> {
// Bound to a name so the coordinator file stays open for the whole scan.
let _coordinator = acquire_shared_lock_coordinator(root)?;
let read_dir = match fs::read_dir(root) {
Ok(read_dir) => read_dir,
// Nothing to prune when the root directory does not exist.
Err(error) if error.kind() == ErrorKind::NotFound => return Ok(()),
Err(error) => return Err(error.into()),
};

for entry in read_dir {
let path = entry?.path();

// Only files named like issue-claim or dispatch-slot locks are touched.
if !shared_lock_file_is_cleanup_candidate(&path) {
continue;
}

// `create(false)`: probe existing files only, never create a new lock file.
let lock_file = match OpenOptions::new()
.read(true)
.write(true)
.create(false)
.truncate(false)
.open(&path)
{
Ok(file) => file,
// The file disappeared between the directory listing and the open.
Err(error) if error.kind() == ErrorKind::NotFound => continue,
Err(error) => return Err(error.into()),
};

match lock_file.try_lock() {
// The lock was free: release it, close the handle, then delete the file.
Ok(()) => {
lock_file.unlock()?;

drop(lock_file);
remove_lock_file_if_exists(&path)?;
},
// The lock is currently held elsewhere; leave the file alone.
Err(TryLockError::WouldBlock) => {},
Err(TryLockError::Error(error)) => return Err(error.into()),
}
}

Ok(())
}

fn write_issue_claim_record(
lock_file: &mut File,
project_id: &str,
Expand Down
76 changes: 63 additions & 13 deletions apps/decodex/src/state/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,18 @@ impl StateStore {
slot_limit: impl Into<DispatchSlotLimit>,
) -> Result<()> {
let slot_limit = slot_limit.into();
let worktree_root = worktree_root.as_ref().to_path_buf();
let mut state = self.lock()?;

slot_limit.validate()?;

if state.issue_claim_guards.is_empty() && state.dispatch_slot_guards.is_empty() {
prune_unlocked_shared_lock_files(&worktree_root)?;
}

state.dispatch_slot_configs.insert(
project_id.to_owned(),
DispatchSlotConfig {
root: worktree_root.as_ref().to_path_buf(),
slot_limit,
},
DispatchSlotConfig { root: worktree_root, slot_limit },
);

Ok(())
Expand Down Expand Up @@ -316,21 +319,27 @@ impl StateStore {
if let Some(dispatch_slot_config) = state.dispatch_slot_configs.get(project_id).cloned() {
fs::create_dir_all(&dispatch_slot_config.root)?;

let _coordinator = acquire_shared_lock_coordinator(&dispatch_slot_config.root)?;
let issue_claim_lock_path =
issue_claim_lock_path(&dispatch_slot_config.root, issue_id);
let issue_claim_lock_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(issue_claim_lock_path(&dispatch_slot_config.root, issue_id))?;
.open(&issue_claim_lock_path)?;

match issue_claim_lock_file.try_lock() {
Ok(()) => {},
Err(TryLockError::WouldBlock) => return Ok(false),
Err(TryLockError::Error(error)) => return Err(error.into()),
}

let mut issue_claim_guard =
IssueClaimGuard { lock_file: issue_claim_lock_file, retention: GuardRetention::Local };
let mut issue_claim_guard = IssueClaimGuard {
lock_path: issue_claim_lock_path,
lock_file: issue_claim_lock_file,
retention: GuardRetention::Local,
};

write_issue_claim_record(
&mut issue_claim_guard.lock_file,
Expand Down Expand Up @@ -358,18 +367,21 @@ impl StateStore {
continue;
}

let dispatch_slot_lock_path =
dispatch_slot_lock_path(&dispatch_slot_config.root, slot_index);
let lock_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(dispatch_slot_lock_path(&dispatch_slot_config.root, slot_index))?;
.open(&dispatch_slot_lock_path)?;

match lock_file.try_lock() {
Ok(()) => {
acquired_guard = Some(DispatchSlotGuard {
project_id: project_id.to_owned(),
slot_index,
lock_path: dispatch_slot_lock_path,
lock_file,
retention: GuardRetention::Local,
});
Expand Down Expand Up @@ -459,6 +471,7 @@ impl StateStore {

return Ok(leases);
};
let _coordinator = acquire_shared_lock_coordinator(&dispatch_slot_config.root)?;
let read_dir = match fs::read_dir(&dispatch_slot_config.root) {
Ok(read_dir) => read_dir,
Err(error) if error.kind() == ErrorKind::NotFound => {
Expand Down Expand Up @@ -495,7 +508,12 @@ impl StateStore {
};

match claim_lock_file.try_lock() {
Ok(()) => claim_lock_file.unlock()?,
Ok(()) => {
claim_lock_file.unlock()?;

drop(claim_lock_file);
remove_lock_file_if_exists(&path)?;
},
Err(TryLockError::WouldBlock) => {
if let Some(lease) = read_issue_claim_record(&path)?
&& lease.project_id == project_id
Expand Down Expand Up @@ -530,12 +548,13 @@ impl StateStore {
drop(state);

let path = issue_claim_lock_path(&dispatch_slot_config.root, issue_id);
let _coordinator = acquire_shared_lock_coordinator(&dispatch_slot_config.root)?;
let claim_lock_file = match OpenOptions::new()
.read(true)
.write(true)
.create(false)
.truncate(false)
.open(path)
.open(&path)
{
Ok(file) => file,
Err(error) if error.kind() == ErrorKind::NotFound => return Ok(false),
Expand All @@ -546,6 +565,9 @@ impl StateStore {
Ok(()) => {
claim_lock_file.unlock()?;

drop(claim_lock_file);
remove_lock_file_if_exists(&path)?;

Ok(false)
},
Err(TryLockError::WouldBlock) => Ok(true),
Expand All @@ -556,12 +578,22 @@ impl StateStore {
/// Remove the active lease for one issue.
pub fn clear_lease(&self, issue_id: &str) -> Result<()> {
let mut state = self.lock()?;
let _coordinator = match (
state.issue_claim_guards.get(issue_id),
state.dispatch_slot_guards.get(issue_id),
) {
(Some(guard), _) => Some(acquire_shared_lock_coordinator(guard.lock_root()?)?),
(None, Some(guard)) => Some(acquire_shared_lock_coordinator(guard.lock_root()?)?),
(None, None) => None,
};
let removed_lease = state.leases.remove(issue_id).is_some();
let issue_claim_guard = state.issue_claim_guards.remove(issue_id);
let dispatch_slot_guard = state.dispatch_slot_guards.remove(issue_id);

if let Some(guard) = state.issue_claim_guards.remove(issue_id) {
if let Some(guard) = issue_claim_guard {
guard.release_for_clear()?;
}
if let Some(guard) = state.dispatch_slot_guards.remove(issue_id) {
if let Some(guard) = dispatch_slot_guard {
guard.release_for_clear()?;
}

Expand All @@ -576,7 +608,15 @@ impl StateStore {
pub fn release_dispatch_slot(&self, issue_id: &str) -> Result<()> {
let mut state = self.lock()?;

state.dispatch_slot_guards.remove(issue_id);
if let Some(guard) = state.dispatch_slot_guards.get(issue_id) {
let _coordinator = acquire_shared_lock_coordinator(guard.lock_root()?)?;
let guard = state
.dispatch_slot_guards
.remove(issue_id)
.ok_or_else(|| eyre::eyre!("issue `{issue_id}` lost its dispatch-slot guard"))?;

guard.release_for_clear()?;
}

Ok(())
}
Expand Down Expand Up @@ -644,10 +684,16 @@ impl StateStore {
set_close_on_exec(&lock_file)?;

let mut state = self.lock()?;
let dispatch_slot_config = state
.dispatch_slot_configs
.get(project_id)
.cloned()
.ok_or_else(|| eyre::eyre!("project `{project_id}` has no shared dispatch-slot root"))?;

state.issue_claim_guards.insert(
issue_id.to_owned(),
IssueClaimGuard {
lock_path: issue_claim_lock_path(&dispatch_slot_config.root, issue_id),
lock_file: issue_claim_lock_file,
retention: GuardRetention::AdoptingChild,
},
Expand All @@ -657,6 +703,10 @@ impl StateStore {
DispatchSlotGuard {
project_id: project_id.to_owned(),
slot_index: guards.dispatch_slot_index,
lock_path: dispatch_slot_lock_path(
&dispatch_slot_config.root,
guards.dispatch_slot_index,
),
lock_file,
retention: GuardRetention::AdoptingChild,
},
Expand Down
Loading