3 changes: 2 additions & 1 deletion spellcheck.dic
@@ -1,4 +1,4 @@
311
312
&
+
<
@@ -32,6 +32,7 @@
8MB
ABI
accessors
adaptively
adaptor
adaptors
Adaptors
2 changes: 2 additions & 0 deletions tokio/src/runtime/blocking/mod.rs
@@ -6,6 +6,8 @@
mod pool;
pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};

mod sharded_queue;

cfg_fs! {
pub(crate) use pool::spawn_mandatory_blocking;
}
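
The new `sharded_queue` module itself is not included in this section of the diff. Judging only from its call sites in `pool.rs` below, its surface looks roughly like the following sketch; the type and method names come from the diff, while the field layout, doc comments, and bodies are assumptions.

```rust
use std::time::Duration;

// Stand-in for the pool's `Task`; in the PR it wraps the runtime's
// unowned blocking task, which is not shown in this section.
pub(crate) struct Task;

/// Outcome of a worker's bounded wait, as matched in `Inner::run`.
pub(crate) enum WaitResult {
    Task(Task), // a task was handed to the waiting worker
    Shutdown,   // the pool has begun shutting down
    Timeout,    // `keep_alive` elapsed with no work
    Spurious,   // woken without work; caller re-checks the queue
}

/// Shape of `ShardedQueue` as inferred from `pool.rs`; the field
/// layout and all bodies here are placeholders, not the PR's code.
pub(crate) struct ShardedQueue {
    // plausibly: a fixed set of locked deques, each with its own
    // condvar, plus a shared shutdown flag
}

impl ShardedQueue {
    pub(crate) fn new() -> Self { todo!() }
    /// Push a task, choosing a shard adaptively from `num_threads`.
    pub(crate) fn push(&self, _task: Task, _num_threads: usize) { todo!() }
    /// Pop a task, trying `preferred_shard` first.
    pub(crate) fn pop(&self, _preferred_shard: usize) -> Option<Task> { todo!() }
    /// Block for up to `keep_alive` waiting for work on `preferred_shard`.
    pub(crate) fn wait_for_task(&self, _shard: usize, _keep_alive: Duration) -> WaitResult { todo!() }
    pub(crate) fn notify_one(&self) { todo!() }
    pub(crate) fn is_shutdown(&self) -> bool { todo!() }
    pub(crate) fn shutdown(&self) { todo!() }
    /// Drain every shard, calling `f` on each remaining task.
    pub(crate) fn drain(&self, _f: impl FnMut(Task)) { todo!() }
}
```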
229 changes: 112 additions & 117 deletions tokio/src/runtime/blocking/pool.rs
@@ -1,16 +1,17 @@
//! Thread pool for blocking operations

use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::sync::{Arc, Mutex};
use crate::loom::thread;
use crate::runtime::blocking::schedule::BlockingSchedule;
use crate::runtime::blocking::sharded_queue::{ShardedQueue, WaitResult};
use crate::runtime::blocking::{shutdown, BlockingTask};
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle, BOX_FUTURE_THRESHOLD};
use crate::util::metric_atomics::MetricAtomicUsize;
use crate::util::trace::{blocking_task, SpawnMeta};

use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::fmt;
use std::io;
use std::sync::atomic::Ordering;
@@ -74,11 +75,11 @@ impl SpawnerMetrics {
}

struct Inner {
/// State shared between worker threads.
shared: Mutex<Shared>,
/// Sharded queue for task distribution.
queue: ShardedQueue,

/// Pool threads wait on this.
condvar: Condvar,
/// State shared between worker threads (thread management only).
shared: Mutex<Shared>,

/// Spawned threads use this name.
thread_name: ThreadNameFn,
@@ -103,8 +104,6 @@
}

struct Shared {
queue: VecDeque<Task>,
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
/// Prior to shutdown, we clean up `JoinHandles` by having each timed-out
@@ -214,16 +213,14 @@ impl BlockingPool {
BlockingPool {
spawner: Spawner {
inner: Arc::new(Inner {
queue: ShardedQueue::new(),
shared: Mutex::new(Shared {
queue: VecDeque::new(),
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
last_exiting_thread: None,
worker_threads: HashMap::new(),
worker_thread_index: 0,
}),
condvar: Condvar::new(),
thread_name: builder.thread_name.clone(),
stack_size: builder.thread_stack_size,
after_start: builder.after_start.clone(),
@@ -253,7 +250,7 @@ impl BlockingPool {

shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();
self.spawner.inner.queue.shutdown();

let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread);
let workers = std::mem::take(&mut shared.worker_threads);
@@ -391,9 +388,8 @@ impl Spawner {
}

fn spawn_task(&self, task: Task, rt: &Handle) -> Result<(), SpawnError> {
let mut shared = self.inner.shared.lock();

if shared.shutdown {
// Check shutdown without holding the lock
if self.inner.queue.is_shutdown() {
// Shutdown the task: it's fine to shutdown this task (even if
// mandatory) because it was scheduled after the shutdown of the
// runtime began.
@@ -403,52 +399,61 @@
return Err(SpawnError::ShuttingDown);
}

shared.queue.push_back(task);
// Get thread count for adaptive sharding
let num_threads = self.inner.metrics.num_threads();

// Push to the sharded queue - uses adaptive shard count based on thread count
self.inner.queue.push(task, num_threads);
self.inner.metrics.inc_queue_depth();

// Check if we need to spawn a new thread or notify an idle one
if self.inner.metrics.num_idle_threads() == 0 {
// No threads are able to process the task.

if self.inner.metrics.num_threads() == self.inner.thread_cap {
// At max number of threads
} else {
assert!(shared.shutdown_tx.is_some());
let shutdown_tx = shared.shutdown_tx.clone();

if let Some(shutdown_tx) = shutdown_tx {
let id = shared.worker_thread_index;
// No idle threads - might need to spawn one
if num_threads < self.inner.thread_cap {
// Try to spawn a new thread
let mut shared = self.inner.shared.lock();

// Double-check conditions after acquiring the lock
if shared.shutdown {
// The queue was shut down while we were acquiring the lock.
// The task is already in the queue, so it will be drained during shutdown.
return Ok(());
}

match self.spawn_thread(shutdown_tx, rt, id) {
Ok(handle) => {
self.inner.metrics.inc_num_threads();
shared.worker_thread_index += 1;
shared.worker_threads.insert(id, handle);
}
Err(ref e)
if is_temporary_os_thread_error(e)
&& self.inner.metrics.num_threads() > 0 =>
{
// OS temporarily failed to spawn a new thread.
// The task will be picked up eventually by a currently
// busy thread.
}
Err(e) => {
// The OS refused to spawn the thread and there is no thread
// to pick up the task that has just been pushed to the queue.
return Err(SpawnError::NoThreads(e));
// Re-check thread count (another thread might have spawned one)
if self.inner.metrics.num_threads() < self.inner.thread_cap {
if let Some(shutdown_tx) = shared.shutdown_tx.clone() {
let id = shared.worker_thread_index;

match self.spawn_thread(shutdown_tx, rt, id) {
Ok(handle) => {
self.inner.metrics.inc_num_threads();
shared.worker_thread_index += 1;
shared.worker_threads.insert(id, handle);
}
Err(ref e)
if is_temporary_os_thread_error(e)
&& self.inner.metrics.num_threads() > 0 =>
{
// OS temporarily failed to spawn a new thread.
// The task will be picked up eventually by a currently
// busy thread.
}
Err(e) => {
// The OS refused to spawn the thread and there is no thread
// to pick up the task that has just been pushed to the queue.
return Err(SpawnError::NoThreads(e));
}
}
}
}
} else {
// At max threads, notify anyway in case threads are waiting
self.inner.queue.notify_one();
}
} else {
// Notify an idle worker thread. The notification counter
// is used to count the needed amount of notifications
// exactly. Thread libraries may generate spurious
// wakeups, this counter is used to keep us in a
// consistent state.
self.inner.metrics.dec_num_idle_threads();
shared.num_notify += 1;
self.inner.condvar.notify_one();
// There are idle threads waiting, notify one
self.inner.queue.notify_one();
}

Ok(())
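
The comments above say that `push` picks a shard "adaptively" based on the current thread count, but the policy itself lives in `sharded_queue.rs` and is not shown here. One plausible policy, sketched purely for illustration (the constant, helper name, and round-robin choice are assumptions, not the PR's code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Assumed upper bound on the shard count; not taken from the PR.
const MAX_SHARDS: usize = 16;

/// Hypothetical policy: never use more shards than worker threads
/// (so a small pool keeps tasks near its few parked waiters), and
/// spread pushes round-robin to cut per-shard lock contention.
fn select_shard(next: &AtomicUsize, num_threads: usize) -> usize {
    let active = num_threads.clamp(1, MAX_SHARDS);
    next.fetch_add(1, Ordering::Relaxed) % active
}
```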
@@ -505,90 +510,80 @@ impl Inner {
f();
}

let mut shared = self.shared.lock();
// Use worker_thread_id as the preferred shard
let preferred_shard = worker_thread_id;
let mut join_on_thread = None;

'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
// BUSY: Process tasks from the queue
while let Some(task) = self.queue.pop(preferred_shard) {
self.metrics.dec_queue_depth();
drop(shared);
task.run();
}

shared = self.shared.lock();
// Check for shutdown before going idle
if self.queue.is_shutdown() {
break;
}

// IDLE
// IDLE: Wait for new tasks
self.metrics.inc_num_idle_threads();

while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();

shared = lock_result.0;
let timeout_result = lock_result.1;

if shared.num_notify != 0 {
// We have received a legitimate wakeup,
// acknowledge it by decrementing the counter
// and transition to the BUSY state.
shared.num_notify -= 1;
break;
}

// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
// We'll join the prior timed-out thread's JoinHandle after dropping the lock.
// This isn't done when shutting down, because the thread calling shutdown will
// handle joining everything.
let my_handle = shared.worker_threads.remove(&worker_thread_id);
join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
loop {
match self.queue.wait_for_task(preferred_shard, self.keep_alive) {
WaitResult::Task(task) => {
// Got a task, process it
self.metrics.dec_num_idle_threads();
self.metrics.dec_queue_depth();
task.run();
// Go back to busy loop
break;
}
WaitResult::Shutdown => {
// Shutdown initiated
self.metrics.dec_num_idle_threads();
break 'main;
}
WaitResult::Timeout => {
// Timed out, exit this thread
self.metrics.dec_num_idle_threads();

// Clean up thread handle
let mut shared = self.shared.lock();
if !shared.shutdown {
let my_handle = shared.worker_threads.remove(&worker_thread_id);
join_on_thread =
std::mem::replace(&mut shared.last_exiting_thread, my_handle);
}

break 'main;
// Exit the main loop and terminate the thread
break 'main;
}
WaitResult::Spurious => {
// Spurious wakeup, check for tasks and continue waiting
if let Some(task) = self.queue.pop(preferred_shard) {
self.metrics.dec_num_idle_threads();
self.metrics.dec_queue_depth();
task.run();
break;
}
// Continue waiting
}
}

// Spurious wakeup detected, go back to sleep.
}
}

if shared.shutdown {
// Drain the queue
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);

task.shutdown_or_run_if_mandatory();

shared = self.shared.lock();
}

// Work was produced, and we "took" it (by decrementing num_notify).
// This means that num_idle was decremented once for our wakeup.
// But, since we are exiting, we need to "undo" that, as we'll stay idle.
self.metrics.inc_num_idle_threads();
// NOTE: Technically we should also do num_notify++ and notify again,
// but since we're shutting down anyway, that won't be necessary.
break;
}
// Drain remaining tasks if shutting down
if self.queue.is_shutdown() {
self.queue.drain(|task| {
self.metrics.dec_queue_depth();
task.shutdown_or_run_if_mandatory();
});
}

// Thread exit
self.metrics.dec_num_threads();

// num_idle should now be tracked exactly; panic
// with a descriptive message if it is not the
// case.
let prev_idle = self.metrics.dec_num_idle_threads();
assert!(
prev_idle >= self.metrics.num_idle_threads(),
"num_idle_threads underflowed on thread exit"
);

if shared.shutdown && self.metrics.num_threads() == 0 {
self.condvar.notify_one();
}

drop(shared);

if let Some(f) = &self.before_stop {
f();
}
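
For context on how the four `WaitResult` variants consumed by the worker loop can arise, here is a minimal sketch of a bounded wait on a single shard, assuming each shard pairs a locked deque with its own `Condvar` and a shutdown flag; the PR's actual implementation is not shown in this section. Compared with the old design, where every idle worker parked on one pool-wide `Condvar` behind `Mutex<Shared>`, a per-shard condvar means a push only wakes waiters on its own shard.

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};
use std::time::Duration;

struct Task;

enum WaitResult {
    Task(Task),
    Shutdown,
    Timeout,
    Spurious,
}

/// One shard: a locked deque plus its own condvar (an assumption
/// about the layout, not the PR's code).
struct Shard {
    state: Mutex<(VecDeque<Task>, bool)>, // (tasks, shutdown flag)
    condvar: Condvar,
}

impl Shard {
    /// A single bounded wait on this shard, yielding the same four
    /// outcomes the worker loop matches on.
    fn wait_for_task(&self, keep_alive: Duration) -> WaitResult {
        let mut guard = self.state.lock().unwrap();
        // A task may have landed between the caller's last pop and
        // this lock; never sleep while the deque is non-empty.
        if let Some(task) = guard.0.pop_front() {
            return WaitResult::Task(task);
        }
        if guard.1 {
            return WaitResult::Shutdown;
        }
        let (mut guard, timeout) = self.condvar.wait_timeout(guard, keep_alive).unwrap();
        if let Some(task) = guard.0.pop_front() {
            WaitResult::Task(task)
        } else if guard.1 {
            WaitResult::Shutdown
        } else if timeout.timed_out() {
            WaitResult::Timeout
        } else {
            // Woken with nothing to do: report it so the caller can
            // re-check other shards, as the Spurious arm above does.
            WaitResult::Spurious
        }
    }
}
```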