Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
"require-dev": {
"phpunit/phpunit": "11.*",
"laravel/pint": "1.*",
"phpstan/phpstan": "1.*"
"phpstan/phpstan": "1.*",
"swoole/ide-helper": "5.1.2"
},
"suggests": {
"ext-mongodb": "Needed to support MongoDB database pools",
"ext-redis": "Needed to support Redis cache pools",
"ext-pdo": "Needed to support MariaDB, MySQL or SQLite database pools"
"ext-pdo": "Needed to support MariaDB, MySQL or SQLite database pools",
"ext-swoole" : "Needed to support Swoole based pool adapter"
},
"config": {
"platform": {
Expand Down
32 changes: 32 additions & 0 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions src/Pools/Adapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace Utopia\Pools;

abstract class Adapter
{
abstract public function initialize(int $size): static;

abstract public function push(mixed $connection): static;

/**
* @param int $timeout
* @return mixed
*/
abstract public function pop(int $timeout): mixed;

abstract public function count(): int;

/**
* Execute a callback with lock protection if the adapter supports it
*
* @param callable $callback
* @param int $timeout Timeout in seconds
* @return mixed
*/
abstract public function synchronized(callable $callback, int $timeout): mixed;
}
68 changes: 68 additions & 0 deletions src/Pools/Adapter/Stack.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php

namespace Utopia\Pools\Adapter;

use Utopia\Pools\Adapter;

class Stack extends Adapter
{
/** @var array<mixed> $pool */
protected array $pool = [];

/**
* Initialize the stack-based pool.
*
* Note:
* - `$size` is accepted for API compatibility with other pool adapters.
* - The stack adapter does NOT enforce capacity limits.
* - `$size` is ignored because the pool is backed by a simple array.
*
* @param int $size Ignored by the stack adapter.
*/
public function initialize(int $size): static
{
$this->pool = [];
return $this;
}

public function push(mixed $connection): static
{
// Push connection to pool
$this->pool[] = $connection;
return $this;
}

/**
* Pop an item from the stack.
*
* Note: The stack adapter does not support blocking operations.
* The `$timeout` parameter is ignored.
*
* @param int $timeout Ignored by the stack adapter.
* @return mixed|null Returns the popped item, or null if the stack is empty.
*/
public function pop(int $timeout): mixed
{
return array_pop($this->pool);
}

public function count(): int
{
return count($this->pool);
}

/**
* Executes the callback without acquiring a lock.
*
* This implementation does not provide mutual exclusion.
* The `$timeout` parameter is ignored.
*
* @param callable $callback Callback to execute.
* @param int $timeout Ignored.
* @return mixed The value returned by the callback.
*/
public function synchronized(callable $callback, int $timeout): mixed
{
return $callback();
}
}
75 changes: 75 additions & 0 deletions src/Pools/Adapter/Swoole.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php

namespace Utopia\Pools\Adapter;

use Utopia\Pools\Adapter;
use Swoole\Coroutine\Channel;
use Swoole\Lock;

class Swoole extends Adapter
{
protected Channel $pool;

/** @var Lock $lock */
protected Lock $lock;
public function initialize(int $size): static
{
$this->pool = new Channel($size);

$this->lock = new Lock(SWOOLE_MUTEX);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this lock the whole worker? Or just the coroutine? Which is desired?

Copy link
Author

@ArnabChatterjee20k ArnabChatterjee20k Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mutex is shared across all coroutines in the worker process. When one coroutine acquires it, the entire worker is blocked until the lock is released. This ensures atomic updates (e.g., incrementing/decrementing counters or registering/deregistering active connections)
so all operations on the pool will be synchronized


return $this;
}

public function push(mixed $connection): static
{
// Push connection to channel
$this->pool->push($connection);
return $this;
}

/**
* Pop an item from the pool.
*
* @param int $timeout Timeout in seconds. Use 0 for non-blocking pop.
* @return mixed|false Returns the pooled value, or false if the pool is empty
* or the timeout expires.
*/
public function pop(int $timeout): mixed
{
return $this->pool->pop($timeout);
}

public function count(): int
{
$length = $this->pool->length();
return is_int($length) ? $length : 0;
}

/**
* Executes a callback while holding a lock.
*
* The lock is acquired before invoking the callback and is always released
* afterward, even if the callback throws an exception.
*
* @param callable $callback Callback to execute within the critical section.
* @param int $timeout Maximum time (in seconds) to wait for the lock.
* @return mixed The value returned by the callback.
*
* @throws \RuntimeException If the lock cannot be acquired within the timeout.
*/
public function synchronized(callable $callback, int $timeout): mixed
{
$acquired = $this->lock->lockwait($timeout);

if (!$acquired) {
throw new \RuntimeException("Failed to acquire lock within {$timeout} seconds");
}

try {
return $callback();
} finally {
$this->lock->unlock();
}
}
}
Loading