diff --git a/composer.json b/composer.json index 51871a4..6f428f1 100755 --- a/composer.json +++ b/composer.json @@ -30,12 +30,14 @@ "require-dev": { "phpunit/phpunit": "11.*", "laravel/pint": "1.*", - "phpstan/phpstan": "1.*" + "phpstan/phpstan": "1.*", + "swoole/ide-helper": "5.1.2" }, "suggests": { "ext-mongodb": "Needed to support MongoDB database pools", "ext-redis": "Needed to support Redis cache pools", - "ext-pdo": "Needed to support MariaDB, MySQL or SQLite database pools" + "ext-pdo": "Needed to support MariaDB, MySQL or SQLite database pools", + "ext-swoole" : "Needed to support Swoole based pool adapter" }, "config": { "platform": { diff --git a/composer.lock b/composer.lock index 4574390..f6803a8 100644 --- a/composer.lock +++ b/composer.lock @@ -3762,6 +3762,38 @@ ], "time": "2024-10-20T05:08:20+00:00" }, + { + "name": "swoole/ide-helper", + "version": "5.1.2", + "source": { + "type": "git", + "url": "https://github.com/swoole/ide-helper.git", + "reference": "33ec7af9111b76d06a70dd31191cc74793551112" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/swoole/ide-helper/zipball/33ec7af9111b76d06a70dd31191cc74793551112", + "reference": "33ec7af9111b76d06a70dd31191cc74793551112", + "shasum": "" + }, + "type": "library", + "notification-url": "https://packagist.org/downloads/", + "license": [ + "Apache-2.0" + ], + "authors": [ + { + "name": "Team Swoole", + "email": "team@swoole.com" + } + ], + "description": "IDE help files for Swoole.", + "support": { + "issues": "https://github.com/swoole/ide-helper/issues", + "source": "https://github.com/swoole/ide-helper/tree/5.1.2" + }, + "time": "2024-02-01T22:28:11+00:00" + }, { "name": "theseer/tokenizer", "version": "1.3.1", diff --git a/src/Pools/Adapter.php b/src/Pools/Adapter.php new file mode 100644 index 0000000..afd8f20 --- /dev/null +++ b/src/Pools/Adapter.php @@ -0,0 +1,27 @@ + $pool */ + protected array $pool = []; + + /** + * Initialize the stack-based pool. + * + * Note: + * - `$size` is accepted for API compatibility with other pool adapters. + * - The stack adapter does NOT enforce capacity limits. + * - `$size` is ignored because the pool is backed by a simple array. + * + * @param int $size Ignored by the stack adapter. + */ + public function initialize(int $size): static + { + $this->pool = []; + return $this; + } + + public function push(mixed $connection): static + { + // Push connection to pool + $this->pool[] = $connection; + return $this; + } + + /** + * Pop an item from the stack. + * + * Note: The stack adapter does not support blocking operations. + * The `$timeout` parameter is ignored. + * + * @param int $timeout Ignored by the stack adapter. + * @return mixed|null Returns the popped item, or null if the stack is empty. + */ + public function pop(int $timeout): mixed + { + return array_pop($this->pool); + } + + public function count(): int + { + return count($this->pool); + } + + /** + * Executes the callback without acquiring a lock. + * + * This implementation does not provide mutual exclusion. + * The `$timeout` parameter is ignored. + * + * @param callable $callback Callback to execute. + * @param int $timeout Ignored. + * @return mixed The value returned by the callback. + */ + public function synchronized(callable $callback, int $timeout): mixed + { + return $callback(); + } +} diff --git a/src/Pools/Adapter/Swoole.php b/src/Pools/Adapter/Swoole.php new file mode 100644 index 0000000..2fc1d37 --- /dev/null +++ b/src/Pools/Adapter/Swoole.php @@ -0,0 +1,75 @@ +pool = new Channel($size); + + $this->lock = new Lock(SWOOLE_MUTEX); + + return $this; + } + + public function push(mixed $connection): static + { + // Push connection to channel + $this->pool->push($connection); + return $this; + } + + /** + * Pop an item from the pool. + * + * @param int $timeout Timeout in seconds. Use 0 for non-blocking pop. + * @return mixed|false Returns the pooled value, or false if the pool is empty + * or the timeout expires. + */ + public function pop(int $timeout): mixed + { + return $this->pool->pop($timeout); + } + + public function count(): int + { + $length = $this->pool->length(); + return is_int($length) ? $length : 0; + } + + /** + * Executes a callback while holding a lock. + * + * The lock is acquired before invoking the callback and is always released + * afterward, even if the callback throws an exception. + * + * @param callable $callback Callback to execute within the critical section. + * @param int $timeout Maximum time (in seconds) to wait for the lock. + * @return mixed The value returned by the callback. + * + * @throws \RuntimeException If the lock cannot be acquired within the timeout. + */ + public function synchronized(callable $callback, int $timeout): mixed + { + $acquired = $this->lock->lockwait($timeout); + + if (!$acquired) { + throw new \RuntimeException("Failed to acquire lock within {$timeout} seconds"); + } + + try { + return $callback(); + } finally { + $this->lock->unlock(); + } + } +} diff --git a/src/Pools/Pool.php b/src/Pools/Pool.php index 43832b4..683360c 100644 --- a/src/Pools/Pool.php +++ b/src/Pools/Pool.php @@ -3,6 +3,8 @@ namespace Utopia\Pools; use Exception; +use Throwable; +use Utopia\Pools\Adapter as PoolAdapter; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Adapter\None as NoTelemetry; use Utopia\Telemetry\Gauge; @@ -39,15 +41,22 @@ class Pool protected int $retrySleep = 1; // seconds /** - * @var array|true> + * @var int */ - protected array $pool = []; + protected int $synchronizedTimeout = 3; // seconds + + protected PoolAdapter $pool; /** * @var array> */ protected array $active = []; + /** + * Total number of connections created + */ + protected int $connectionsCreated = 0; + private Gauge $telemetryOpenConnections; private Gauge $telemetryActiveConnections; private Gauge $telemetryIdleConnections; @@ -58,14 +67,17 @@ class Pool private array $telemetryAttributes; /** + * @param PoolAdapter $adapter * @param string $name * @param int $size * @param callable(): TResource $init */ - public function __construct(protected string $name, protected int $size, callable $init) + public function __construct(PoolAdapter $adapter, protected string $name, protected int $size, callable $init) { $this->init = $init; - $this->pool = array_fill(0, $this->size, true); + $this->pool = $adapter; + // Initialize empty channel (no pre-filling for lazy initialization) + $this->pool->initialize($this->size); $this->setTelemetry(new NoTelemetry()); } @@ -157,6 +169,27 @@ public function setRetrySleep(int $retrySleep): static return $this; } + /** + * Set the lock timeout for adapters that support synchronized locking. + * + * Note: + * - This setting is applied only if the underlying adapter supports lock timeouts. + * - For adapters that do not support locking or lock timeouts, this method is a no-op. + * + * @param int $timeout Synchronized lock timeout in seconds. + * @return $this + */ + public function setSynchronizationTimeout(int $timeout): static + { + $this->synchronizedTimeout = $timeout; + return $this; + } + + public function getSynchronizationTimeout(): int + { + return $this->synchronizedTimeout; + } + /** * @param Telemetry $telemetry * @return $this @@ -187,21 +220,40 @@ public function setTelemetry(Telemetry $telemetry): static * * @template T * @param callable(TResource): T $callback Function that receives the connection resource + * @param int $retries Number of retry attempts if the callback fails (default: 0) * @return T Return value from the callback + * @throws \Throwable If all retry attempts fail, throws the last exception */ - public function use(callable $callback): mixed + public function use(callable $callback, int $retries = 0): mixed { - $start = microtime(true); - $connection = null; - try { - $connection = $this->pop(); - return $callback($connection->getResource()); - } finally { - if ($connection !== null) { - $this->telemetryUseDuration->record(microtime(true) - $start, $this->telemetryAttributes); + /** + * @var Throwable $lastException + */ + $lastException = null; + + for ($attempt = 0; $attempt <= $retries; $attempt++) { + $start = microtime(true); + $connection = null; + + try { + $connection = $this->pop(); + $result = $callback($connection->getResource()); $this->reclaim($connection); + $this->telemetryUseDuration->record(microtime(true) - $start, $this->telemetryAttributes); + return $result; + } catch (\Throwable $e) { + if ($connection !== null) { + $this->destroy($connection); + } + $this->telemetryUseDuration->record(microtime(true) - $start, $this->telemetryAttributes); + $lastException = $e; + if ($attempt === $retries) { + throw $lastException; + } } } + + throw $lastException; } /** @@ -223,9 +275,37 @@ public function pop(): Connection try { do { $attempts++; - $connection = array_pop($this->pool); + // the connection creation block outside the lock so that other coroutines not get blocked in case of retries of a coroutine + // Lock: check + increment only + // Unlock + // Create connection (no lock) + // On failure: lock + decrement + $shouldCreateConnections = $this->pool->synchronized(function (): bool { + if ($this->pool->count() === 0 && $this->connectionsCreated < $this->size) { + $this->connectionsCreated++; + return true; + } + return false; + }, timeout: $this->getSynchronizationTimeout()); + + if ($shouldCreateConnections) { + try { + $connection = $this->createConnection(); + $this->pool->synchronized(function () use ($connection) { + $this->active[$connection->getID()] = $connection; + }, timeout: $this->getSynchronizationTimeout()); + return $connection; + } catch (\Exception $e) { + $this->pool->synchronized(function () { + $this->connectionsCreated--; + }, timeout: $this->getSynchronizationTimeout()); + throw $e; + } + } - if (is_null($connection)) { + $connection = $this->pool->pop($this->getSynchronizationTimeout()); + + if ($connection === false || $connection === null) { if ($attempts >= $this->getRetryAttempts()) { throw new Exception("Pool '{$this->name}' is empty (size {$this->size})"); } @@ -233,44 +313,56 @@ public function pop(): Connection $totalSleepTime += $this->getRetrySleep(); sleep($this->getRetrySleep()); } else { - break; + if ($connection instanceof Connection) { + $this->pool->synchronized(function () use ($connection) { + $this->active[$connection->getID()] = $connection; + }, timeout: $this->getSynchronizationTimeout()); + return $connection; + } } } while ($attempts < $this->getRetryAttempts()); - if ($connection === true) { // Pool has space, create connection - $attempts = 0; + throw new Exception('Failed to get a connection from the pool'); + } finally { + $this->recordPoolTelemetry(); + $this->telemetryWaitDuration->record($totalSleepTime, $this->telemetryAttributes); + } + } - do { - try { - $attempts++; - $connection = new Connection(($this->init)()); - break; // leave loop if successful - } catch (\Exception $e) { - if ($attempts >= $this->getReconnectAttempts()) { - throw new \Exception('Failed to create connection: ' . $e->getMessage()); - } - $totalSleepTime += $this->getReconnectSleep(); - sleep($this->getReconnectSleep()); - } - } while ($attempts < $this->getReconnectAttempts()); + /** + * Create a new connection + * + * @return Connection + * @throws \Exception + */ + protected function createConnection(): Connection + { + $connection = null; + $attempts = 0; + do { + try { + $attempts++; + $connection = new Connection(($this->init)()); + break; + } catch (\Exception $e) { + if ($attempts >= $this->getReconnectAttempts()) { + throw new \Exception('Failed to create connection: ' . $e->getMessage()); + } + sleep($this->getReconnectSleep()); } + } while ($attempts < $this->getReconnectAttempts()); - if ($connection instanceof Connection) { // connection is available, return it - if (empty($connection->getID())) { - $connection->setID($this->getName() . '-' . uniqid()); - } + if ($connection === null) { + throw new \Exception('Failed to create connection'); + } - $connection->setPool($this); + if (empty($connection->getID())) { + $connection->setID($this->getName() . '-' . uniqid()); + } - $this->active[$connection->getID()] = $connection; - return $connection; - } + $connection->setPool($this); - throw new Exception('Failed to get a connection from the pool'); - } finally { - $this->recordPoolTelemetry(); - $this->telemetryWaitDuration->record($totalSleepTime, $this->telemetryAttributes); - } + return $connection; } /** @@ -280,7 +372,8 @@ public function pop(): Connection public function push(Connection $connection): static { try { - $this->pool[] = $connection; + // Push the actual connection back to the pool + $this->pool->push($connection); unset($this->active[$connection->getID()]); return $this; @@ -290,11 +383,14 @@ public function push(Connection $connection): static } /** + * Returns the number of available connections (idle + not yet created) + * * @return int */ public function count(): int { - return count($this->pool); + // Available = idle connections in pool + connections not yet created + return $this->pool->count() + ($this->size - $this->connectionsCreated); } /** @@ -319,21 +415,47 @@ public function reclaim(?Connection $connection = null): static * @param Connection|null $connection * @return $this */ - public function destroy(?Connection $connection = null): static + private function destroyConnection(?Connection $connection = null): static { - try { - if ($connection !== null) { - $this->pool[] = true; - unset($this->active[$connection->getID()]); - return $this; - } - - foreach ($this->active as $connection) { - $this->pool[] = true; + if ($connection !== null) { + $shouldCreate = $this->pool->synchronized(function () use ($connection) { + $this->connectionsCreated--; unset($this->active[$connection->getID()]); + if ($this->connectionsCreated < $this->size) { + $this->connectionsCreated++; + return true; + }; + return false; + }, timeout: $this->getSynchronizationTimeout()); + + if ($shouldCreate) { + try { + $this->pool->push($this->createConnection()); + } catch (Exception $e) { + $this->pool->synchronized(function () { + $this->connectionsCreated--; + }, timeout: $this->getSynchronizationTimeout()); + throw $e; + } } return $this; + } + $activeConnections = array_values($this->active); + foreach ($activeConnections as $conn) { + $this->destroyConnection($conn); + } + return $this; + } + + /** + * @param Connection|null $connection + * @return $this + */ + public function destroy(?Connection $connection = null): static + { + try { + return $this->destroyConnection($connection); } finally { $this->recordPoolTelemetry(); } @@ -344,7 +466,7 @@ public function destroy(?Connection $connection = null): static */ public function isEmpty(): bool { - return empty($this->pool); + return $this->count() === 0; } /** @@ -352,18 +474,19 @@ public function isEmpty(): bool */ public function isFull(): bool { - return count($this->pool) === $this->size; + // Pool is full when all possible connections are available (idle or not created yet) + return count($this->active) === 0; } private function recordPoolTelemetry(): void { - // Connections get removed from $this->pool when they are active $activeConnections = count($this->active); - $existingConnections = count($this->pool); - $idleConnections = count(array_filter($this->pool, fn ($data) => $data instanceof Connection)); + $idleConnections = $this->pool->count(); // Connections in the pool (idle) + $openConnections = $activeConnections + $idleConnections; // Total connections in use or available + $this->telemetryActiveConnections->record($activeConnections, $this->telemetryAttributes); $this->telemetryIdleConnections->record($idleConnections, $this->telemetryAttributes); - $this->telemetryOpenConnections->record($activeConnections + $idleConnections, $this->telemetryAttributes); - $this->telemetryPoolCapacity->record($activeConnections + $existingConnections, $this->telemetryAttributes); + $this->telemetryOpenConnections->record($openConnections, $this->telemetryAttributes); + $this->telemetryPoolCapacity->record($this->connectionsCreated, $this->telemetryAttributes); } } diff --git a/tests/Pools/Adapter/StackTest.php b/tests/Pools/Adapter/StackTest.php new file mode 100644 index 0000000..bdd0619 --- /dev/null +++ b/tests/Pools/Adapter/StackTest.php @@ -0,0 +1,19 @@ +markTestSkipped('Swoole extension is not loaded'); + } + + $errors = []; + $successCount = 0; + + /** @phpstan-ignore-next-line */ + \Swoole\Coroutine\run(function () use (&$errors, &$successCount) { + // Create a pool with 5 connections inside coroutine context + $connectionCounter = 0; + $pool = new Pool(new Swoole(), 'swoole-test', 5, function () use (&$connectionCounter) { + $connectionCounter++; + return "connection-{$connectionCounter}"; + }); + + // Set retry attempts to allow waiting for connections to be released + $pool->setRetryAttempts(3); + $pool->setRetrySleep(0); + + // Spawn 10 coroutines trying to get connections from a pool of 5 + // First 5 should get connections immediately + // Next 5 should wait and reuse connections after they're returned + $channels = []; + for ($i = 0; $i < 10; $i++) { + $channels[$i] = new \Swoole\Coroutine\Channel(1); + } + + for ($i = 0; $i < 10; $i++) { + \Swoole\Coroutine::create(function () use ($pool, $i, &$errors, &$successCount, $channels) { + try { + // Each coroutine tries to get a connection + $connection = $pool->pop(); + + // Verify we got a valid connection + if (!$connection instanceof Connection) { + $errors[] = "Coroutine {$i}: Did not receive a valid Connection object"; + $channels[$i]->push(false); + return; + } + + if (empty($connection->getID())) { + $errors[] = "Coroutine {$i}: Connection has no ID"; + $channels[$i]->push(false); + return; + } + + // Simulate some work + \Swoole\Coroutine::sleep(0.01); + + // Return connection to pool + $pool->reclaim($connection); + + $successCount++; + $channels[$i]->push(true); + } catch (\Exception $e) { + $errors[] = "Coroutine {$i}: " . $e->getMessage(); + $channels[$i]->push(false); + } + }); + } + + // Wait for all coroutines to complete + foreach ($channels as $channel) { + $channel->pop(); + } + + // Assertions inside coroutine context + $this->assertEmpty($errors, 'Errors occurred: ' . implode(', ', $errors)); + $this->assertEquals(10, $successCount, 'All 10 coroutines should successfully complete'); + + // Pool should be full again after all connections are reclaimed + $this->assertEquals(5, $pool->count(), 'Pool should have all 5 connections back'); + + // Should only create exactly pool size connections (no race conditions with new implementation) + $this->assertEquals(5, $connectionCounter, 'Should create exactly 5 connections (pool size)'); + }); + } + + public function testSwooleCoroutineHighConcurrency(): void + { + if (!\extension_loaded('swoole')) { + $this->markTestSkipped('Swoole extension is not loaded'); + } + + $totalRequests = 20; + $successCount = 0; + $errorCount = 0; + + /** @phpstan-ignore-next-line */ + \Swoole\Coroutine\run(function () use ($totalRequests, &$successCount, &$errorCount) { + // Create a pool with 3 connections inside coroutine context + $connectionCounter = 0; + $pool = new Pool(new Swoole(), 'swoole-concurrent', 3, function () use (&$connectionCounter) { + $connectionCounter++; + return "connection-{$connectionCounter}"; + }); + + $pool->setRetryAttempts(3); + $pool->setRetrySleep(0); + + $channels = []; + for ($i = 0; $i < $totalRequests; $i++) { + $channels[$i] = new \Swoole\Coroutine\Channel(1); + } + + for ($i = 0; $i < $totalRequests; $i++) { + \Swoole\Coroutine::create(function () use ($pool, $i, &$successCount, &$errorCount, $channels) { + try { + $pool->use(function ($resource) use ($i) { + // Simulate work + \Swoole\Coroutine::sleep(0.01); + return "processed-{$i}"; + }); + $successCount++; + $channels[$i]->push(true); + } catch (\Exception $e) { + $errorCount++; + $channels[$i]->push(false); + } + }); + } + + // Wait for all coroutines to complete + foreach ($channels as $channel) { + $channel->pop(); + } + + // All requests should succeed with proper retry logic + $this->assertEquals($totalRequests, $successCount, "All {$totalRequests} requests should succeed"); + $this->assertEquals(0, $errorCount, 'No errors should occur with proper concurrency handling'); + + // Pool should be full again + $this->assertEquals(3, $pool->count(), 'Pool should have all 3 connections back'); + + // Should only create 3 connections (pool size) + $this->assertEquals(3, $connectionCounter, 'Should only create 3 connections (pool size)'); + }); + } + + public function testSwooleCoroutineConnectionUniqueness(): void + { + if (!\extension_loaded('swoole')) { + $this->markTestSkipped('Swoole extension is not loaded'); + } + + $seenResources = []; + $duplicateResources = []; + + /** @phpstan-ignore-next-line */ + \Swoole\Coroutine\run(function () use (&$seenResources, &$duplicateResources) { + // Create a pool with 5 connections inside coroutine context + $connectionCounter = 0; + $pool = new Pool(new Swoole(), 'swoole-uniqueness', 5, function () use (&$connectionCounter) { + $connectionCounter++; + return "connection-{$connectionCounter}"; + }); + + $pool->setRetryAttempts(1); + $pool->setRetrySleep(0); + + $channels = []; + for ($i = 0; $i < 5; $i++) { + $channels[$i] = new \Swoole\Coroutine\Channel(1); + } + + // Get all 5 connections simultaneously + for ($i = 0; $i < 5; $i++) { + \Swoole\Coroutine::create(function () use ($pool, $i, &$seenResources, &$duplicateResources, $channels) { + try { + $connection = $pool->pop(); + $resource = $connection->getResource(); + + // Check if we've seen this resource before (indicates race condition) + if (isset($seenResources[$resource])) { + $duplicateResources[] = $resource; + } else { + $seenResources[$resource] = $connection; + } + + // Hold the connection briefly + \Swoole\Coroutine::sleep(0.01); + + $channels[$i]->push(true); + } catch (\Exception $e) { + $channels[$i]->push(false); + } + }); + } + + // Wait for all coroutines to complete + foreach ($channels as $channel) { + $channel->pop(); + } + + // Assertions inside coroutine context + $this->assertEmpty($duplicateResources, 'Duplicate resources detected: ' . implode(', ', $duplicateResources)); + $this->assertCount(5, $seenResources, 'Should have exactly 5 unique connections'); + + // Verify each connection has a unique resource + $resources = array_keys($seenResources); + $this->assertCount(5, array_unique($resources), 'All connection resources should be unique'); + }); + } + + public function testSwooleCoroutineIdleConnectionReuse(): void + { + if (!\extension_loaded('swoole')) { + $this->markTestSkipped('Swoole extension is not loaded'); + } + + $connectionIds = []; + $connectionCounter = 0; + + /** @phpstan-ignore-next-line */ + \Swoole\Coroutine\run(function () use (&$connectionIds, &$connectionCounter) { + // Create a pool with 3 connections inside coroutine context + $pool = new Pool(new Swoole(), 'swoole-reuse', 3, function () use (&$connectionCounter) { + $connectionCounter++; + return "connection-{$connectionCounter}"; + }); + + $pool->setRetryAttempts(1); + $pool->setRetrySleep(0); + + // First wave: Create 3 connections + $firstWave = []; + for ($i = 0; $i < 3; $i++) { + $conn = $pool->pop(); + $firstWave[] = $conn; + $connectionIds['first'][] = $conn->getID(); + } + + // Return all connections + foreach ($firstWave as $conn) { + $pool->reclaim($conn); + } + + // Second wave: Should reuse the same 3 connections + $secondWave = []; + for ($i = 0; $i < 3; $i++) { + $conn = $pool->pop(); + $secondWave[] = $conn; + $connectionIds['second'][] = $conn->getID(); + } + + // Return all connections + foreach ($secondWave as $conn) { + $pool->reclaim($conn); + } + + // Assertions inside coroutine context + $this->assertEquals(3, $connectionCounter, 'Should only create 3 connections total'); + $this->assertCount(3, $connectionIds['first'], 'First wave should have 3 connections'); + $this->assertCount(3, $connectionIds['second'], 'Second wave should have 3 connections'); + + // Second wave should reuse connections from first wave + sort($connectionIds['first']); + sort($connectionIds['second']); + $this->assertEquals($connectionIds['first'], $connectionIds['second'], 'Second wave should reuse same connection IDs'); + }); + } + + public function testSwooleCoroutineStressTest(): void + { + if (!\extension_loaded('swoole')) { + $this->markTestSkipped('Swoole extension is not loaded'); + } + + $totalRequests = 100; + $successCount = 0; + $errorCount = 0; + $connectionCounter = 0; + + /** @phpstan-ignore-next-line */ + \Swoole\Coroutine\run(function () use ($totalRequests, &$successCount, &$errorCount, &$connectionCounter) { + // Create a pool with 10 connections inside coroutine context + $pool = new Pool(new Swoole(), 'swoole-stress', 10, function () use (&$connectionCounter) { + $connectionCounter++; + return "connection-{$connectionCounter}"; + }); + + $pool->setRetryAttempts(10); + $pool->setRetrySleep(0); + + $channels = []; + for ($i = 0; $i < $totalRequests; $i++) { + $channels[$i] = new \Swoole\Coroutine\Channel(1); + } + + for ($i = 0; $i < $totalRequests; $i++) { + \Swoole\Coroutine::create(function () use ($pool, $i, &$successCount, &$errorCount, $channels) { + try { + $pool->use(function ($resource) { + // Simulate variable work duration + \Swoole\Coroutine::sleep(0.001 * rand(1, 5)); + return $resource; + }); + $successCount++; + $channels[$i]->push(true); + } catch (\Exception $e) { + $errorCount++; + $channels[$i]->push(false); + } + }); + } + + // Wait for all coroutines to complete + foreach ($channels as $channel) { + $channel->pop(); + } + + // Assertions inside coroutine context + $this->assertEquals($totalRequests, $successCount, "All {$totalRequests} requests should succeed"); + $this->assertEquals(0, $errorCount, 'No errors should occur'); + $this->assertEquals(10, $connectionCounter, 'Should create exactly 10 connections (pool size)'); + $this->assertEquals(10, $pool->count(), 'Pool should have all connections back'); + }); + } +} diff --git a/tests/Pools/Base.php b/tests/Pools/Base.php new file mode 100644 index 0000000..41fa354 --- /dev/null +++ b/tests/Pools/Base.php @@ -0,0 +1,24 @@ + - */ - protected Connection $object; - - #[\Override] - public function setUp(): void - { - $this->object = new Connection('x'); - } - - public function testGetID(): void - { - $this->assertEquals(null, $this->object->getID()); - - $this->object->setID('test'); - - $this->assertEquals('test', $this->object->getID()); - } - - public function testSetID(): void - { - $this->assertEquals(null, $this->object->getID()); - - $this->assertInstanceOf(Connection::class, $this->object->setID('test')); - - $this->assertEquals('test', $this->object->getID()); - } - - public function testGetResource(): void - { - $this->assertEquals('x', $this->object->getResource()); - } - - public function testSetResource(): void - { - $this->assertEquals('x', $this->object->getResource()); - - $this->assertInstanceOf(Connection::class, $this->object->setResource('y')); - - $this->assertEquals('y', $this->object->getResource()); - } - - public function testSetPool(): void - { - $pool = new Pool('test', 1, fn () => 'x'); - - $this->assertNull($this->object->getPool()); - $this->assertInstanceOf(Connection::class, $this->object->setPool($pool)); - } - - public function testGetPool(): void - { - $pool = new Pool('test', 1, fn () => 'x'); - - $this->assertNull($this->object->getPool()); - $this->assertInstanceOf(Connection::class, $this->object->setPool($pool)); - - $pool = $this->object->getPool(); - - if ($pool === null) { - throw new Exception("Pool should never be null here."); - } - - $this->assertInstanceOf(Pool::class, $pool); - $this->assertEquals('test', $pool->getName()); - } - - public function testReclaim(): void - { - $pool = new Pool('test', 2, fn () => 'x'); - - $this->assertEquals(2, $pool->count()); - - $connection1 = $pool->pop(); - - $this->assertEquals(1, $pool->count()); - - $connection2 = $pool->pop(); - - $this->assertEquals(0, $pool->count()); - - $this->assertInstanceOf(Pool::class, $connection1->reclaim()); - - $this->assertEquals(1, $pool->count()); - - $this->assertInstanceOf(Pool::class, $connection2->reclaim()); - - $this->assertEquals(2, $pool->count()); - } - - public function testReclaimException(): void - { - $this->expectException(Exception::class); - $this->object->reclaim(); - } - - public function testDestroy(): void - { - $i = 0; - $object = new Pool('testDestroy', 2, function () use (&$i) { - $i++; - return $i <= 2 ? 'x' : 'y'; - }); - - $this->assertEquals(2, $object->count()); - - $connection1 = $object->pop(); - $connection2 = $object->pop(); - - $this->assertEquals(0, $object->count()); - - $this->assertEquals('x', $connection1->getResource()); - $this->assertEquals('x', $connection2->getResource()); - - $connection1->destroy(); - $connection2->destroy(); - - $this->assertEquals(2, $object->count()); - - $connection1 = $object->pop(); - $connection2 = $object->pop(); - - $this->assertEquals(0, $object->count()); - - $this->assertEquals('y', $connection1->getResource()); - $this->assertEquals('y', $connection2->getResource()); - } -} diff --git a/tests/Pools/GroupTest.php b/tests/Pools/GroupTest.php deleted file mode 100644 index de1c7ec..0000000 --- a/tests/Pools/GroupTest.php +++ /dev/null @@ -1,118 +0,0 @@ -object = new Group(); - } - - public function testAdd(): void - { - $this->object->add(new Pool('test', 1, fn () => 'x')); - - $this->assertInstanceOf(Pool::class, $this->object->get('test')); - } - - public function testGet(): void - { - $this->object->add(new Pool('test', 1, fn () => 'x')); - - $this->assertInstanceOf(Pool::class, $this->object->get('test')); - - $this->expectException(Exception::class); - - $this->assertInstanceOf(Pool::class, $this->object->get('testx')); - } - - public function testRemove(): void - { - $this->object->add(new Pool('test', 1, fn () => 'x')); - - $this->assertInstanceOf(Pool::class, $this->object->get('test')); - - $this->object->remove('test'); - - $this->expectException(Exception::class); - - $this->assertInstanceOf(Pool::class, $this->object->get('test')); - } - - public function testReset(): void - { - $this->object->add(new Pool('test', 5, fn () => 'x')); - - $this->assertEquals(5, $this->object->get('test')->count()); - - $this->object->get('test')->pop(); - $this->object->get('test')->pop(); - $this->object->get('test')->pop(); - - $this->assertEquals(2, $this->object->get('test')->count()); - - $this->object->reclaim(); - - $this->assertEquals(5, $this->object->get('test')->count()); - } - - public function testReconnectAttempts(): void - { - $this->object->add(new Pool('test', 5, fn () => 'x')); - - $this->assertEquals(3, $this->object->get('test')->getReconnectAttempts()); - - $this->object->setReconnectAttempts(5); - - $this->assertEquals(5, $this->object->get('test')->getReconnectAttempts()); - } - - public function testReconnectSleep(): void - { - $this->object->add(new Pool('test', 5, fn () => 'x')); - - $this->assertEquals(1, $this->object->get('test')->getReconnectSleep()); - - $this->object->setReconnectSleep(2); - - $this->assertEquals(2, $this->object->get('test')->getReconnectSleep()); - } - - public function testUse(): void - { - $pool1 = new Pool('pool1', 1, fn () => '1'); - $pool2 = new Pool('pool2', 1, fn () => '2'); - $pool3 = new Pool('pool3', 1, fn () => '3'); - - $this->object->add($pool1); - $this->object->add($pool2); - $this->object->add($pool3); - - $this->assertEquals(1, $pool1->count()); - $this->assertEquals(1, $pool2->count()); - $this->assertEquals(1, $pool3->count()); - - // @phpstan-ignore argument.type - $this->object->use(['pool1', 'pool3'], function ($one, $three) use ($pool1, $pool2, $pool3): void { - $this->assertEquals('1', $one); - $this->assertEquals('3', $three); - - $this->assertEquals(0, $pool1->count()); - $this->assertEquals(1, $pool2->count()); - $this->assertEquals(0, $pool3->count()); - }); - - $this->assertEquals(1, $pool1->count()); - $this->assertEquals(1, $pool2->count()); - $this->assertEquals(1, $pool3->count()); - } -} diff --git a/tests/Pools/PoolTest.php b/tests/Pools/PoolTest.php deleted file mode 100644 index 890340e..0000000 --- a/tests/Pools/PoolTest.php +++ /dev/null @@ -1,310 +0,0 @@ - - */ - protected Pool $object; - - #[\Override] - public function setUp(): void - { - $this->object = new Pool('test', 5, fn () => 'x'); - } - - public function testGetName(): void - { - $this->assertEquals('test', $this->object->getName()); - } - - public function testGetSize(): void - { - $this->assertEquals(5, $this->object->getSize()); - } - - public function testGetReconnectAttempts(): void - { - $this->assertEquals(3, $this->object->getReconnectAttempts()); - } - - public function testSetReconnectAttempts(): void - { - $this->assertEquals(3, $this->object->getReconnectAttempts()); - - $this->object->setReconnectAttempts(20); - - $this->assertEquals(20, $this->object->getReconnectAttempts()); - } - - public function testGetReconnectSleep(): void - { - $this->assertEquals(1, $this->object->getReconnectSleep()); - } - - public function testSetReconnectSleep(): void - { - $this->assertEquals(1, $this->object->getReconnectSleep()); - - $this->object->setReconnectSleep(20); - - $this->assertEquals(20, $this->object->getReconnectSleep()); - } - - public function testGetRetryAttempts(): void - { - $this->assertEquals(3, $this->object->getRetryAttempts()); - } - - public function testSetRetryAttempts(): void - { - $this->assertEquals(3, $this->object->getRetryAttempts()); - - $this->object->setRetryAttempts(20); - - $this->assertEquals(20, $this->object->getRetryAttempts()); - } - - public function testGetRetrySleep(): void - { - $this->assertEquals(1, $this->object->getRetrySleep()); - } - - public function testSetRetrySleep(): void - { - $this->assertEquals(1, $this->object->getRetrySleep()); - - $this->object->setRetrySleep(20); - - $this->assertEquals(20, $this->object->getRetrySleep()); - } - - public function testPop(): void - { - $this->assertEquals(5, $this->object->count()); - - $connection = $this->object->pop(); - - $this->assertEquals(4, $this->object->count()); - - $this->assertInstanceOf(Connection::class, $connection); - $this->assertEquals('x', $connection->getResource()); - - // Pool should be empty - $this->expectException(Exception::class); - - $this->assertInstanceOf(Connection::class, $this->object->pop()); - $this->assertInstanceOf(Connection::class, $this->object->pop()); - $this->assertInstanceOf(Connection::class, $this->object->pop()); - $this->assertInstanceOf(Connection::class, $this->object->pop()); - $this->assertInstanceOf(Connection::class, $this->object->pop()); - } - - public function testUse(): void - { - $this->assertEquals(5, $this->object->count()); - $this->object->use(function ($resource): void { - $this->assertEquals(4, $this->object->count()); - $this->assertEquals('x', $resource); - }); - - $this->assertEquals(5, $this->object->count()); - } - - public function testPush(): void - { - $this->assertEquals(5, $this->object->count()); - - $connection = $this->object->pop(); - - $this->assertEquals(4, $this->object->count()); - - $this->assertInstanceOf(Connection::class, $connection); - $this->assertEquals('x', $connection->getResource()); - - $this->assertInstanceOf(Pool::class, $this->object->push($connection)); - - $this->assertEquals(5, $this->object->count()); - } - - public function testCount(): void - { - $this->assertEquals(5, $this->object->count()); - - $connection = $this->object->pop(); - - $this->assertEquals(4, $this->object->count()); - - $this->object->push($connection); - - $this->assertEquals(5, $this->object->count()); - } - - public function testReclaim(): void - { - $this->assertEquals(5, $this->object->count()); - - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - - $this->assertEquals(2, $this->object->count()); - - $this->object->reclaim(); - - $this->assertEquals(5, $this->object->count()); - } - - public function testIsEmpty(): void - { - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - - $this->assertEquals(true, $this->object->isEmpty()); - } - - public function testIsFull(): void - { - $this->assertEquals(true, $this->object->isFull()); - - $connection = $this->object->pop(); - - $this->assertEquals(false, $this->object->isFull()); - - $this->object->push($connection); - - $this->assertEquals(true, $this->object->isFull()); - - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - - $this->assertEquals(false, $this->object->isFull()); - - $this->object->reclaim(); - - $this->assertEquals(true, $this->object->isFull()); - - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - - $this->assertEquals(false, $this->object->isFull()); - } - - public function testRetry(): void - { - $this->object->setReconnectAttempts(2); - $this->object->setReconnectSleep(2); - - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - $this->object->pop(); - - // Pool should be empty - $this->expectException(Exception::class); - - $timeStart = \time(); - $this->object->pop(); - $timeEnd = \time(); - - $timeDiff = $timeEnd - $timeStart; - - $this->assertGreaterThanOrEqual(4, $timeDiff); - } - - public function testDestroy(): void - { - $i = 0; - $object = new Pool('testDestroy', 2, function () use (&$i) { - $i++; - return $i <= 2 ? 'x' : 'y'; - }); - - $this->assertEquals(2, $object->count()); - - $connection1 = $object->pop(); - $connection2 = $object->pop(); - - $this->assertEquals(0, $object->count()); - - $this->assertEquals('x', $connection1->getResource()); - $this->assertEquals('x', $connection2->getResource()); - - $object->destroy(); - - $this->assertEquals(2, $object->count()); - - $connection1 = $object->pop(); - $connection2 = $object->pop(); - - $this->assertEquals(0, $object->count()); - - $this->assertEquals('y', $connection1->getResource()); - $this->assertEquals('y', $connection2->getResource()); - } - - public function testTelemetry(): void - { - $telemetry = new TestTelemetry(); - $this->object->setTelemetry($telemetry); - - $allocate = function (int $amount, callable $assertion): void { - $connections = []; - for ($i = 0; $i < $amount; $i++) { - $connections[] = $this->object->pop(); - } - - $assertion(); - - foreach ($connections as $connection) { - $this->object->reclaim($connection); - } - }; - - $this->assertEquals(5, $this->object->count()); - - $allocate(3, function () use ($telemetry): void { - /** @var object{values: array} $openGauge */ - $openGauge = $telemetry->gauges['pool.connection.open.count']; - /** @var object{values: array} $activeGauge */ - $activeGauge = $telemetry->gauges['pool.connection.active.count']; - /** @var object{values: array} $idleGauge */ - $idleGauge = $telemetry->gauges['pool.connection.idle.count']; - $this->assertEquals([1, 2, 3], $openGauge->values); - $this->assertEquals([1, 2, 3], $activeGauge->values); - $this->assertEquals([0, 0, 0], $idleGauge->values); - }); - - $this->assertEquals(5, $this->object->count()); - - $allocate(1, function () use ($telemetry): void { - /** @var object{values: array} $openGauge */ - $openGauge = $telemetry->gauges['pool.connection.open.count']; - /** @var object{values: array} $activeGauge */ - $activeGauge = $telemetry->gauges['pool.connection.active.count']; - /** @var object{values: array} $idleGauge */ - $idleGauge = $telemetry->gauges['pool.connection.idle.count']; - $this->assertEquals([1, 2, 3, 3, 3, 3, 3], $openGauge->values); - $this->assertEquals([1, 2, 3, 2, 1, 0, 1], $activeGauge->values); - $this->assertEquals([0, 0, 0, 1, 2, 3, 2], $idleGauge->values); - }); - } -} diff --git a/tests/Pools/Scopes/ConnectionTestScope.php b/tests/Pools/Scopes/ConnectionTestScope.php new file mode 100644 index 0000000..7cb4a5d --- /dev/null +++ b/tests/Pools/Scopes/ConnectionTestScope.php @@ -0,0 +1,166 @@ + + */ + protected Connection $connectionObject; + + protected function setUpConnection(): void + { + $this->connectionObject = new Connection('x'); + } + + public function testConnectionGetID(): void + { + $this->execute(function (): void { + $this->setUpConnection(); + $this->assertEquals(null, $this->connectionObject->getID()); + + $this->connectionObject->setID('test'); + + $this->assertEquals('test', $this->connectionObject->getID()); + }); + } + + public function testConnectionSetID(): void + { + $this->execute(function (): void { + $this->setUpConnection(); + $this->assertEquals(null, $this->connectionObject->getID()); + + $this->assertInstanceOf(Connection::class, $this->connectionObject->setID('test')); + + $this->assertEquals('test', $this->connectionObject->getID()); + }); + } + + public function testConnectionGetResource(): void + { + $this->execute(function (): void { + $this->setUpConnection(); + $this->assertEquals('x', $this->connectionObject->getResource()); + }); + } + + public function testConnectionSetResource(): void + { + $this->execute(function (): void { + $this->setUpConnection(); + $this->assertEquals('x', $this->connectionObject->getResource()); + + $this->assertInstanceOf(Connection::class, $this->connectionObject->setResource('y')); + + $this->assertEquals('y', $this->connectionObject->getResource()); + }); + } + + public function testConnectionSetPool(): void + { + $this->execute(function (): void { + $this->setUpConnection(); + $pool = new Pool($this->getAdapter(), 'test', 1, fn () => 'x'); + + $this->assertNull($this->connectionObject->getPool()); + $this->assertInstanceOf(Connection::class, $this->connectionObject->setPool($pool)); + }); + } + + public function testConnectionGetPool(): void + { + $this->execute(function (): void { + $this->setUpConnection(); + $pool = new Pool($this->getAdapter(), 'test', 1, fn () => 'x'); + + $this->assertNull($this->connectionObject->getPool()); + $this->assertInstanceOf(Connection::class, $this->connectionObject->setPool($pool)); + + $pool = $this->connectionObject->getPool(); + + if ($pool === null) { + throw new Exception("Pool should never be null here."); + } + + $this->assertInstanceOf(Pool::class, $pool); + $this->assertEquals('test', $pool->getName()); + }); + } + + public function testConnectionReclaim(): void + { + $this->execute(function (): void { + $pool = new Pool($this->getAdapter(), 'test', 2, fn () => 'x'); + + $this->assertEquals(2, $pool->count()); + + $connection1 = $pool->pop(); + + $this->assertEquals(1, $pool->count()); + + $connection2 = $pool->pop(); + + $this->assertEquals(0, $pool->count()); + + $this->assertInstanceOf(Pool::class, $connection1->reclaim()); + + $this->assertEquals(1, $pool->count()); + + $this->assertInstanceOf(Pool::class, $connection2->reclaim()); + + $this->assertEquals(2, $pool->count()); + }); + } + + public function testConnectionReclaimException(): void + { + $this->execute(function (): void { + $this->setUpConnection(); + $this->expectException(Exception::class); + $this->connectionObject->reclaim(); + }); + } + + public function testConnectionDestroy(): void + { + $this->execute(function (): void { + $i = 0; + $object = new Pool($this->getAdapter(), 'testDestroy', 2, function () use (&$i) { + $i++; + return $i <= 2 ? 'x' : 'y'; + }); + + $this->assertEquals(2, $object->count()); + + $connection1 = $object->pop(); + $connection2 = $object->pop(); + + $this->assertEquals(0, $object->count()); + + $this->assertEquals('x', $connection1->getResource()); + $this->assertEquals('x', $connection2->getResource()); + + $connection1->destroy(); + $connection2->destroy(); + + $this->assertEquals(2, $object->count()); + + $connection1 = $object->pop(); + $connection2 = $object->pop(); + + $this->assertEquals(0, $object->count()); + + $this->assertEquals('y', $connection1->getResource()); + $this->assertEquals('y', $connection2->getResource()); + }); + } +} diff --git a/tests/Pools/Scopes/GroupTestScope.php b/tests/Pools/Scopes/GroupTestScope.php new file mode 100644 index 0000000..bd37d65 --- /dev/null +++ b/tests/Pools/Scopes/GroupTestScope.php @@ -0,0 +1,140 @@ +groupObject = new Group(); + } + + public function testGroupAdd(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn () => 'x')); + + $this->assertInstanceOf(Pool::class, $this->groupObject->get('test')); + }); + } + + public function testGroupGet(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn () => 'x')); + + $this->assertInstanceOf(Pool::class, $this->groupObject->get('test')); + + $this->expectException(Exception::class); + + $this->assertInstanceOf(Pool::class, $this->groupObject->get('testx')); + }); + } + + public function testGroupRemove(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn () => 'x')); + + $this->assertInstanceOf(Pool::class, $this->groupObject->get('test')); + + $this->groupObject->remove('test'); + + $this->expectException(Exception::class); + + $this->assertInstanceOf(Pool::class, $this->groupObject->get('test')); + }); + } + + public function testGroupReset(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn () => 'x')); + + $this->assertEquals(5, $this->groupObject->get('test')->count()); + + $this->groupObject->get('test')->pop(); + $this->groupObject->get('test')->pop(); + $this->groupObject->get('test')->pop(); + + $this->assertEquals(2, $this->groupObject->get('test')->count()); + + $this->groupObject->reclaim(); + + $this->assertEquals(5, $this->groupObject->get('test')->count()); + }); + } + + public function testGroupReconnectAttempts(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn () => 'x')); + + $this->assertEquals(3, $this->groupObject->get('test')->getReconnectAttempts()); + + $this->groupObject->setReconnectAttempts(5); + + $this->assertEquals(5, $this->groupObject->get('test')->getReconnectAttempts()); + }); + } + + public function testGroupReconnectSleep(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn () => 'x')); + + $this->assertEquals(1, $this->groupObject->get('test')->getReconnectSleep()); + + $this->groupObject->setReconnectSleep(2); + + $this->assertEquals(2, $this->groupObject->get('test')->getReconnectSleep()); + }); + } + + public function testGroupUse(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + $pool1 = new Pool($this->getAdapter(), 'pool1', 1, fn () => '1'); + $pool2 = new Pool($this->getAdapter(), 'pool2', 1, fn () => '2'); + $pool3 = new Pool($this->getAdapter(), 'pool3', 1, fn () => '3'); + + $this->groupObject->add($pool1); + $this->groupObject->add($pool2); + $this->groupObject->add($pool3); + + $this->assertEquals(1, $pool1->count()); + $this->assertEquals(1, $pool2->count()); + $this->assertEquals(1, $pool3->count()); + + // @phpstan-ignore argument.type + $this->groupObject->use(['pool1', 'pool3'], function ($one, $three) use ($pool1, $pool2, $pool3): void { + $this->assertEquals('1', $one); + $this->assertEquals('3', $three); + + $this->assertEquals(0, $pool1->count()); + $this->assertEquals(1, $pool2->count()); + $this->assertEquals(0, $pool3->count()); + }); + + $this->assertEquals(1, $pool1->count()); + $this->assertEquals(1, $pool2->count()); + $this->assertEquals(1, $pool3->count()); + }); + } +} diff --git a/tests/Pools/Scopes/PoolTestScope.php b/tests/Pools/Scopes/PoolTestScope.php new file mode 100644 index 0000000..c7d6b55 --- /dev/null +++ b/tests/Pools/Scopes/PoolTestScope.php @@ -0,0 +1,491 @@ + + */ + protected Pool $poolObject; + + protected function setUpPool(): void + { + $this->poolObject = new Pool($this->getAdapter(), 'test', 5, fn () => 'x'); + } + + public function testPoolGetName(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals('test', $this->poolObject->getName()); + }); + } + + public function testPoolGetSize(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(5, $this->poolObject->getSize()); + }); + } + + public function testPoolGetReconnectAttempts(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(3, $this->poolObject->getReconnectAttempts()); + }); + } + + public function testPoolSetReconnectAttempts(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(3, $this->poolObject->getReconnectAttempts()); + + $this->poolObject->setReconnectAttempts(20); + + $this->assertEquals(20, $this->poolObject->getReconnectAttempts()); + }); + } + + public function testPoolGetReconnectSleep(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(1, $this->poolObject->getReconnectSleep()); + }); + } + + public function testPoolSetReconnectSleep(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(1, $this->poolObject->getReconnectSleep()); + + $this->poolObject->setReconnectSleep(20); + + $this->assertEquals(20, $this->poolObject->getReconnectSleep()); + }); + } + + public function testPoolGetRetryAttempts(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(3, $this->poolObject->getRetryAttempts()); + }); + } + + public function testPoolSetRetryAttempts(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(3, $this->poolObject->getRetryAttempts()); + + $this->poolObject->setRetryAttempts(20); + + $this->assertEquals(20, $this->poolObject->getRetryAttempts()); + }); + } + + public function testPoolGetRetrySleep(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(1, $this->poolObject->getRetrySleep()); + }); + } + + public function testPoolSetRetrySleep(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(1, $this->poolObject->getRetrySleep()); + + $this->poolObject->setRetrySleep(20); + + $this->assertEquals(20, $this->poolObject->getRetrySleep()); + }); + } + + public function testPoolPop(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(5, $this->poolObject->count()); + + $connection = $this->poolObject->pop(); + + $this->assertEquals(4, $this->poolObject->count()); + + $this->assertInstanceOf(Connection::class, $connection); + $this->assertEquals('x', $connection->getResource()); + + // Pop remaining 4 connections + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + // Pool should be empty, next pop should throw + $this->expectException(Exception::class); + $this->poolObject->pop(); + }); + } + + public function testPoolUse(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(5, $this->poolObject->count()); + $this->poolObject->use(function ($resource): void { + $this->assertEquals(4, $this->poolObject->count()); + $this->assertEquals('x', $resource); + }); + + $this->assertEquals(5, $this->poolObject->count()); + }); + } + + public function testPoolPush(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(5, $this->poolObject->count()); + + $connection = $this->poolObject->pop(); + + $this->assertEquals(4, $this->poolObject->count()); + + $this->assertInstanceOf(Connection::class, $connection); + $this->assertEquals('x', $connection->getResource()); + + $this->assertInstanceOf(Pool::class, $this->poolObject->push($connection)); + + $this->assertEquals(5, $this->poolObject->count()); + }); + } + + public function testPoolCount(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(5, $this->poolObject->count()); + + $connection = $this->poolObject->pop(); + + $this->assertEquals(4, $this->poolObject->count()); + + $this->poolObject->push($connection); + + $this->assertEquals(5, $this->poolObject->count()); + }); + } + + public function testPoolReclaim(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(5, $this->poolObject->count()); + + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + + $this->assertEquals(2, $this->poolObject->count()); + + $this->poolObject->reclaim(); + + $this->assertEquals(5, $this->poolObject->count()); + }); + } + + public function testPoolIsEmpty(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + + $this->assertEquals(true, $this->poolObject->isEmpty()); + }); + } + + public function testPoolIsFull(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->assertEquals(true, $this->poolObject->isFull()); + + $connection = $this->poolObject->pop(); + + $this->assertEquals(false, $this->poolObject->isFull()); + + $this->poolObject->push($connection); + + $this->assertEquals(true, $this->poolObject->isFull()); + + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + + $this->assertEquals(false, $this->poolObject->isFull()); + + $this->poolObject->reclaim(); + + $this->assertEquals(true, $this->poolObject->isFull()); + + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + + $this->assertEquals(false, $this->poolObject->isFull()); + }); + } + + public function testPoolRetry(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $this->poolObject->setReconnectAttempts(2); + $this->poolObject->setReconnectSleep(2); + + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + $this->poolObject->pop(); + + // Pool should be empty + $this->expectException(Exception::class); + + $timeStart = \time(); + $this->poolObject->pop(); + $timeEnd = \time(); + + $timeDiff = $timeEnd - $timeStart; + + $this->assertGreaterThanOrEqual(4, $timeDiff); + }); + } + + public function testPoolDestroy(): void + { + $this->execute(function (): void { + $i = 0; + $object = new Pool($this->getAdapter(), 'testDestroy', 2, function () use (&$i) { + $i++; + return $i <= 2 ? 'x' : 'y'; + }); + + $this->assertEquals(2, $object->count()); + + $connection1 = $object->pop(); + $connection2 = $object->pop(); + + $this->assertEquals(0, $object->count()); + + $this->assertEquals('x', $connection1->getResource()); + $this->assertEquals('x', $connection2->getResource()); + + $object->destroy(); + + $this->assertEquals(2, $object->count()); + + $connection1 = $object->pop(); + $connection2 = $object->pop(); + + $this->assertEquals(0, $object->count()); + + $this->assertEquals('y', $connection1->getResource()); + $this->assertEquals('y', $connection2->getResource()); + }); + } + + public function testPoolTelemetry(): void + { + $this->execute(function (): void { + $this->setUpPool(); + $telemetry = new TestTelemetry(); + $this->poolObject->setTelemetry($telemetry); + + $allocate = function (int $amount, callable $assertion): void { + $connections = []; + for ($i = 0; $i < $amount; $i++) { + $connections[] = $this->poolObject->pop(); + } + + $assertion(); + + foreach ($connections as $connection) { + $this->poolObject->reclaim($connection); + } + }; + + $this->assertEquals(5, $this->poolObject->count()); + + $allocate(3, function () use ($telemetry): void { + /** @var object{values: array} $openGauge */ + $openGauge = $telemetry->gauges['pool.connection.open.count']; + /** @var object{values: array} $activeGauge */ + $activeGauge = $telemetry->gauges['pool.connection.active.count']; + /** @var object{values: array} $idleGauge */ + $idleGauge = $telemetry->gauges['pool.connection.idle.count']; + $this->assertEquals([1, 2, 3], $openGauge->values); + $this->assertEquals([1, 2, 3], $activeGauge->values); + $this->assertEquals([0, 0, 0], $idleGauge->values); + }); + + $this->assertEquals(5, $this->poolObject->count()); + + $allocate(1, function () use ($telemetry): void { + /** @var object{values: array} $openGauge */ + $openGauge = $telemetry->gauges['pool.connection.open.count']; + /** @var object{values: array} $activeGauge */ + $activeGauge = $telemetry->gauges['pool.connection.active.count']; + /** @var object{values: array} $idleGauge */ + $idleGauge = $telemetry->gauges['pool.connection.idle.count']; + $this->assertEquals([1, 2, 3, 3, 3, 3, 3], $openGauge->values); + $this->assertEquals([1, 2, 3, 2, 1, 0, 1], $activeGauge->values); + $this->assertEquals([0, 0, 0, 1, 2, 3, 2], $idleGauge->values); + }); + }); + } + + public function testPoolUseWithRetrySuccess(): void + { + $this->execute(function (): void { + $i = 0; + $pool = new Pool($this->getAdapter(), 'testRetry', 2, function () use (&$i) { + $i++; + return "connection-{$i}"; + }); + + $attempts = 0; + $result = $pool->use(function ($resource) use (&$attempts) { + $attempts++; + + // Fail on first two attempts, succeed on third + if ($attempts < 3) { + throw new Exception("Simulated connection failure"); + } + + return "success: {$resource}"; + }, 3); // Allow up to 3 retries (4 total attempts) + + $this->assertEquals(3, $attempts); + $this->assertEquals("success: connection-3", $result); + + // Pool should have connections available (destroyed failed ones, created new) + $this->assertGreaterThan(0, $pool->count()); + }); + + $this->execute(function (): void { + $pool = new Pool($this->getAdapter(), 'testIntermittent', 5, fn () => 'resource'); + + $callCount = 0; + + $result = $pool->use(function ($resource) use (&$callCount) { + $callCount++; + + // Fail on odd attempts, succeed on even + if ($callCount % 2 === 1) { + throw new Exception("Odd attempt failure"); + } + + return "success on attempt {$callCount}"; + }, 5); // Allow 5 retries + + $this->assertEquals("success on attempt 2", $result); + $this->assertEquals(2, $callCount); // Should succeed on second attempt + }); + } + + public function testPoolUseWithRetryFailure(): void + { + $this->execute(function (): void { + $pool = new Pool($this->getAdapter(), 'testRetryFail', 3, fn () => 'x'); + + $attempts = 0; + + try { + $pool->use(function ($resource) use (&$attempts) { + $attempts++; + throw new Exception("Persistent failure"); + }, 2); // Allow up to 2 retries (3 total attempts) + } catch (Exception $e) { + $this->assertEquals("Persistent failure", $e->getMessage()); + $this->assertEquals(3, $attempts); // Should have tried 3 times (initial + 2 retries) + } + }); + } + + public function testPoolUseWithoutRetry(): void + { + $this->execute(function (): void { + $pool = new Pool($this->getAdapter(), 'testNoRetry', 2, fn () => 'x'); + + $attempts = 0; + + try { + $pool->use(function ($resource) use (&$attempts) { + $attempts++; + throw new Exception("First attempt failure"); + }); // No retries (default) + } catch (Exception $e) { + $this->assertEquals("First attempt failure", $e->getMessage()); + $this->assertEquals(1, $attempts); // Should only try once + } + }); + } + + public function testPoolUseRetryDestroysFailedConnections(): void + { + $this->execute(function (): void { + $i = 0; + $pool = new Pool($this->getAdapter(), 'testDestroyOnRetry', 3, function () use (&$i) { + $i++; + return "connection-{$i}"; + }); + + $attempts = 0; + $seenResources = []; + + $pool->use(function ($resource) use (&$attempts, &$seenResources) { + $attempts++; + $seenResources[] = $resource; + + // Fail twice, succeed on third + if ($attempts < 3) { + throw new Exception("Connection failed"); + } + + return "success"; + }, 3); + + // Should have created 3 connections (one for each attempt) + $this->assertEquals(3, $i); + $this->assertEquals(3, $attempts); + + // Each attempt should have gotten a different connection (failed ones were destroyed) + $this->assertCount(3, array_unique($seenResources)); + $this->assertEquals(['connection-1', 'connection-2', 'connection-3'], $seenResources); + }); + } +}