-
Notifications
You must be signed in to change notification settings - Fork 3
Refactoring pools for concurrent operations #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
356fead
5467696
d3519cb
51e7c0f
d8ae4c9
5347ce0
b861f5b
a703aaf
2f12968
22a8b38
1fc36c9
fdd750f
a70164f
05d27a9
c32a92a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| <?php | ||
|
|
||
| namespace Utopia\Pools; | ||
|
|
||
| abstract class Adapter | ||
| { | ||
| abstract public function initialize(int $size): static; | ||
|
|
||
| abstract public function push(mixed $connection): static; | ||
|
|
||
| /** | ||
| * @param int $timeout | ||
| * @return mixed | ||
| */ | ||
| abstract public function pop(int $timeout): mixed; | ||
|
|
||
| abstract public function count(): int; | ||
|
|
||
| /** | ||
| * Execute a callback with lock protection if the adapter supports it | ||
| * | ||
| * @param callable $callback | ||
| * @param int $timeout Timeout in seconds | ||
| * @return mixed | ||
| */ | ||
| abstract public function synchronized(callable $callback, int $timeout): mixed; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| <?php | ||
|
|
||
| namespace Utopia\Pools\Adapter; | ||
|
|
||
| use Utopia\Pools\Adapter; | ||
|
|
||
| class Stack extends Adapter | ||
| { | ||
| /** @var array<mixed> $pool */ | ||
| protected array $pool = []; | ||
|
|
||
| /** | ||
| * Initialize the stack-based pool. | ||
| * | ||
| * Note: | ||
| * - `$size` is accepted for API compatibility with other pool adapters. | ||
| * - The stack adapter does NOT enforce capacity limits. | ||
| * - `$size` is ignored because the pool is backed by a simple array. | ||
| * | ||
| * @param int $size Ignored by the stack adapter. | ||
| */ | ||
| public function initialize(int $size): static | ||
| { | ||
| $this->pool = []; | ||
| return $this; | ||
| } | ||
|
|
||
| public function push(mixed $connection): static | ||
| { | ||
| // Push connection to pool | ||
| $this->pool[] = $connection; | ||
| return $this; | ||
| } | ||
|
|
||
| /** | ||
| * Pop an item from the stack. | ||
| * | ||
| * Note: The stack adapter does not support blocking operations. | ||
| * The `$timeout` parameter is ignored. | ||
| * | ||
| * @param int $timeout Ignored by the stack adapter. | ||
| * @return mixed|null Returns the popped item, or null if the stack is empty. | ||
| */ | ||
| public function pop(int $timeout): mixed | ||
| { | ||
| return array_pop($this->pool); | ||
| } | ||
|
|
||
| public function count(): int | ||
| { | ||
| return count($this->pool); | ||
| } | ||
|
|
||
| /** | ||
| * Executes the callback without acquiring a lock. | ||
| * | ||
| * This implementation does not provide mutual exclusion. | ||
| * The `$timeout` parameter is ignored. | ||
| * | ||
| * @param callable $callback Callback to execute. | ||
| * @param int $timeout Ignored. | ||
| * @return mixed The value returned by the callback. | ||
| */ | ||
| public function synchronized(callable $callback, int $timeout): mixed | ||
| { | ||
| return $callback(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| <?php | ||
|
|
||
| namespace Utopia\Pools\Adapter; | ||
|
|
||
| use Utopia\Pools\Adapter; | ||
| use Swoole\Coroutine\Channel; | ||
| use Swoole\Lock; | ||
|
|
||
| class Swoole extends Adapter | ||
| { | ||
| protected Channel $pool; | ||
|
|
||
| /** @var Lock $lock */ | ||
| protected Lock $lock; | ||
ArnabChatterjee20k marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| public function initialize(int $size): static | ||
| { | ||
| $this->pool = new Channel($size); | ||
|
|
||
| $this->lock = new Lock(SWOOLE_MUTEX); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this lock the whole worker? Or just the coroutine? Which is desired?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This mutex is shared across all coroutines in the worker process. When one coroutine acquires it, the entire worker is blocked until the lock is released. This ensures atomic updates (e.g., incrementing/decrementing counters or registering/deregistering active connections) |
||
|
|
||
| return $this; | ||
| } | ||
|
|
||
| public function push(mixed $connection): static | ||
| { | ||
| // Push connection to channel | ||
| $this->pool->push($connection); | ||
| return $this; | ||
| } | ||
|
|
||
| /** | ||
| * Pop an item from the pool. | ||
| * | ||
| * @param int $timeout Timeout in seconds. Use 0 for non-blocking pop. | ||
| * @return mixed|false Returns the pooled value, or false if the pool is empty | ||
| * or the timeout expires. | ||
| */ | ||
| public function pop(int $timeout): mixed | ||
| { | ||
| return $this->pool->pop($timeout); | ||
| } | ||
|
|
||
| public function count(): int | ||
| { | ||
| $length = $this->pool->length(); | ||
| return is_int($length) ? $length : 0; | ||
| } | ||
|
|
||
| /** | ||
| * Executes a callback while holding a lock. | ||
| * | ||
| * The lock is acquired before invoking the callback and is always released | ||
| * afterward, even if the callback throws an exception. | ||
| * | ||
| * @param callable $callback Callback to execute within the critical section. | ||
| * @param int $timeout Maximum time (in seconds) to wait for the lock. | ||
| * @return mixed The value returned by the callback. | ||
| * | ||
| * @throws \RuntimeException If the lock cannot be acquired within the timeout. | ||
| */ | ||
| public function synchronized(callable $callback, int $timeout): mixed | ||
| { | ||
| $acquired = $this->lock->lockwait($timeout); | ||
|
|
||
| if (!$acquired) { | ||
| throw new \RuntimeException("Failed to acquire lock within {$timeout} seconds"); | ||
| } | ||
|
|
||
| try { | ||
| return $callback(); | ||
| } finally { | ||
| $this->lock->unlock(); | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.