src/runner/mod.rs (13 changes: 12 additions & 1 deletion)

@@ -10,6 +10,7 @@ use crate::prelude::*;
 use crate::results::TestResult;
 use crate::runner::worker::{DiskSpaceWatcher, Worker};
 use rustwide::Workspace;
+use std::sync::Arc;
 use std::thread::scope;
 use std::time::Duration;
 pub use worker::RecordProgress;
@@ -95,9 +96,19 @@ pub fn run_ex(
     let disk_watcher = DiskSpaceWatcher::new(
         DISK_SPACE_WATCHER_INTERVAL,
         DISK_SPACE_WATCHER_THRESHOLD,
-        &workers,
+        workers.len(),
     );

+    for worker in workers.iter() {
+        let disk_watcher = Arc::clone(&disk_watcher);
+        assert!(worker
+            .between_crates
+            .set(Box::new(move |is_permanent| {
+                disk_watcher.worker_idle(is_permanent);
+            }))
+            .is_ok());
+    }
+
     scope(|scope1| {
         std::thread::Builder::new()
             .name("disk-space-watcher".into())
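The wiring above hangs a one-shot callback on each worker via `OnceLock`, so the watcher no longer needs to borrow the worker list. A minimal sketch of the same pattern, with a hypothetical `Watcher`/`Worker` pair standing in for the real types:

```rust
use std::sync::{Arc, OnceLock};

struct Watcher;
impl Watcher {
    fn worker_idle(&self, permanent: bool) {
        println!("worker idle (permanent: {permanent})");
    }
}

struct Worker {
    // Set once at startup; invoked by the worker thread between crates.
    between_crates: OnceLock<Box<dyn Fn(bool) + Send + Sync>>,
}

fn main() {
    let watcher = Arc::new(Watcher);
    let worker = Worker { between_crates: OnceLock::new() };

    // `OnceLock::set` fails if a hook was already installed, hence the assert.
    let handle = Arc::clone(&watcher);
    assert!(worker
        .between_crates
        .set(Box::new(move |permanent| handle.worker_idle(permanent)))
        .is_ok());

    // The worker thread later fires the hook at its idle points.
    if let Some(cb) = worker.between_crates.get() {
        cb(false);
    }
}
```

Cloning the `Arc` into each closure is what lets `DiskSpaceWatcher::new` return `Arc<Self>` and drop the `'a` lifetime that previously tied the watcher to the workers.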
src/runner/worker.rs (133 changes: 94 additions & 39 deletions)

@@ -11,11 +11,8 @@ use crate::utils;
 use rustwide::logging::{self, LogStorage};
 use rustwide::{BuildDirectory, Workspace};
 use std::collections::HashMap;
-use std::sync::Condvar;
-use std::sync::{
-    atomic::{AtomicBool, Ordering},
-    Mutex,
-};
+use std::sync::Mutex;
+use std::sync::{Arc, Condvar, OnceLock};
 use std::time::Duration;

 pub trait RecordProgress: Send + Sync {
@@ -51,8 +48,10 @@ pub(super) struct Worker<'a> {
     ex: &'a Experiment,
     config: &'a crate::config::Config,
     api: &'a dyn RecordProgress,
-    target_dir_cleanup: AtomicBool,
     next_crate: &'a (dyn Fn() -> Fallible<Option<Crate>> + Send + Sync),
+
+    // Called by the worker thread between crates, when no global state (namely caches) is in use.
+    pub(super) between_crates: OnceLock<Box<dyn Fn(bool) + Send + Sync + 'a>>,
 }

 impl<'a> Worker<'a> {
@@ -81,7 +80,8 @@ impl<'a> Worker<'a> {
             config,
             next_crate,
             api,
-            target_dir_cleanup: AtomicBool::new(false),
+
+            between_crates: OnceLock::new(),
         }
     }

@@ -163,14 +163,21 @@ impl<'a> Worker<'a> {
                 // Backoff from calling the server again, to reduce load when we're spinning until
                 // the next experiment is ready.
                 std::thread::sleep(Duration::from_secs(rand::random_range(60..120)));
+
+                if let Some(cb) = self.between_crates.get() {
+                    cb(true);
+                }

                 // We're done if no more crates left.
                 return Ok(());
             };

-            self.maybe_cleanup_target_dir()?;
-
             info!("{} processing crate {}", self.name, krate);

+            if let Some(cb) = self.between_crates.get() {
+                cb(false);
+            }
+
             if !self.ex.ignore_blacklist && self.config.should_skip(&krate) {
                 for tc in &self.ex.toolchains {
                     // If a skipped crate is somehow sent to the agent (for example, when a crate was
@@ -338,41 +345,36 @@
             }
         }
     }

-    fn maybe_cleanup_target_dir(&self) -> Fallible<()> {
-        if !self.target_dir_cleanup.swap(false, Ordering::SeqCst) {
-            return Ok(());
-        }
-        info!("purging target dir for {}", self.name);
-        for dir in self.build_dir.values() {
-            dir.lock().unwrap().purge()?;
-        }
-
-        Ok(())
-    }
-
-    fn schedule_target_dir_cleanup(&self) {
-        self.target_dir_cleanup.store(true, Ordering::SeqCst);
-    }
 }

-pub(super) struct DiskSpaceWatcher<'a> {
+pub(super) struct DiskSpaceWatcher {
     interval: Duration,
     threshold: f32,
-    workers: &'a [Worker<'a>],
     should_stop: Mutex<bool>,
     waiter: Condvar,
+
+    worker_count: usize,
+
+    // If this bool is true, the watcher wants to purge the caches and is waiting for the number
+    // of workers using them to drop to zero (i.e. for the counter to reach worker_count); workers
+    // wait for it to go false again before resuming. This gives us a global 'is the cache in use'
+    // synchronization point.
+    cache_in_use: Mutex<(usize, bool)>,
+    cache_waiter: Condvar,
 }

-impl<'a> DiskSpaceWatcher<'a> {
-    pub(super) fn new(interval: Duration, threshold: f32, workers: &'a [Worker<'a>]) -> Self {
-        DiskSpaceWatcher {
+impl DiskSpaceWatcher {
+    pub(super) fn new(interval: Duration, threshold: f32, worker_count: usize) -> Arc<Self> {
+        Arc::new(DiskSpaceWatcher {
             interval,
             threshold,
-            workers,
             should_stop: Mutex::new(false),
             waiter: Condvar::new(),
-        }
+
+            worker_count,
+
+            cache_in_use: Mutex::new((0, false)),
+            cache_waiter: Condvar::new(),
+        })
     }

     pub(super) fn stop(&self) {
@@ -406,14 +408,67 @@ impl<'a> DiskSpaceWatcher<'a> {
         };

         if usage.is_threshold_reached(self.threshold) {
-            warn!("running the scheduled thread cleanup");
-            for worker in self.workers {
-                worker.schedule_target_dir_cleanup();
-            }
+            self.clean(workspace);
         }
     }

-        if let Err(e) = workspace.purge_all_caches() {
-            warn!("failed to purge caches: {:?}", e);
-        }
+    fn clean(&self, workspace: &dyn ToClean) {
+        warn!("declaring interest in worker idle");
+
+        // Declare interest in cleaning the caches, then wait for cache use to drain to zero.
+        let mut guard = self.cache_in_use.lock().unwrap();
+        guard.1 = true;
+
+        self.cache_waiter.notify_all();
+
+        warn!("declared interest in workers, waiting for everyone to idle");
+
+        let mut guard = self
+            .cache_waiter
+            .wait_while(guard, |c| c.0 != self.worker_count)
+            .unwrap();
+
+        // All workers are idle: clear interest, then purge.
+        guard.1 = false;
+
+        self.cache_waiter.notify_all();
+
+        warn!("purging all build dirs and caches");
+
+        workspace.purge();
+    }
+
+    pub(super) fn worker_idle(&self, permanent: bool) {
+        log::trace!("worker at idle point");
+        let mut guard = self.cache_in_use.lock().unwrap();
+        log::trace!("worker declared idle");
+        // Record that this worker is idle (not using the caches) right now.
+        guard.0 += 1;
+        self.cache_waiter.notify_all();
+        if !permanent {
+            let mut guard = self.cache_waiter.wait_while(guard, |c| c.1).unwrap();
+            // Mark ourselves as running again before returning to work.
+            guard.0 -= 1;
+            self.cache_waiter.notify_all();
+        }
+    }
 }

+trait ToClean {
+    fn purge(&self);
+}
+
+impl ToClean for Workspace {
+    fn purge(&self) {
+        if let Err(e) = self.purge_all_caches() {
+            warn!("failed to purge caches: {:?}", e);
+        }
+
+        if let Err(e) = self.purge_all_build_dirs() {
+            warn!("failed to purge build directories: {:?}", e);
+        }
+    }
+}
+
+#[cfg(test)]
+mod test;
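The `clean`/`worker_idle` pair above is a two-phase rendezvous on a `Mutex<(usize, bool)>` and a single `Condvar`: the watcher raises a flag and waits for the idle count to hit `worker_count`; a worker reaching an idle point increments the count and, unless it is exiting for good, parks until the flag drops. Note that the purge itself runs while the watcher still holds the mutex, so no worker can leave its idle point mid-purge. A minimal self-contained sketch of the same protocol; the names (`Rendezvous`, `purge_when_idle`, `idle`) are mine, not the PR's:

```rust
use std::sync::{Condvar, Mutex};

struct Rendezvous {
    // (number of idle workers, purge requested)
    state: Mutex<(usize, bool)>,
    cvar: Condvar,
    worker_count: usize,
}

impl Rendezvous {
    // Watcher side: request a purge, wait until every worker is idle,
    // do the work, then release the workers.
    fn purge_when_idle(&self, purge: impl FnOnce()) {
        let mut guard = self.state.lock().unwrap();
        guard.1 = true;
        self.cvar.notify_all();
        let mut guard = self
            .cvar
            .wait_while(guard, |s| s.0 != self.worker_count)
            .unwrap();
        guard.1 = false;
        self.cvar.notify_all();
        purge(); // still holding the lock: no worker can leave idle mid-purge
    }

    // Worker side: mark ourselves idle; unless we're exiting for good,
    // wait for any pending purge request to clear, then resume.
    fn idle(&self, permanent: bool) {
        let mut guard = self.state.lock().unwrap();
        guard.0 += 1;
        self.cvar.notify_all();
        if !permanent {
            let mut guard = self.cvar.wait_while(guard, |s| s.1).unwrap();
            guard.0 -= 1;
            self.cvar.notify_all();
        }
    }
}

fn main() {
    let r = Rendezvous {
        state: Mutex::new((0, false)),
        cvar: Condvar::new(),
        worker_count: 2,
    };
    std::thread::scope(|s| {
        for _ in 0..2 {
            s.spawn(|| {
                for _ in 0..10 {
                    r.idle(false);
                }
                r.idle(true); // this worker is done for good
            });
        }
        r.purge_when_idle(|| println!("purging with all workers idle"));
    });
}
```

Permanently idle workers never decrement the counter, which is exactly why exiting workers call the hook with `true`: a later purge would otherwise deadlock waiting for a worker that no longer exists.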
src/runner/worker/test.rs (89 changes: 89 additions & 0 deletions)

@@ -0,0 +1,89 @@
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::sync::Condvar;
+use std::sync::Mutex;
+use std::time::Duration;
+
+use crate::runner::DiskSpaceWatcher;
+
+#[derive(Default, Debug)]
+struct PurgeTracker {
+    count: Mutex<usize>,
+    wait: Condvar,
+}
+
+impl PurgeTracker {
+    #[track_caller]
+    fn assert_count_eq(&self, count: usize) {
+        let guard = self.count.lock().unwrap();
+        let (g, timer) = self
+            .wait
+            .wait_timeout_while(guard, Duration::from_secs(10), |g| *g != count)
+            .unwrap();
+        assert!(
+            !timer.timed_out(),
+            "timed out while waiting for {} to equal {}",
+            *g,
+            count
+        );
+        assert_eq!(*g, count);
+    }
+}
+
+impl super::ToClean for PurgeTracker {
+    fn purge(&self) {
+        *self.count.lock().unwrap() += 1;
+        self.wait.notify_all();
+    }
+}
+
+#[test]
+fn check_cleanup_single_worker() {
+    let _ = env_logger::try_init();
+    let tracker = Arc::new(PurgeTracker::default());
+    let watcher = DiskSpaceWatcher::new(Duration::from_secs(60), 0.8, 1);
+    let done = &AtomicBool::new(false);
+    std::thread::scope(|s| {
+        s.spawn(|| {
+            for _ in 0..3 {
+                watcher.clean(&*tracker);
+            }
+            done.store(true, Ordering::Relaxed);
+        });
+
+        s.spawn(|| {
+            while !done.load(Ordering::Relaxed) {
+                watcher.worker_idle(false);
+            }
+        });
+    });
+
+    tracker.assert_count_eq(3);
+}
+
+#[test]
+fn check_cleanup_multi_worker() {
+    let _ = env_logger::try_init();
+    let tracker = Arc::new(PurgeTracker::default());
+    let watcher = DiskSpaceWatcher::new(Duration::from_secs(60), 0.8, 3);
+    let done = &AtomicBool::new(false);
+    std::thread::scope(|s| {
+        s.spawn(|| {
+            for _ in 0..5 {
+                watcher.clean(&*tracker);
+            }
+            done.store(true, Ordering::Relaxed);
+        });
+
+        for _ in 0..3 {
+            s.spawn(|| {
+                while !done.load(Ordering::Relaxed) {
+                    watcher.worker_idle(false);
+                }
+            });
+        }
+    });
+
+    tracker.assert_count_eq(5);
+}