Skip to content

Commit d469f2a

Browse files
author
Harry Bragg
authored
adds a reject method to do the opposite of acknowledge (#43)
* adds a reject method to do the opposite of acknowledge * lint fix * add tests for 7.2 / nightly * some boy scouting * add 7.2 to allowed failures list * change install to build * use dist for travis * fix singluar/plural mixing
1 parent 3084e3f commit d469f2a

15 files changed

+314
-68
lines changed

.travis.yml

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,35 @@
11
language: php
22

3-
sudo: false
3+
dist: trusty
4+
5+
cache:
6+
directories:
7+
- $HOME/.composer/cache/files
48

59
php:
610
- 5.5
711
- 5.6
812
- 7.0
913
- 7.1
14+
- 7.2
15+
- nightly
1016
- hhvm
1117

1218
env:
1319
- 'COMPOSER_FLAGS="--prefer-lowest --prefer-stable"'
1420
- 'COMPOSER_FLAGS=""'
1521

22+
matrix:
23+
allow_failures:
24+
- php: nightly
25+
- php: 7.2
26+
1627
install:
17-
- travis_retry composer update ${COMPOSER_FLAGS} --no-interaction --prefer-source
28+
- travis_retry composer update ${COMPOSER_FLAGS} --no-interaction --prefer-dist
1829

1930
script:
2031
- vendor/bin/phpcs -p --warning-severity=0 src/ tests/
2132
- vendor/bin/phpunit --coverage-clover=./tests/report/coverage.clover
2233

2334
after_script:
24-
- ./build/coverage_to_scruitinizer.sh
35+
- test -f ./tests/report/coverage.clover && (wget https://scrutinizer-ci.com/ocular.phar; php ocular.phar code-coverage:upload --format=php-clover ./tests/report/coverage.clover)

Makefile

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
SHELL = /bin/sh
22

33
DOCKER ?= $(shell which docker)
4-
DOCKER_REPOSITORY := graze/php-alpine:test
4+
DOCKER_REPOSITORY := graze/php-alpine:7.1-test
55
VOLUME := /opt/graze/queue
66
VOLUME_MAP := -v $$(pwd):${VOLUME}
77
DOCKER_RUN_BASE := ${DOCKER} run --rm -t ${VOLUME_MAP} -w ${VOLUME}
@@ -14,15 +14,17 @@ DOCKER_RUN := ${DOCKER_RUN_BASE} ${DOCKER_REPOSITORY}
1414

1515
# Building
1616

17-
install: ## Download the dependencies then build the image :rocket:.
18-
make 'composer-install --optimize-autoloader --ignore-platform-reqs'
17+
build: ## Download the dependencies
18+
make 'composer-install --optimize-autoloader'
19+
20+
build-update: ## Update and download the dependencies
21+
make 'composer-update --optimize-autoloader'
1922

2023
composer-%: ## Run a composer command, `make "composer-<command> [...]"`.
2124
${DOCKER} run -t --rm \
22-
-v $$(pwd):/usr/src/app \
23-
-v ~/.composer:/root/composer \
24-
-v ~/.ssh:/root/.ssh:ro \
25-
graze/composer --no-interaction --prefer-dist $* $(filter-out $@,$(MAKECMDGOALS))
25+
-v $$(pwd):/app:delegated \
26+
-v ~/.composer:/tmp:delegated \
27+
composer --no-interaction --prefer-dist $* $(filter-out $@,$(MAKECMDGOALS))
2628

2729
# Testing
2830

@@ -42,10 +44,10 @@ test-integration: ## Run the integration testsuite.
4244
${DOCKER_RUN} vendor/bin/phpunit --colors=always --testsuite integration
4345

4446
test-matrix: ## Run the unit tests against multiple targets.
45-
make DOCKER_REPOSITORY="php:5.5-alpine" test
4647
make DOCKER_REPOSITORY="php:5.6-alpine" test
4748
make DOCKER_REPOSITORY="php:7.0-alpine" test
4849
make DOCKER_REPOSITORY="php:7.1-alpine" test
50+
make DOCKER_REPOSITORY="php:7.2-alpine" test
4951
make DOCKER_REPOSITORY="hhvm/hhvm:latest" test
5052

5153
test-coverage: ## Run all tests and output coverage to the console.

build/coverage_to_scruitinizer.sh

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/Adapter/AdapterInterface.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,30 @@
2424
interface AdapterInterface
2525
{
2626
/**
27+
* Acknowledge the messages (delete them from the queue)
28+
*
2729
* @param MessageInterface[] $messages
2830
*
31+
* @return void
32+
*
2933
* @throws FailedAcknowledgementException
3034
*/
3135
public function acknowledge(array $messages);
3236

3337
/**
38+
* Attempt to reject all the following messages (make the message immediately visible to other consumers)
39+
*
40+
* @param MessageInterface[] $messages
41+
*
42+
* @return void
43+
*
44+
* @throws FailedAcknowledgementException
45+
*/
46+
public function reject(array $messages);
47+
48+
/**
49+
* Remove up to {$limit} messages from the queue
50+
*
3451
* @param MessageFactoryInterface $factory
3552
* @param int $limit
3653
*
@@ -39,18 +56,26 @@ public function acknowledge(array $messages);
3956
public function dequeue(MessageFactoryInterface $factory, $limit);
4057

4158
/**
59+
* Add all the messages to the queue
60+
*
4261
* @param MessageInterface[] $messages
4362
*
63+
* @return void
64+
*
4465
* @throws FailedEnqueueException
4566
*/
4667
public function enqueue(array $messages);
4768

4869
/**
70+
* Empty the queue
71+
*
4972
* @return void
5073
*/
5174
public function purge();
5275

5376
/**
77+
* Delete the queue
78+
*
5479
* @return void
5580
*/
5681
public function delete();

src/Adapter/ArrayAdapter.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
namespace Graze\Queue\Adapter;
1717

1818
use ArrayIterator;
19+
use Graze\Queue\Adapter\Exception\FailedAcknowledgementException;
1920
use Graze\Queue\Message\MessageFactoryInterface;
2021
use Graze\Queue\Message\MessageInterface;
2122
use LimitIterator;
@@ -43,6 +44,20 @@ public function acknowledge(array $messages)
4344
}));
4445
}
4546

47+
/**
48+
* Attempt to reject all the following messages (make the message immediately visible to other consumers)
49+
*
50+
* @param MessageInterface[] $messages
51+
*
52+
* @return void
53+
*
54+
* @throws FailedAcknowledgementException
55+
*/
56+
public function reject(array $messages)
57+
{
58+
// do nothing, timeouts not implemented, so messages are immediately available
59+
}
60+
4661
/**
4762
* @param MessageFactoryInterface $factory
4863
* @param int $limit

src/Adapter/FirehoseAdapter.php

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
namespace Graze\Queue\Adapter;
1717

1818
use Aws\Firehose\FirehoseClient;
19-
use Graze\Queue\Adapter\Exception\MethodNotSupportedException;
2019
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
20+
use Graze\Queue\Adapter\Exception\MethodNotSupportedException;
2121
use Graze\Queue\Message\MessageFactoryInterface;
2222
use Graze\Queue\Message\MessageInterface;
2323

@@ -31,7 +31,7 @@
3131
*/
3232
final class FirehoseAdapter implements AdapterInterface
3333
{
34-
const BATCHSIZE_SEND = 100;
34+
const BATCHSIZE_SEND = 100;
3535

3636
/** @var FirehoseClient */
3737
protected $client;
@@ -44,8 +44,8 @@ final class FirehoseAdapter implements AdapterInterface
4444

4545
/**
4646
* @param FirehoseClient $client
47-
* @param string $deliveryStreamName
48-
* @param array $options - BatchSize <integer> The number of messages to send in each batch.
47+
* @param string $deliveryStreamName
48+
* @param array $options - BatchSize <integer> The number of messages to send in each batch.
4949
*/
5050
public function __construct(FirehoseClient $client, $deliveryStreamName, array $options = [])
5151
{
@@ -68,6 +68,18 @@ public function acknowledge(array $messages)
6868
);
6969
}
7070

71+
/**
72+
* @param MessageInterface[] $messages
73+
*/
74+
public function reject(array $messages)
75+
{
76+
throw new MethodNotSupportedException(
77+
__FUNCTION__,
78+
$this,
79+
$messages
80+
);
81+
}
82+
7183
/**
7284
* @param MessageFactoryInterface $factory
7385
* @param int $limit
@@ -97,15 +109,18 @@ public function enqueue(array $messages)
97109
);
98110

99111
foreach ($batches as $batch) {
100-
$requestRecords = array_map(function (MessageInterface $message) {
101-
return [
102-
'Data' => $message->getBody()
103-
];
104-
}, $batch);
112+
$requestRecords = array_map(
113+
function (MessageInterface $message) {
114+
return [
115+
'Data' => $message->getBody(),
116+
];
117+
},
118+
$batch
119+
);
105120

106121
$request = [
107122
'DeliveryStreamName' => $this->deliveryStreamName,
108-
'Records' => $requestRecords,
123+
'Records' => $requestRecords,
109124
];
110125

111126
$results = $this->client->putRecordBatch($request);

0 commit comments

Comments
 (0)