diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index b698dc9df0..e47663d236 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -194,7 +194,14 @@ impl PoolInner { permit: AsyncSemaphoreReleaser<'a>, ) -> Result>, AsyncSemaphoreReleaser<'a>> { if let Some(idle) = self.idle_conns.pop() { - self.num_idle.fetch_sub(1, Ordering::AcqRel); + // Saturating: never underflow even if a concurrent `release` hasn't yet published + // its increment. An underflow would wrap `num_idle` to `usize::MAX` and wedge the + // maintenance task in a non-yielding spin (see `release` for the full invariant). + let _ = self + .num_idle + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| { + Some(n.saturating_sub(1)) + }); Ok(Floating::from_idle(idle, (*self).clone(), permit)) } else { Err(permit) @@ -206,6 +213,14 @@ impl PoolInner { let Floating { inner: idle, guard } = floating.into_idle(); + // Bump the idle counter *before* the connection becomes acquirable, so a concurrent + // `pop_idle` can never observe a popped connection without a matching increment. + // (Otherwise `num_idle.fetch_sub` can underflow a `usize` to `usize::MAX`, which makes + // the maintenance task's `for _ in 0..num_idle()` loop spin ~forever, pegging a CPU.) + // Over-counting transiently (incremented, not yet pushed) is harmless: `pop_idle` + // simply finds an empty queue and returns the permit without decrementing. + self.num_idle.fetch_add(1, Ordering::AcqRel); + if self.idle_conns.push(idle).is_err() { panic!("BUG: connection queue overflow in release()"); } @@ -213,8 +228,6 @@ impl PoolInner { // NOTE: we need to make sure we drop the permit *after* we push to the idle queue // don't decrease the size guard.release_permit(); - - self.num_idle.fetch_add(1, Ordering::AcqRel); } /// Try to atomically increment the pool size for a new connection. @@ -621,3 +634,179 @@ impl Drop for DecrementSizeGuard { } } } + +// Uses the in-crate `Any` database with a stub backend so we can drive `PoolInner` internals +// directly. (We can't use a real driver here: the only `Database` impls outside this crate +// would be a different `sqlx-core` instance via the dev-dependency cycle.) +#[cfg(all(test, feature = "any"))] +mod underflow_tests { + use super::*; + use crate::any::{ + Any, AnyArguments, AnyConnectOptions, AnyConnection, AnyConnectionBackend, AnyQueryResult, + AnyRow, AnyStatement, AnyTypeInfo, + }; + use crate::pool::Pool; + use crate::sql_str::SqlStr; + use either::Either; + use futures_core::future::BoxFuture; + use futures_core::stream::BoxStream; + use std::str::FromStr; + + /// A backend that constructs but never executes anything. The pool's `release`/`pop_idle` + /// paths only move the opaque connection around — they never call any of these methods. + #[derive(Debug)] + struct StubBackend; + + impl AnyConnectionBackend for StubBackend { + fn name(&self) -> &str { + "stub" + } + fn close(self: Box) -> BoxFuture<'static, crate::Result<()>> { + unimplemented!() + } + fn close_hard(self: Box) -> BoxFuture<'static, crate::Result<()>> { + unimplemented!() + } + fn ping(&mut self) -> BoxFuture<'_, crate::Result<()>> { + unimplemented!() + } + fn begin(&mut self, _statement: Option) -> BoxFuture<'_, crate::Result<()>> { + unimplemented!() + } + fn commit(&mut self) -> BoxFuture<'_, crate::Result<()>> { + unimplemented!() + } + fn rollback(&mut self) -> BoxFuture<'_, crate::Result<()>> { + unimplemented!() + } + fn start_rollback(&mut self) { + unimplemented!() + } + fn shrink_buffers(&mut self) { + unimplemented!() + } + fn flush(&mut self) -> BoxFuture<'_, crate::Result<()>> { + unimplemented!() + } + fn should_flush(&self) -> bool { + unimplemented!() + } + fn fetch_many( + &mut self, + _query: SqlStr, + _persistent: bool, + _arguments: Option, + ) -> BoxStream<'_, crate::Result>> { + unimplemented!() + } + fn fetch_optional( + &mut self, + _query: SqlStr, + _persistent: bool, + _arguments: Option, + ) -> BoxFuture<'_, crate::Result>> { + unimplemented!() + } + fn prepare_with<'c, 'q: 'c>( + &'c mut self, + _sql: SqlStr, + _parameters: &[AnyTypeInfo], + ) -> BoxFuture<'c, crate::Result> { + unimplemented!() + } + #[cfg(feature = "offline")] + fn describe( + &mut self, + _sql: SqlStr, + ) -> BoxFuture<'_, crate::Result>> { + unimplemented!() + } + } + + fn make_live(inner: &Arc>) -> Floating> { + // Mint a live connection like `acquire` would: take a size slot + a permit, wrap a + // stub connection. The pool's `release`/`try_acquire` never touch the connection + // itself, so a stub backend that panics on use is fine. + inner.size.fetch_add(1, Ordering::AcqRel); + let permit = inner + .semaphore + .try_acquire(1) + .expect("a permit should be available"); + let guard = DecrementSizeGuard::from_permit(Arc::clone(inner), permit); + let conn = AnyConnection { + backend: Box::new(StubBackend), + }; + Floating::new_live(conn, guard) + } + + // Stress test for the `num_idle` underflow race, exercised entirely through the real + // `try_acquire` and `release` paths. + // + // The bug: `release` published a returned connection (push to the idle queue + release its + // semaphore permit) *before* incrementing `num_idle`. A concurrent `try_acquire` could pop + // that connection and run its `pop_idle` decrement while `num_idle` was still 0, wrapping + // the `usize` to `usize::MAX`. Downstream, the maintenance task's `for _ in 0..num_idle()` + // sweep would then spin ~forever on a synchronous body, pinning a worker thread (the + // reported 100%-CPU-on-shutdown symptom). + // + // Many threads hammer acquire+release on a small, heavily oversubscribed pool, so the idle + // count is constantly driven to 0 right as another thread is publishing a connection — + // exactly the interleaving that underflows. Every thread samples `num_idle()` after each + // op and records the largest value ever seen; an underflow shows up as a value far above + // `max_connections`. This requires real OS threads: the race window in `release` has no + // `.await`, so it cannot interleave on a single-threaded executor. + #[test] + fn release_try_acquire_stress_never_underflows_num_idle() { + use std::sync::atomic::AtomicUsize; + use std::thread; + + const MAX_CONNS: u32 = 3; + const WORKERS: usize = 12; + const ITERS: usize = 200_000; + + let opts = AnyConnectOptions::from_str("stub::memory:").expect("parse url"); + let pool: Pool = PoolOptions::new() + .max_connections(MAX_CONNS) + .min_connections(0) + .connect_lazy_with(opts); + let inner = Arc::clone(&pool.0); + + // Pre-fill the pool with `MAX_CONNS` idle connections via the real `release` path. + for _ in 0..MAX_CONNS { + inner.release(make_live(&inner)); + } + assert_eq!(inner.num_idle(), MAX_CONNS as usize); + + // Sticky high-water mark of `num_idle` observed across all threads. A wrapped + // (underflowed) read is `usize::MAX`, far above `MAX_CONNS`. + let max_seen = Arc::new(AtomicUsize::new(0)); + + let mut handles = Vec::new(); + for _ in 0..WORKERS { + let inner = Arc::clone(&inner); + let max_seen = Arc::clone(&max_seen); + handles.push(thread::spawn(move || { + for _ in 0..ITERS { + // Real acquire/release cycle, racing the other threads. + if let Some(idle) = inner.try_acquire() { + inner.release(idle.into_live()); + } + max_seen.fetch_max(inner.num_idle(), Ordering::Relaxed); + } + })); + } + for h in handles { + h.join().expect("worker thread panicked"); + } + + let observed = max_seen.load(Ordering::Relaxed); + assert!( + observed <= MAX_CONNS as usize, + "num_idle exceeded max_connections (saw {observed}, max {MAX_CONNS}); \ + this indicates a `num_idle` underflow" + ); + + // Drain so the stub connections' size slots are released before the pool drops. + while inner.try_acquire().is_some() {} + } +}