Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
composer.lock
.phpstorm.meta.php
phpunit.xml
.phpunit.result.cache
.phpunit.*
.php_cs.cache
32 changes: 16 additions & 16 deletions src/Queue/RabbitMQQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ class RabbitMQQueue extends Queue implements QueueContract, RabbitMQQueueContrac
/**
* Holds the Configuration
*/
protected QueueConfig $config;
protected QueueConfig $rabbitMQConfig;

/**
* RabbitMQQueue constructor.
*/
public function __construct(QueueConfig $config)
{
$this->config = $config;
$this->rabbitMQConfig = $config;
$this->dispatchAfterCommit = $config->isDispatchAfterCommit();
}

Expand Down Expand Up @@ -293,7 +293,7 @@ public function setConnection(AbstractConnection $connection): RabbitMQQueue
*/
public function getJobClass(): string
{
$job = $this->getConfig()->getAbstractJob();
$job = $this->getRabbitMQConfig()->getAbstractJob();

throw_if(
! is_a($job, RabbitMQJob::class, true),
Expand All @@ -309,7 +309,7 @@ public function getJobClass(): string
*/
public function getQueue($queue = null): string
{
return $queue ?: $this->getConfig()->getQueue();
return $queue ?: $this->getRabbitMQConfig()->getQueue();
}

/**
Expand Down Expand Up @@ -523,7 +523,7 @@ protected function createMessage($payload, int $attempts = 0): array
$properties['correlation_id'] = $correlationId;
}

if ($this->getConfig()->isPrioritizeDelayed()) {
if ($this->getRabbitMQConfig()->isPrioritizeDelayed()) {
$properties['priority'] = $attempts;
}

Expand Down Expand Up @@ -605,16 +605,16 @@ protected function getQueueArguments(string $destination): array
// Messages with a priority which is higher than the queue's maximum, are treated as if they were
// published with the maximum priority.
// Quorum queues does not support priority.
if ($this->getConfig()->isPrioritizeDelayed() && ! $this->getConfig()->isQuorum()) {
$arguments['x-max-priority'] = $this->getConfig()->getQueueMaxPriority();
if ($this->getRabbitMQConfig()->isPrioritizeDelayed() && ! $this->getRabbitMQConfig()->isQuorum()) {
$arguments['x-max-priority'] = $this->getRabbitMQConfig()->getQueueMaxPriority();
}

if ($this->getConfig()->isRerouteFailed()) {
if ($this->getRabbitMQConfig()->isRerouteFailed()) {
$arguments['x-dead-letter-exchange'] = $this->getFailedExchange();
$arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination);
}

if ($this->getConfig()->isQuorum()) {
if ($this->getRabbitMQConfig()->isQuorum()) {
$arguments['x-queue-type'] = 'quorum';
}

Expand All @@ -639,7 +639,7 @@ protected function getDelayQueueArguments(string $destination, int $ttl): array
*/
protected function getExchange(?string $exchange = null): string
{
return $exchange ?? $this->getConfig()->getExchange();
return $exchange ?? $this->getRabbitMQConfig()->getExchange();
}

/**
Expand All @@ -648,15 +648,15 @@ protected function getExchange(?string $exchange = null): string
*/
protected function getRoutingKey(string $destination): string
{
return ltrim(sprintf($this->getConfig()->getExchangeRoutingKey(), $destination), '.');
return ltrim(sprintf($this->getRabbitMQConfig()->getExchangeRoutingKey(), $destination), '.');
}

/**
* Get the exchangeType, or AMQPExchangeType::DIRECT as default.
*/
protected function getExchangeType(?string $type = null): string
{
$constant = AMQPExchangeType::class.'::'.Str::upper($type ?: $this->getConfig()->getExchangeType());
$constant = AMQPExchangeType::class.'::'.Str::upper($type ?: $this->getRabbitMQConfig()->getExchangeType());

return defined($constant) ? constant($constant) : AMQPExchangeType::DIRECT;
}
Expand All @@ -666,7 +666,7 @@ protected function getExchangeType(?string $type = null): string
*/
protected function getFailedExchange(?string $exchange = null): string
{
return $exchange ?? $this->getConfig()->getFailedExchange();
return $exchange ?? $this->getRabbitMQConfig()->getFailedExchange();
}

/**
Expand All @@ -675,7 +675,7 @@ protected function getFailedExchange(?string $exchange = null): string
*/
protected function getFailedRoutingKey(string $destination): string
{
return ltrim(sprintf($this->getConfig()->getFailedRoutingKey(), $destination), '.');
return ltrim(sprintf($this->getRabbitMQConfig()->getFailedRoutingKey(), $destination), '.');
}

/**
Expand Down Expand Up @@ -735,9 +735,9 @@ protected function publishProperties($queue, array $options = []): array
return [$destination, $exchange, $exchangeType, $attempts];
}

protected function getConfig(): QueueConfig
protected function getRabbitMQConfig(): QueueConfig
{
return $this->config;
return $this->rabbitMQConfig;
}

/**
Expand Down
4 changes: 4 additions & 0 deletions tests/Feature/SslQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@

class SslQueueTest extends TestCase
{
protected bool $interactsWithConnection = false;

protected function setUp(): void
{
parent::setUp();

$this->markTestSkipped();
}

Expand Down
21 changes: 15 additions & 6 deletions tests/Feature/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,22 @@

abstract class TestCase extends BaseTestCase
{
/**
* Set to false for skipped tests.
*/
protected bool $interactsWithConnection = true;

/**
* @throws AMQPProtocolChannelException
*/
protected function setUp(): void
{
parent::setUp();

if ($this->connection()->isQueueExists()) {
$this->connection()->purge();
if ($this->interactsWithConnection) {
if ($this->connection()->isQueueExists()) {
$this->connection()->purge();
}
}
}

Expand All @@ -31,11 +38,13 @@ protected function setUp(): void
*/
protected function tearDown(): void
{
if ($this->connection()->isQueueExists()) {
$this->connection()->purge();
}
if ($this->interactsWithConnection) {
if ($this->connection()->isQueueExists()) {
$this->connection()->purge();
}

self::assertSame(0, Queue::size());
self::assertSame(0, Queue::size());
}

parent::tearDown();
}
Expand Down
48 changes: 24 additions & 24 deletions tests/Functional/RabbitMQQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,50 +24,50 @@ public function testConnection(): void
public function testConfigRerouteFailed(): void
{
$queue = $this->connection();
$this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed());

$queue = $this->connection('rabbitmq-with-options');
$this->assertTrue($this->callProperty($queue, 'config')->isRerouteFailed());
$this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed());

$queue = $this->connection('rabbitmq-with-options-empty');
$this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed());

$queue = $this->connection('rabbitmq-with-options-null');
$this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed());
}

public function testConfigPrioritizeDelayed(): void
{
$queue = $this->connection();
$this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed());

$queue = $this->connection('rabbitmq-with-options');
$this->assertTrue($this->callProperty($queue, 'config')->isPrioritizeDelayed());
$this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed());

$queue = $this->connection('rabbitmq-with-options-empty');
$this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed());

$queue = $this->connection('rabbitmq-with-options-null');
$this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed());
}

public function testQueueMaxPriority(): void
{
$queue = $this->connection();
$this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());
$this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());

$queue = $this->connection('rabbitmq-with-options');
$this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertSame(20, $this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());
$this->assertSame(20, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());

$queue = $this->connection('rabbitmq-with-options-empty');
$this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());
$this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());

$queue = $this->connection('rabbitmq-with-options-null');
$this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());
$this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());
}

public function testConfigExchangeType(): void
Expand All @@ -88,7 +88,7 @@ public function testConfigExchangeType(): void
$this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType'));

// testing an unkown type with a default
$this->callProperty($queue, 'config')->setExchangeType('unknown');
$this->callProperty($queue, 'rabbitMQConfig')->setExchangeType('unknown');
$this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType'));
}

Expand Down Expand Up @@ -161,7 +161,7 @@ public function testRoutingKey(): void

$queue = $this->connection('rabbitmq-with-options-null');
$this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test']));
$this->callProperty($queue, 'config')->setExchangeRoutingKey('.an.alternate.routing-key');
$this->callProperty($queue, 'rabbitMQConfig')->setExchangeRoutingKey('.an.alternate.routing-key');
$this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getRoutingKey', ['test']));
}

Expand All @@ -180,26 +180,26 @@ public function testFailedRoutingKey(): void

$queue = $this->connection('rabbitmq-with-options-null');
$this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test']));
$this->callProperty($queue, 'config')->setFailedRoutingKey('.an.alternate.routing-key');
$this->callProperty($queue, 'rabbitMQConfig')->setFailedRoutingKey('.an.alternate.routing-key');
$this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getFailedRoutingKey', ['test']));
}

public function testConfigQuorum(): void
{
$queue = $this->connection();
$this->assertFalse($this->callProperty($queue, 'config')->isQuorum());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum());

$queue = $this->connection('rabbitmq-with-options');
$this->assertFalse($this->callProperty($queue, 'config')->isQuorum());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum());

$queue = $this->connection('rabbitmq-with-options-empty');
$this->assertFalse($this->callProperty($queue, 'config')->isQuorum());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum());

$queue = $this->connection('rabbitmq-with-options-null');
$this->assertFalse($this->callProperty($queue, 'config')->isQuorum());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum());

$queue = $this->connection('rabbitmq-with-quorum-options');
$this->assertTrue($this->callProperty($queue, 'config')->isQuorum());
$this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isQuorum());
}

public function testDeclareDeleteExchange(): void
Expand Down