From fd9637f2b0bac80b8a038c57a43f447f548119fa Mon Sep 17 00:00:00 2001 From: Hemachandar Date: Tue, 17 Mar 2026 13:49:02 +0530 Subject: [PATCH 1/3] Implement RPUSH for priority messages --- composer.json | 5 +- phpunit.xml | 3 + src/Queue/Broker/AMQP.php | 2 +- src/Queue/Broker/Pool.php | 2 +- src/Queue/Broker/Redis.php | 5 +- src/Queue/Publisher.php | 2 +- tests/Queue/E2E/Adapter/Base.php | 11 +++ tests/Queue/E2E/Adapter/RedisPriorityTest.php | 73 ++++++++++++++ tests/Queue/Unit/Broker/RedisBrokerTest.php | 97 +++++++++++++++++++ 9 files changed, 195 insertions(+), 5 deletions(-) create mode 100644 tests/Queue/E2E/Adapter/RedisPriorityTest.php create mode 100644 tests/Queue/Unit/Broker/RedisBrokerTest.php diff --git a/composer.json b/composer.json index 4017e49..af313cf 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,10 @@ "psr-4": {"Utopia\\Queue\\": "src/Queue"} }, "autoload-dev": { - "psr-4": {"Tests\\E2E\\": "tests/Queue/E2E"} + "psr-4": { + "Tests\\E2E\\": "tests/Queue/E2E", + "Tests\\Unit\\": "tests/Queue/Unit" + } }, "scripts":{ "test": "phpunit", diff --git a/phpunit.xml b/phpunit.xml index 1b8f40d..dc5e0eb 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -9,6 +9,9 @@ stopOnFailure="false" > + + ./tests/Queue/Unit + ./tests/Queue/E2E/Adapter diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 62b2774..80e10da 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -135,7 +135,7 @@ public function close(): void $this->channel?->getConnection()?->close(); } - public function enqueue(Queue $queue, array $payload): bool + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool { $payload = [ 'pid' => \uniqid(more_entropy: true), diff --git a/src/Queue/Broker/Pool.php b/src/Queue/Broker/Pool.php index aa7cf92..5fcdcc7 100644 --- a/src/Queue/Broker/Pool.php +++ b/src/Queue/Broker/Pool.php @@ -15,7 +15,7 @@ public function __construct( ) { } - public function enqueue(Queue $queue, array $payload): bool + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool { return $this->delegatePublish(__FUNCTION__, \func_get_args()); } diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index e036147..b36e1b3 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -104,7 +104,7 @@ public function close(): void $this->closed = true; } - public function enqueue(Queue $queue, array $payload): bool + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool { $payload = [ 'pid' => \uniqid(more_entropy: true), @@ -112,6 +112,9 @@ public function enqueue(Queue $queue, array $payload): bool 'timestamp' => time(), 'payload' => $payload ]; + if ($priority) { + return $this->connection->rightPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); + } return $this->connection->leftPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); } diff --git a/src/Queue/Publisher.php b/src/Queue/Publisher.php index 1778656..9ccda90 100644 --- a/src/Queue/Publisher.php +++ b/src/Queue/Publisher.php @@ -11,7 +11,7 @@ interface Publisher * @param array $payload * @return bool */ - public function enqueue(Queue $queue, array $payload): bool; + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool; /** * Retries failed jobs. diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php index e507d0d..37eb7be 100644 --- a/tests/Queue/E2E/Adapter/Base.php +++ b/tests/Queue/E2E/Adapter/Base.php @@ -86,6 +86,17 @@ public function testConcurrency(): void }); } + public function testEnqueuePriority(): void + { + $publisher = $this->getPublisher(); + + $result = $publisher->enqueue($this->getQueue(), ['type' => 'test_string', 'value' => 'priority'], priority: true); + + $this->assertTrue($result); + + sleep(1); + } + /** * @depends testEvents */ diff --git a/tests/Queue/E2E/Adapter/RedisPriorityTest.php b/tests/Queue/E2E/Adapter/RedisPriorityTest.php new file mode 100644 index 0000000..3164d5e --- /dev/null +++ b/tests/Queue/E2E/Adapter/RedisPriorityTest.php @@ -0,0 +1,73 @@ +connection = new Redis('redis', 6379); + $this->broker = new RedisBroker($this->connection); + $this->queue = new Queue('priority-e2e-test'); + + // Flush any leftover state from previous runs. + $key = "{$this->queue->namespace}.queue.{$this->queue->name}"; + while ($this->connection->rightPopArray($key, 0) !== false) { + // drain + } + } + + public function testPriorityJobIsConsumedBeforeNormalJobs(): void + { + // Enqueue three normal jobs (pushed to head/left). + $this->broker->enqueue($this->queue, ['order' => 'normal-1']); + $this->broker->enqueue($this->queue, ['order' => 'normal-2']); + $this->broker->enqueue($this->queue, ['order' => 'normal-3']); + + // Enqueue one priority job (pushed to tail/right — same end BRPOP reads from). + $this->broker->enqueue($this->queue, ['order' => 'priority'], priority: true); + + $key = "{$this->queue->namespace}.queue.{$this->queue->name}"; + + // The first pop should yield the priority job. + $first = $this->connection->rightPopArray($key, 1); + $this->assertNotFalse($first, 'Expected a job but queue was empty'); + $this->assertSame('priority', $first['payload']['order'], 'Priority job should be consumed first'); + + // The remaining three should be normal jobs (consumed oldest-first). + $second = $this->connection->rightPopArray($key, 1); + $this->assertSame('normal-1', $second['payload']['order']); + + $third = $this->connection->rightPopArray($key, 1); + $this->assertSame('normal-2', $third['payload']['order']); + + $fourth = $this->connection->rightPopArray($key, 1); + $this->assertSame('normal-3', $fourth['payload']['order']); + + // Queue should now be empty. + $this->assertFalse($this->connection->rightPopArray($key, 0)); + } + + public function testEnqueuePriorityReturnsBool(): void + { + $result = $this->broker->enqueue($this->queue, ['check' => 'return-value'], priority: true); + $this->assertIsBool($result); + $this->assertTrue($result); + } +} diff --git a/tests/Queue/Unit/Broker/RedisBrokerTest.php b/tests/Queue/Unit/Broker/RedisBrokerTest.php new file mode 100644 index 0000000..9bdc4e1 --- /dev/null +++ b/tests/Queue/Unit/Broker/RedisBrokerTest.php @@ -0,0 +1,97 @@ +connection = $this->createMock(Connection::class); + $this->broker = new Redis($this->connection); + $this->queue = new Queue('test'); + } + + public function testEnqueueNormalUsesLeftPush(): void + { + $this->connection + ->expects($this->once()) + ->method('leftPushArray') + ->with( + $this->equalTo('utopia-queue.queue.test'), + $this->callback(fn($p) => $p['queue'] === 'test' && $p['payload'] === ['foo' => 'bar']) + ) + ->willReturn(true); + + $this->connection->expects($this->never())->method('rightPushArray'); + + $result = $this->broker->enqueue($this->queue, ['foo' => 'bar']); + + $this->assertTrue($result); + } + + public function testEnqueuePriorityFalseUsesLeftPush(): void + { + $this->connection + ->expects($this->once()) + ->method('leftPushArray') + ->willReturn(true); + + $this->connection->expects($this->never())->method('rightPushArray'); + + $result = $this->broker->enqueue($this->queue, ['foo' => 'bar'], priority: false); + + $this->assertTrue($result); + } + + public function testEnqueuePriorityUsesRightPush(): void + { + $this->connection + ->expects($this->once()) + ->method('rightPushArray') + ->with( + $this->equalTo('utopia-queue.queue.test'), + $this->callback(fn($p) => $p['queue'] === 'test' && $p['payload'] === ['urgent' => true]) + ) + ->willReturn(true); + + $this->connection->expects($this->never())->method('leftPushArray'); + + $result = $this->broker->enqueue($this->queue, ['urgent' => true], priority: true); + + $this->assertTrue($result); + } + + public function testEnqueuePriorityPayloadHasRequiredFields(): void + { + $capturedPayload = null; + + $this->connection + ->expects($this->once()) + ->method('rightPushArray') + ->with( + $this->anything(), + $this->callback(function ($p) use (&$capturedPayload) { + $capturedPayload = $p; + return true; + }) + ) + ->willReturn(true); + + $this->broker->enqueue($this->queue, ['data' => 1], priority: true); + + $this->assertArrayHasKey('pid', $capturedPayload); + $this->assertArrayHasKey('queue', $capturedPayload); + $this->assertArrayHasKey('timestamp', $capturedPayload); + $this->assertArrayHasKey('payload', $capturedPayload); + $this->assertNotEmpty($capturedPayload['pid']); + } +} From c6c326add9b5c0b98182585c46aee9f01f882347 Mon Sep 17 00:00:00 2001 From: Hemachandar Date: Tue, 17 Mar 2026 17:59:00 +0530 Subject: [PATCH 2/3] better tests --- composer.json | 3 +- phpunit.xml | 3 - tests/Queue/E2E/Adapter/RedisPriorityTest.php | 73 -------------- .../E2E/Adapter/SwooleRedisClusterTest.php | 49 +++++++++- tests/Queue/E2E/Adapter/SwooleTest.php | 45 ++++++++- tests/Queue/Unit/Broker/RedisBrokerTest.php | 97 ------------------- 6 files changed, 91 insertions(+), 179 deletions(-) delete mode 100644 tests/Queue/E2E/Adapter/RedisPriorityTest.php delete mode 100644 tests/Queue/Unit/Broker/RedisBrokerTest.php diff --git a/composer.json b/composer.json index af313cf..5fb217a 100644 --- a/composer.json +++ b/composer.json @@ -16,8 +16,7 @@ }, "autoload-dev": { "psr-4": { - "Tests\\E2E\\": "tests/Queue/E2E", - "Tests\\Unit\\": "tests/Queue/Unit" + "Tests\\E2E\\": "tests/Queue/E2E" } }, "scripts":{ diff --git a/phpunit.xml b/phpunit.xml index dc5e0eb..1b8f40d 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -9,9 +9,6 @@ stopOnFailure="false" > - - ./tests/Queue/Unit - ./tests/Queue/E2E/Adapter diff --git a/tests/Queue/E2E/Adapter/RedisPriorityTest.php b/tests/Queue/E2E/Adapter/RedisPriorityTest.php deleted file mode 100644 index 3164d5e..0000000 --- a/tests/Queue/E2E/Adapter/RedisPriorityTest.php +++ /dev/null @@ -1,73 +0,0 @@ -connection = new Redis('redis', 6379); - $this->broker = new RedisBroker($this->connection); - $this->queue = new Queue('priority-e2e-test'); - - // Flush any leftover state from previous runs. - $key = "{$this->queue->namespace}.queue.{$this->queue->name}"; - while ($this->connection->rightPopArray($key, 0) !== false) { - // drain - } - } - - public function testPriorityJobIsConsumedBeforeNormalJobs(): void - { - // Enqueue three normal jobs (pushed to head/left). - $this->broker->enqueue($this->queue, ['order' => 'normal-1']); - $this->broker->enqueue($this->queue, ['order' => 'normal-2']); - $this->broker->enqueue($this->queue, ['order' => 'normal-3']); - - // Enqueue one priority job (pushed to tail/right — same end BRPOP reads from). - $this->broker->enqueue($this->queue, ['order' => 'priority'], priority: true); - - $key = "{$this->queue->namespace}.queue.{$this->queue->name}"; - - // The first pop should yield the priority job. - $first = $this->connection->rightPopArray($key, 1); - $this->assertNotFalse($first, 'Expected a job but queue was empty'); - $this->assertSame('priority', $first['payload']['order'], 'Priority job should be consumed first'); - - // The remaining three should be normal jobs (consumed oldest-first). - $second = $this->connection->rightPopArray($key, 1); - $this->assertSame('normal-1', $second['payload']['order']); - - $third = $this->connection->rightPopArray($key, 1); - $this->assertSame('normal-2', $third['payload']['order']); - - $fourth = $this->connection->rightPopArray($key, 1); - $this->assertSame('normal-3', $fourth['payload']['order']); - - // Queue should now be empty. - $this->assertFalse($this->connection->rightPopArray($key, 0)); - } - - public function testEnqueuePriorityReturnsBool(): void - { - $result = $this->broker->enqueue($this->queue, ['check' => 'return-value'], priority: true); - $this->assertIsBool($result); - $this->assertTrue($result); - } -} diff --git a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php index b1a744c..d7a6e2e 100644 --- a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php +++ b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php @@ -9,14 +9,59 @@ class SwooleRedisClusterTest extends Base { + private function getConnection(): RedisCluster + { + return new RedisCluster([ + 'redis-cluster-0:6379', + 'redis-cluster-1:6379', + 'redis-cluster-2:6379', + ]); + } + protected function getPublisher(): Publisher { - $connection = new RedisCluster(['redis-cluster-0:6379', 'redis-cluster-1:6379', 'redis-cluster-2:6379']); - return new Redis($connection); + return new Redis($this->getConnection()); } protected function getQueue(): Queue { return new Queue('swoole-redis-cluster'); } + + public function testPriorityJobIsConsumedBeforeNormalJobs(): void + { + $connection = $this->getConnection(); + $key = "{$this->getQueue()->namespace}.queue.{$this->getQueue()->name}"; + + // Flush any leftover state from previous runs. + while ($connection->rightPopArray($key, 1) !== false) { + // drain + } + + // Enqueue three normal jobs (pushed to head/left). + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-1']); + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-2']); + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-3']); + + // Enqueue one priority job (pushed to tail/right — same end BRPOP reads from). + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'priority'], priority: true); + + // The first pop should yield the priority job. + $first = $connection->rightPopArray($key, 1); + $this->assertNotFalse($first, 'Expected a job but queue was empty'); + $this->assertSame('priority', $first['payload']['order'], 'Priority job should be consumed first'); + + // The remaining three should be normal jobs (consumed oldest-first). + $second = $connection->rightPopArray($key, 1); + $this->assertSame('normal-1', $second['payload']['order']); + + $third = $connection->rightPopArray($key, 1); + $this->assertSame('normal-2', $third['payload']['order']); + + $fourth = $connection->rightPopArray($key, 1); + $this->assertSame('normal-3', $fourth['payload']['order']); + + // Queue should now be empty. + $this->assertFalse($connection->rightPopArray($key, 1)); + } } diff --git a/tests/Queue/E2E/Adapter/SwooleTest.php b/tests/Queue/E2E/Adapter/SwooleTest.php index 9a3f183..641b1c3 100644 --- a/tests/Queue/E2E/Adapter/SwooleTest.php +++ b/tests/Queue/E2E/Adapter/SwooleTest.php @@ -9,14 +9,55 @@ class SwooleTest extends Base { + private function getConnection(): Redis + { + return new Redis('redis', 6379); + } + protected function getPublisher(): Publisher { - $connection = new Redis('redis', 6379); - return new RedisBroker($connection); + return new RedisBroker($this->getConnection()); } protected function getQueue(): Queue { return new Queue('swoole'); } + + public function testPriorityJobIsConsumedBeforeNormalJobs(): void + { + $connection = $this->getConnection(); + $key = "{$this->getQueue()->namespace}.queue.{$this->getQueue()->name}"; + + // Flush any leftover state from previous runs. + while ($connection->rightPopArray($key, 1) !== false) { + // drain + } + + // Enqueue three normal jobs (pushed to head/left). + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-1']); + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-2']); + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-3']); + + // Enqueue one priority job (pushed to tail/right — same end BRPOP reads from). + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'priority'], priority: true); + + // The first pop should yield the priority job. + $first = $connection->rightPopArray($key, 1); + $this->assertNotFalse($first, 'Expected a job but queue was empty'); + $this->assertSame('priority', $first['payload']['order'], 'Priority job should be consumed first'); + + // The remaining three should be normal jobs (consumed oldest-first). + $second = $connection->rightPopArray($key, 1); + $this->assertSame('normal-1', $second['payload']['order']); + + $third = $connection->rightPopArray($key, 1); + $this->assertSame('normal-2', $third['payload']['order']); + + $fourth = $connection->rightPopArray($key, 1); + $this->assertSame('normal-3', $fourth['payload']['order']); + + // Queue should now be empty. + $this->assertFalse($connection->rightPopArray($key, 1)); + } } diff --git a/tests/Queue/Unit/Broker/RedisBrokerTest.php b/tests/Queue/Unit/Broker/RedisBrokerTest.php deleted file mode 100644 index 9bdc4e1..0000000 --- a/tests/Queue/Unit/Broker/RedisBrokerTest.php +++ /dev/null @@ -1,97 +0,0 @@ -connection = $this->createMock(Connection::class); - $this->broker = new Redis($this->connection); - $this->queue = new Queue('test'); - } - - public function testEnqueueNormalUsesLeftPush(): void - { - $this->connection - ->expects($this->once()) - ->method('leftPushArray') - ->with( - $this->equalTo('utopia-queue.queue.test'), - $this->callback(fn($p) => $p['queue'] === 'test' && $p['payload'] === ['foo' => 'bar']) - ) - ->willReturn(true); - - $this->connection->expects($this->never())->method('rightPushArray'); - - $result = $this->broker->enqueue($this->queue, ['foo' => 'bar']); - - $this->assertTrue($result); - } - - public function testEnqueuePriorityFalseUsesLeftPush(): void - { - $this->connection - ->expects($this->once()) - ->method('leftPushArray') - ->willReturn(true); - - $this->connection->expects($this->never())->method('rightPushArray'); - - $result = $this->broker->enqueue($this->queue, ['foo' => 'bar'], priority: false); - - $this->assertTrue($result); - } - - public function testEnqueuePriorityUsesRightPush(): void - { - $this->connection - ->expects($this->once()) - ->method('rightPushArray') - ->with( - $this->equalTo('utopia-queue.queue.test'), - $this->callback(fn($p) => $p['queue'] === 'test' && $p['payload'] === ['urgent' => true]) - ) - ->willReturn(true); - - $this->connection->expects($this->never())->method('leftPushArray'); - - $result = $this->broker->enqueue($this->queue, ['urgent' => true], priority: true); - - $this->assertTrue($result); - } - - public function testEnqueuePriorityPayloadHasRequiredFields(): void - { - $capturedPayload = null; - - $this->connection - ->expects($this->once()) - ->method('rightPushArray') - ->with( - $this->anything(), - $this->callback(function ($p) use (&$capturedPayload) { - $capturedPayload = $p; - return true; - }) - ) - ->willReturn(true); - - $this->broker->enqueue($this->queue, ['data' => 1], priority: true); - - $this->assertArrayHasKey('pid', $capturedPayload); - $this->assertArrayHasKey('queue', $capturedPayload); - $this->assertArrayHasKey('timestamp', $capturedPayload); - $this->assertArrayHasKey('payload', $capturedPayload); - $this->assertNotEmpty($capturedPayload['pid']); - } -} From 45f5a59ae71bdabf6e3e967a2f4108746dec9db3 Mon Sep 17 00:00:00 2001 From: Hemachandar Date: Tue, 17 Mar 2026 18:00:01 +0530 Subject: [PATCH 3/3] nit --- tests/Queue/E2E/Adapter/Base.php | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php index 37eb7be..12ab6fd 100644 --- a/tests/Queue/E2E/Adapter/Base.php +++ b/tests/Queue/E2E/Adapter/Base.php @@ -93,8 +93,6 @@ public function testEnqueuePriority(): void $result = $publisher->enqueue($this->getQueue(), ['type' => 'test_string', 'value' => 'priority'], priority: true); $this->assertTrue($result); - - sleep(1); } /**