11//! Thread pool for blocking operations
22
3- use crate :: loom:: sync:: { Arc , Condvar , Mutex } ;
3+ use crate :: loom:: sync:: { Arc , Mutex } ;
44use crate :: loom:: thread;
55use crate :: runtime:: blocking:: schedule:: BlockingSchedule ;
6+ use crate :: runtime:: blocking:: sharded_queue:: { ShardedQueue , WaitResult } ;
67use crate :: runtime:: blocking:: { shutdown, BlockingTask } ;
78use crate :: runtime:: builder:: ThreadNameFn ;
89use crate :: runtime:: task:: { self , JoinHandle } ;
910use crate :: runtime:: { Builder , Callback , Handle , BOX_FUTURE_THRESHOLD } ;
1011use crate :: util:: metric_atomics:: MetricAtomicUsize ;
1112use crate :: util:: trace:: { blocking_task, SpawnMeta } ;
1213
13- use std:: collections:: { HashMap , VecDeque } ;
14+ use std:: collections:: HashMap ;
1415use std:: fmt;
1516use std:: io;
1617use std:: sync:: atomic:: Ordering ;
@@ -74,11 +75,11 @@ impl SpawnerMetrics {
7475}
7576
7677struct Inner {
77- /// State shared between worker threads .
78- shared : Mutex < Shared > ,
78+ /// Sharded queue for task distribution .
79+ queue : ShardedQueue ,
7980
80- /// Pool threads wait on this .
81- condvar : Condvar ,
81+ /// State shared between worker threads (thread management only) .
82+ shared : Mutex < Shared > ,
8283
8384 /// Spawned threads use this name.
8485 thread_name : ThreadNameFn ,
@@ -103,8 +104,6 @@ struct Inner {
103104}
104105
105106struct Shared {
106- queue : VecDeque < Task > ,
107- num_notify : u32 ,
108107 shutdown : bool ,
109108 shutdown_tx : Option < shutdown:: Sender > ,
110109 /// Prior to shutdown, we clean up `JoinHandles` by having each timed-out
@@ -214,16 +213,14 @@ impl BlockingPool {
214213 BlockingPool {
215214 spawner : Spawner {
216215 inner : Arc :: new ( Inner {
216+ queue : ShardedQueue :: new ( ) ,
217217 shared : Mutex :: new ( Shared {
218- queue : VecDeque :: new ( ) ,
219- num_notify : 0 ,
220218 shutdown : false ,
221219 shutdown_tx : Some ( shutdown_tx) ,
222220 last_exiting_thread : None ,
223221 worker_threads : HashMap :: new ( ) ,
224222 worker_thread_index : 0 ,
225223 } ) ,
226- condvar : Condvar :: new ( ) ,
227224 thread_name : builder. thread_name . clone ( ) ,
228225 stack_size : builder. thread_stack_size ,
229226 after_start : builder. after_start . clone ( ) ,
@@ -253,7 +250,7 @@ impl BlockingPool {
253250
254251 shared. shutdown = true ;
255252 shared. shutdown_tx = None ;
256- self . spawner . inner . condvar . notify_all ( ) ;
253+ self . spawner . inner . queue . shutdown ( ) ;
257254
258255 let last_exited_thread = std:: mem:: take ( & mut shared. last_exiting_thread ) ;
259256 let workers = std:: mem:: take ( & mut shared. worker_threads ) ;
@@ -391,9 +388,8 @@ impl Spawner {
391388 }
392389
393390 fn spawn_task ( & self , task : Task , rt : & Handle ) -> Result < ( ) , SpawnError > {
394- let mut shared = self . inner . shared . lock ( ) ;
395-
396- if shared. shutdown {
391+ // Check shutdown without holding the lock
392+ if self . inner . queue . is_shutdown ( ) {
397393 // Shutdown the task: it's fine to shutdown this task (even if
398394 // mandatory) because it was scheduled after the shutdown of the
399395 // runtime began.
@@ -403,52 +399,61 @@ impl Spawner {
403399 return Err ( SpawnError :: ShuttingDown ) ;
404400 }
405401
406- shared. queue . push_back ( task) ;
402+ // Get thread count for adaptive sharding
403+ let num_threads = self . inner . metrics . num_threads ( ) ;
404+
405+ // Push to the sharded queue - uses adaptive shard count based on thread count
406+ self . inner . queue . push ( task, num_threads) ;
407407 self . inner . metrics . inc_queue_depth ( ) ;
408408
409+ // Check if we need to spawn a new thread or notify an idle one
409410 if self . inner . metrics . num_idle_threads ( ) == 0 {
410- // No threads are able to process the task.
411-
412- if self . inner . metrics . num_threads ( ) == self . inner . thread_cap {
413- // At max number of threads
414- } else {
415- assert ! ( shared. shutdown_tx. is_some( ) ) ;
416- let shutdown_tx = shared. shutdown_tx . clone ( ) ;
417-
418- if let Some ( shutdown_tx) = shutdown_tx {
419- let id = shared. worker_thread_index ;
411+ // No idle threads - might need to spawn one
412+ if num_threads < self . inner . thread_cap {
413+ // Try to spawn a new thread
414+ let mut shared = self . inner . shared . lock ( ) ;
415+
416+ // Double-check conditions after acquiring the lock
417+ if shared. shutdown {
418+ // Queue was shutdown while we were acquiring the lock
419+ // The task is already in the queue, so it will be drained during shutdown
420+ return Ok ( ( ) ) ;
421+ }
420422
421- match self . spawn_thread ( shutdown_tx, rt, id) {
422- Ok ( handle) => {
423- self . inner . metrics . inc_num_threads ( ) ;
424- shared. worker_thread_index += 1 ;
425- shared. worker_threads . insert ( id, handle) ;
426- }
427- Err ( ref e)
428- if is_temporary_os_thread_error ( e)
429- && self . inner . metrics . num_threads ( ) > 0 =>
430- {
431- // OS temporarily failed to spawn a new thread.
432- // The task will be picked up eventually by a currently
433- // busy thread.
434- }
435- Err ( e) => {
436- // The OS refused to spawn the thread and there is no thread
437- // to pick up the task that has just been pushed to the queue.
438- return Err ( SpawnError :: NoThreads ( e) ) ;
423+ // Re-check thread count (another thread might have spawned one)
424+ if self . inner . metrics . num_threads ( ) < self . inner . thread_cap {
425+ if let Some ( shutdown_tx) = shared. shutdown_tx . clone ( ) {
426+ let id = shared. worker_thread_index ;
427+
428+ match self . spawn_thread ( shutdown_tx, rt, id) {
429+ Ok ( handle) => {
430+ self . inner . metrics . inc_num_threads ( ) ;
431+ shared. worker_thread_index += 1 ;
432+ shared. worker_threads . insert ( id, handle) ;
433+ }
434+ Err ( ref e)
435+ if is_temporary_os_thread_error ( e)
436+ && self . inner . metrics . num_threads ( ) > 0 =>
437+ {
438+ // OS temporarily failed to spawn a new thread.
439+ // The task will be picked up eventually by a currently
440+ // busy thread.
441+ }
442+ Err ( e) => {
443+ // The OS refused to spawn the thread and there is no thread
444+ // to pick up the task that has just been pushed to the queue.
445+ return Err ( SpawnError :: NoThreads ( e) ) ;
446+ }
439447 }
440448 }
441449 }
450+ } else {
451+ // At max threads, notify anyway in case threads are waiting
452+ self . inner . queue . notify_one ( ) ;
442453 }
443454 } else {
444- // Notify an idle worker thread. The notification counter
445- // is used to count the needed amount of notifications
446- // exactly. Thread libraries may generate spurious
447- // wakeups, this counter is used to keep us in a
448- // consistent state.
449- self . inner . metrics . dec_num_idle_threads ( ) ;
450- shared. num_notify += 1 ;
451- self . inner . condvar . notify_one ( ) ;
455+ // There are idle threads waiting, notify one
456+ self . inner . queue . notify_one ( ) ;
452457 }
453458
454459 Ok ( ( ) )
@@ -505,90 +510,80 @@ impl Inner {
505510 f ( ) ;
506511 }
507512
508- let mut shared = self . shared . lock ( ) ;
513+ // Use worker_thread_id as the preferred shard
514+ let preferred_shard = worker_thread_id;
509515 let mut join_on_thread = None ;
510516
511517 ' main: loop {
512- // BUSY
513- while let Some ( task) = shared . queue . pop_front ( ) {
518+ // BUSY: Process tasks from the queue
519+ while let Some ( task) = self . queue . pop ( preferred_shard ) {
514520 self . metrics . dec_queue_depth ( ) ;
515- drop ( shared) ;
516521 task. run ( ) ;
522+ }
517523
518- shared = self . shared . lock ( ) ;
524+ // Check for shutdown before going idle
525+ if self . queue . is_shutdown ( ) {
526+ break ;
519527 }
520528
521- // IDLE
529+ // IDLE: Wait for new tasks
522530 self . metrics . inc_num_idle_threads ( ) ;
523531
524- while !shared. shutdown {
525- let lock_result = self . condvar . wait_timeout ( shared, self . keep_alive ) . unwrap ( ) ;
526-
527- shared = lock_result. 0 ;
528- let timeout_result = lock_result. 1 ;
529-
530- if shared. num_notify != 0 {
531- // We have received a legitimate wakeup,
532- // acknowledge it by decrementing the counter
533- // and transition to the BUSY state.
534- shared. num_notify -= 1 ;
535- break ;
536- }
537-
538- // Even if the condvar "timed out", if the pool is entering the
539- // shutdown phase, we want to perform the cleanup logic.
540- if !shared. shutdown && timeout_result. timed_out ( ) {
541- // We'll join the prior timed-out thread's JoinHandle after dropping the lock.
542- // This isn't done when shutting down, because the thread calling shutdown will
543- // handle joining everything.
544- let my_handle = shared. worker_threads . remove ( & worker_thread_id) ;
545- join_on_thread = std:: mem:: replace ( & mut shared. last_exiting_thread , my_handle) ;
532+ loop {
533+ match self . queue . wait_for_task ( preferred_shard, self . keep_alive ) {
534+ WaitResult :: Task ( task) => {
535+ // Got a task, process it
536+ self . metrics . dec_num_idle_threads ( ) ;
537+ self . metrics . dec_queue_depth ( ) ;
538+ task. run ( ) ;
539+ // Go back to busy loop
540+ break ;
541+ }
542+ WaitResult :: Shutdown => {
543+ // Shutdown initiated
544+ self . metrics . dec_num_idle_threads ( ) ;
545+ break ' main;
546+ }
547+ WaitResult :: Timeout => {
548+ // Timed out, exit this thread
549+ self . metrics . dec_num_idle_threads ( ) ;
550+
551+ // Clean up thread handle
552+ let mut shared = self . shared . lock ( ) ;
553+ if !shared. shutdown {
554+ let my_handle = shared. worker_threads . remove ( & worker_thread_id) ;
555+ join_on_thread =
556+ std:: mem:: replace ( & mut shared. last_exiting_thread , my_handle) ;
557+ }
546558
547- break ' main;
559+ // Exit the main loop and terminate the thread
560+ break ' main;
561+ }
562+ WaitResult :: Spurious => {
563+ // Spurious wakeup, check for tasks and continue waiting
564+ if let Some ( task) = self . queue . pop ( preferred_shard) {
565+ self . metrics . dec_num_idle_threads ( ) ;
566+ self . metrics . dec_queue_depth ( ) ;
567+ task. run ( ) ;
568+ break ;
569+ }
570+ // Continue waiting
571+ }
548572 }
549-
550- // Spurious wakeup detected, go back to sleep.
551573 }
574+ }
552575
553- if shared. shutdown {
554- // Drain the queue
555- while let Some ( task) = shared. queue . pop_front ( ) {
556- self . metrics . dec_queue_depth ( ) ;
557- drop ( shared) ;
558-
559- task. shutdown_or_run_if_mandatory ( ) ;
560-
561- shared = self . shared . lock ( ) ;
562- }
563-
564- // Work was produced, and we "took" it (by decrementing num_notify).
565- // This means that num_idle was decremented once for our wakeup.
566- // But, since we are exiting, we need to "undo" that, as we'll stay idle.
567- self . metrics . inc_num_idle_threads ( ) ;
568- // NOTE: Technically we should also do num_notify++ and notify again,
569- // but since we're shutting down anyway, that won't be necessary.
570- break ;
571- }
576+ // Drain remaining tasks if shutting down
577+ if self . queue . is_shutdown ( ) {
578+ self . queue . drain ( |task| {
579+ self . metrics . dec_queue_depth ( ) ;
580+ task. shutdown_or_run_if_mandatory ( ) ;
581+ } ) ;
572582 }
573583
574584 // Thread exit
575585 self . metrics . dec_num_threads ( ) ;
576586
577- // num_idle should now be tracked exactly, panic
578- // with a descriptive message if it is not the
579- // case.
580- let prev_idle = self . metrics . dec_num_idle_threads ( ) ;
581- assert ! (
582- prev_idle >= self . metrics. num_idle_threads( ) ,
583- "num_idle_threads underflowed on thread exit"
584- ) ;
585-
586- if shared. shutdown && self . metrics . num_threads ( ) == 0 {
587- self . condvar . notify_one ( ) ;
588- }
589-
590- drop ( shared) ;
591-
592587 if let Some ( f) = & self . before_stop {
593588 f ( ) ;
594589 }
0 commit comments