From 455f5d59af6fb3574fb34f2747efc16b7bac646c Mon Sep 17 00:00:00 2001
From: Mark Rousskov
Date: Mon, 1 Sep 2025 11:46:53 -0400
Subject: [PATCH] Purge caches and build directories when nothing is running

This prevents files that Cargo is still reading from being deleted out
from under it.
---
 src/runner/mod.rs         |  13 +++-
 src/runner/worker.rs      | 133 +++++++++++++++++++++++++++-----------
 src/runner/worker/test.rs |  89 +++++++++++++++++++++++++
 3 files changed, 195 insertions(+), 40 deletions(-)
 create mode 100644 src/runner/worker/test.rs

diff --git a/src/runner/mod.rs b/src/runner/mod.rs
index d1b6f46e..afdc2e53 100644
--- a/src/runner/mod.rs
+++ b/src/runner/mod.rs
@@ -10,6 +10,7 @@ use crate::prelude::*;
 use crate::results::TestResult;
 use crate::runner::worker::{DiskSpaceWatcher, Worker};
 use rustwide::Workspace;
+use std::sync::Arc;
 use std::thread::scope;
 use std::time::Duration;
 pub use worker::RecordProgress;
@@ -95,9 +96,19 @@
     let disk_watcher = DiskSpaceWatcher::new(
         DISK_SPACE_WATCHER_INTERVAL,
         DISK_SPACE_WATCHER_THRESHOLD,
-        &workers,
+        workers.len(),
     );
 
+    for worker in workers.iter() {
+        let disk_watcher = Arc::clone(&disk_watcher);
+        assert!(worker
+            .between_crates
+            .set(Box::new(move |is_permanent| {
+                disk_watcher.worker_idle(is_permanent);
+            }))
+            .is_ok());
+    }
+
     scope(|scope1| {
         std::thread::Builder::new()
             .name("disk-space-watcher".into())
diff --git a/src/runner/worker.rs b/src/runner/worker.rs
index e07b1c52..8019c21c 100644
--- a/src/runner/worker.rs
+++ b/src/runner/worker.rs
@@ -11,11 +11,8 @@ use crate::utils;
 use rustwide::logging::{self, LogStorage};
 use rustwide::{BuildDirectory, Workspace};
 use std::collections::HashMap;
-use std::sync::Condvar;
-use std::sync::{
-    atomic::{AtomicBool, Ordering},
-    Mutex,
-};
+use std::sync::Mutex;
+use std::sync::{Arc, Condvar, OnceLock};
 use std::time::Duration;
 
 pub trait RecordProgress: Send + Sync {
@@ -51,8 +48,10 @@ pub(super) struct Worker<'a> {
     ex: &'a Experiment,
     config: &'a crate::config::Config,
     api: &'a dyn RecordProgress,
-    target_dir_cleanup: AtomicBool,
     next_crate: &'a (dyn Fn() -> Fallible<Option<Crate>> + Send + Sync),
+
+    // Called by the worker thread between crates, when no global state (namely caches) is in use.
+    pub(super) between_crates: OnceLock<Box<dyn Fn(bool) + Send + Sync>>,
 }
 
 impl<'a> Worker<'a> {
@@ -81,7 +80,8 @@ impl<'a> Worker<'a> {
             config,
             next_crate,
             api,
-            target_dir_cleanup: AtomicBool::new(false),
+
+            between_crates: OnceLock::new(),
         }
     }
 
@@ -163,14 +163,21 @@ impl<'a> Worker<'a> {
                 // Backoff from calling the server again, to reduce load when we're spinning until
                 // the next experiment is ready.
                 std::thread::sleep(Duration::from_secs(rand::random_range(60..120)));
+
+                if let Some(cb) = self.between_crates.get() {
+                    cb(true);
+                }
+
                 // We're done if no more crates left.
                return Ok(());
            };
 
-            self.maybe_cleanup_target_dir()?;
-
             info!("{} processing crate {}", self.name, krate);
 
+            if let Some(cb) = self.between_crates.get() {
+                cb(false);
+            }
+
             if !self.ex.ignore_blacklist && self.config.should_skip(&krate) {
                 for tc in &self.ex.toolchains {
                     // If a skipped crate is somehow sent to the agent (for example, when a crate was
@@ -338,41 +345,36 @@ impl<'a> Worker<'a> {
             }
         }
     }
-
-    fn maybe_cleanup_target_dir(&self) -> Fallible<()> {
-        if !self.target_dir_cleanup.swap(false, Ordering::SeqCst) {
-            return Ok(());
-        }
-        info!("purging target dir for {}", self.name);
-        for dir in self.build_dir.values() {
-            dir.lock().unwrap().purge()?;
-        }
-
-        Ok(())
-    }
-
-    fn schedule_target_dir_cleanup(&self) {
-        self.target_dir_cleanup.store(true, Ordering::SeqCst);
-    }
 }
 
-pub(super) struct DiskSpaceWatcher<'a> {
+pub(super) struct DiskSpaceWatcher {
     interval: Duration,
     threshold: f32,
-    workers: &'a [Worker<'a>],
     should_stop: Mutex<bool>,
     waiter: Condvar,
+
+    worker_count: usize,
+
+    // If the bool is true, the watcher wants to purge and is waiting for the idle-worker count
+    // to reach `worker_count`; workers wait for the flag to return to false before resuming.
+    // This gives us a global "is the cache in use" synchronization point.
+    cache_in_use: Mutex<(usize, bool)>,
+    cache_waiter: Condvar,
 }
 
-impl<'a> DiskSpaceWatcher<'a> {
-    pub(super) fn new(interval: Duration, threshold: f32, workers: &'a [Worker<'a>]) -> Self {
-        DiskSpaceWatcher {
+impl DiskSpaceWatcher {
+    pub(super) fn new(interval: Duration, threshold: f32, worker_count: usize) -> Arc<Self> {
+        Arc::new(DiskSpaceWatcher {
             interval,
             threshold,
-            workers,
             should_stop: Mutex::new(false),
             waiter: Condvar::new(),
-        }
+
+            worker_count,
+
+            cache_in_use: Mutex::new((0, false)),
+            cache_waiter: Condvar::new(),
+        })
     }
 
     pub(super) fn stop(&self) {
@@ -406,14 +408,67 @@ impl<'a> DiskSpaceWatcher<'a> {
         };
 
         if usage.is_threshold_reached(self.threshold) {
-            warn!("running the scheduled thread cleanup");
-            for worker in self.workers {
-                worker.schedule_target_dir_cleanup();
-            }
+            self.clean(workspace);
+        }
+    }
 
-            if let Err(e) = workspace.purge_all_caches() {
-                warn!("failed to purge caches: {:?}", e);
-            }
+    fn clean(&self, workspace: &dyn ToClean) {
+        warn!("declaring interest in purging caches");
+
+        // Set interest in cleaning caches and then wait for cache use to drain to zero.
+        let mut guard = self.cache_in_use.lock().unwrap();
+        guard.1 = true;
+
+        self.cache_waiter.notify_all();
+
+        warn!("declared interest, waiting for all workers to go idle");
+
+        let mut guard = self
+            .cache_waiter
+            .wait_while(guard, |c| c.0 != self.worker_count)
+            .unwrap();
+
+        // OK, purging caches; clear interest.
+        guard.1 = false;
+
+        self.cache_waiter.notify_all();
+
+        warn!("purging all build dirs and caches");
+
+        workspace.purge();
+    }
+
+    pub(super) fn worker_idle(&self, permanent: bool) {
+        log::trace!("worker at idle point");
+        let mut guard = self.cache_in_use.lock().unwrap();
+        log::trace!("worker declared idle");
+        // Note that we're idle (not running) right now.
+        guard.0 += 1;
+        self.cache_waiter.notify_all();
+        if !permanent {
+            let mut guard = self.cache_waiter.wait_while(guard, |c| c.1).unwrap();
+            // The purge (if any) is done; mark ourselves as running again.
+            guard.0 -= 1;
+            self.cache_waiter.notify_all();
+        }
+    }
+}
+
+trait ToClean {
+    fn purge(&self);
+}
+
+impl ToClean for Workspace {
+    fn purge(&self) {
+        if let Err(e) = self.purge_all_caches() {
+            warn!("failed to purge caches: {:?}", e);
+        }
+
+        if let Err(e) = self.purge_all_build_dirs() {
+            warn!("failed to purge build directories: {:?}", e);
+        }
     }
 }
+
+#[cfg(test)]
+mod test;
diff --git a/src/runner/worker/test.rs b/src/runner/worker/test.rs
new file mode 100644
index 00000000..17362931
--- /dev/null
+++ b/src/runner/worker/test.rs
@@ -0,0 +1,89 @@
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::sync::Condvar;
+use std::sync::Mutex;
+use std::time::Duration;
+
+use crate::runner::DiskSpaceWatcher;
+
+#[derive(Default, Debug)]
+struct PurgeTracker {
+    count: Mutex<usize>,
+    wait: Condvar,
+}
+
+impl PurgeTracker {
+    #[track_caller]
+    fn assert_count_eq(&self, count: usize) {
+        let guard = self.count.lock().unwrap();
+        let (g, timer) = self
+            .wait
+            .wait_timeout_while(guard, Duration::from_secs(10), |g| *g != count)
+            .unwrap();
+        assert!(
+            !timer.timed_out(),
+            "timed out while waiting for {} to equal {}",
+            *g,
+            count
+        );
+        assert_eq!(*g, count);
+    }
+}
+
+impl super::ToClean for PurgeTracker {
+    fn purge(&self) {
+        *self.count.lock().unwrap() += 1;
+        self.wait.notify_all();
+    }
+}
+
+#[test]
+fn check_cleanup_single_worker() {
+    let _ = env_logger::try_init();
+    let tracker = Arc::new(PurgeTracker::default());
+    let watcher = DiskSpaceWatcher::new(Duration::from_secs(60), 0.8, 1);
+    let done = &AtomicBool::new(false);
+    std::thread::scope(|s| {
+        s.spawn(|| {
+            for _ in 0..3 {
+                watcher.clean(&*tracker);
+            }
+            done.store(true, Ordering::Relaxed);
+        });
+
+        s.spawn(|| {
+            while !done.load(Ordering::Relaxed) {
+                watcher.worker_idle(false);
+            }
+        });
+    });
+
+    tracker.assert_count_eq(3);
+}
+
+#[test]
+fn check_cleanup_multi_worker() {
+    let _ = env_logger::try_init();
+    let tracker = Arc::new(PurgeTracker::default());
+    let watcher = DiskSpaceWatcher::new(Duration::from_secs(60), 0.8, 3);
+    let done = &AtomicBool::new(false);
+    std::thread::scope(|s| {
+        s.spawn(|| {
+            for _ in 0..5 {
+                watcher.clean(&*tracker);
+            }
+            done.store(true, Ordering::Relaxed);
+        });
+
+        for _ in 0..3 {
+            s.spawn(|| {
+                while !done.load(Ordering::Relaxed) {
+                    watcher.worker_idle(false);
+                }
+            });
+        }
+    });
+
+    tracker.assert_count_eq(5);
+}
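
Note for reviewers (not part of the patch): the condvar protocol that
`clean` and `worker_idle` implement is easier to evaluate in isolation.
Below is a minimal, self-contained sketch of the same rendezvous; the
`Rendezvous` type and its method names are hypothetical, chosen only for
illustration.

    use std::sync::{Condvar, Mutex};

    // State is (idle worker count, purge requested), guarded by one mutex
    // and signalled through one condvar, mirroring the patch's
    // `cache_in_use` / `cache_waiter` pair.
    struct Rendezvous {
        state: Mutex<(usize, bool)>,
        cvar: Condvar,
        worker_count: usize,
    }

    impl Rendezvous {
        // Watcher side (cf. `clean`): request a purge, wait until every
        // worker is parked, then purge while still holding the lock.
        fn purge_when_idle(&self, purge: impl FnOnce()) {
            let mut guard = self.state.lock().unwrap();
            guard.1 = true; // declare interest in purging
            self.cvar.notify_all();
            let mut guard = self
                .cvar
                .wait_while(guard, |s| s.0 != self.worker_count)
                .unwrap();
            guard.1 = false; // clear interest for after the purge
            self.cvar.notify_all();
            // `guard` is still alive here, so every worker stays blocked
            // inside `wait_while` until this function returns.
            purge();
        }

        // Worker side (cf. `worker_idle(false)`): called between crates,
        // when the worker holds no cache or build-directory state.
        fn idle_point(&self) {
            let mut guard = self.state.lock().unwrap();
            guard.0 += 1; // this worker no longer touches shared state
            self.cvar.notify_all();
            let mut guard = self.cvar.wait_while(guard, |s| s.1).unwrap();
            guard.0 -= 1; // purge (if any) is finished; resume work
            self.cvar.notify_all();
        }
    }

The key property is that the watcher never drops the mutex between
observing "all workers idle" and finishing the purge: workers woken by the
final notify_all must reacquire the lock before `wait_while` can return,
so none of them can resume while files are being deleted. A worker that is
permanently done (`worker_idle(true)` in the patch) increments the idle
count and never decrements it, which keeps the watcher from waiting
forever on a worker that has exited.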