-
Notifications
You must be signed in to change notification settings - Fork 3
Refactoring pools for concurrent operations #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
356fead
5467696
d3519cb
51e7c0f
d8ae4c9
5347ce0
b861f5b
a703aaf
2f12968
22a8b38
1fc36c9
fdd750f
a70164f
05d27a9
c32a92a
c77ddd8
c9d98e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| <?php | ||
|
|
||
| namespace Utopia\Pools; | ||
|
|
||
| abstract class Adapter | ||
| { | ||
| abstract public function fill(int $size, mixed $value): static; | ||
|
||
|
|
||
| abstract public function push(mixed $connection): static; | ||
|
|
||
| /** | ||
| * @param int $timeout | ||
| * @return mixed | ||
| */ | ||
| abstract public function pop(int $timeout): mixed; | ||
|
|
||
| abstract public function count(): int; | ||
|
|
||
| /** | ||
| * Execute a callback with lock protection | ||
| * | ||
| * @param callable $callback | ||
| * @param float $timeout Timeout in seconds | ||
| * @return mixed | ||
| */ | ||
| abstract public function withLock(callable $callback, int $timeout): mixed; | ||
coderabbitai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| <?php | ||
|
|
||
| namespace Utopia\Pools\Adapter; | ||
|
|
||
| use Utopia\Pools\Adapter; | ||
|
|
||
| class Stack extends Adapter | ||
| { | ||
| /** @var array<mixed> $pool */ | ||
| protected array $pool = []; | ||
|
|
||
| public function fill(int $size, mixed $value): static | ||
| { | ||
| // Initialize empty pool (no pre-filling) | ||
| $this->pool = []; | ||
| return $this; | ||
| } | ||
|
|
||
| public function push(mixed $connection): static | ||
| { | ||
| // Push connection to pool | ||
| $this->pool[] = $connection; | ||
| return $this; | ||
| } | ||
|
|
||
| public function pop(int $timeout): mixed | ||
| { | ||
| return array_pop($this->pool); | ||
| } | ||
abnegate marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| public function count(): int | ||
| { | ||
| return count($this->pool); | ||
| } | ||
|
|
||
| public function withLock(callable $callback, int $timeout): mixed | ||
|
||
| { | ||
| return $callback(); | ||
| } | ||
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| <?php | ||
|
|
||
| namespace Utopia\Pools\Adapter; | ||
|
|
||
| use Utopia\Pools\Adapter; | ||
| use Swoole\Coroutine\Channel; | ||
| use Swoole\Lock; | ||
|
|
||
| class Swoole extends Adapter | ||
| { | ||
| protected Channel $pool; | ||
|
|
||
| /** @var Lock $lock */ | ||
| protected Lock $lock; | ||
ArnabChatterjee20k marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| public function fill(int $size, mixed $value): static | ||
| { | ||
| // Create empty channel with capacity (no pre-filling) | ||
| $this->pool = new Channel($size); | ||
|
|
||
| // Initialize lock for thread-safe operations | ||
| $this->lock = new Lock(SWOOLE_MUTEX); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this lock the whole worker? Or just the coroutine? Which is desired?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This mutex is shared across all coroutines in the worker process. When one coroutine acquires it, the entire worker is blocked until the lock is released. This ensures atomic updates (e.g., incrementing/decrementing counters or registering/deregistering active connections) |
||
|
|
||
| return $this; | ||
| } | ||
|
|
||
| public function push(mixed $connection): static | ||
| { | ||
| // Push connection to channel | ||
| $this->pool->push($connection); | ||
| return $this; | ||
| } | ||
|
|
||
| public function pop(int $timeout): mixed | ||
| { | ||
| $result = $this->pool->pop($timeout); | ||
|
|
||
| // if pool is empty or timeout occured => result will be false | ||
|
||
| return $result; | ||
| } | ||
|
Comment on lines
38
to
41
|
||
|
|
||
|
|
||
| public function count(): int | ||
| { | ||
| return (int) $this->pool->length(); | ||
| } | ||
|
|
||
| public function withLock(callable $callback, int $timeout): mixed | ||
| { | ||
| // Acquire lock for thread-safe operations with timeout to prevent deadlock | ||
| $acquired = $this->lock->lockwait($timeout); | ||
|
|
||
| if (!$acquired) { | ||
| throw new \RuntimeException("Failed to acquire lock within {$timeout} seconds"); | ||
| } | ||
|
|
||
| try { | ||
| return $callback(); | ||
| } finally { | ||
| $this->lock->unlock(); | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.