Skip to content

Commit 3084e3f

Browse files
joemeehanHarry Bragg
authored andcommitted
Add Kinesis Firehose adapter and tests (#42)
* Add Kinesis Firehose adapter and tests * Simplify message structure and support configurable batch size through options * Remove unused use statement * Add (and fix) failed enqueue test * Move enqueue failure test to integration tests
1 parent c1b2ad7 commit 3084e3f

File tree

3 files changed

+357
-0
lines changed

3 files changed

+357
-0
lines changed

src/Adapter/FirehoseAdapter.php

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
<?php
2+
3+
/**
4+
* This file is part of graze/queue.
5+
*
6+
* Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*
11+
* @license https://github.com/graze/queue/blob/master/LICENSE MIT
12+
*
13+
* @link https://github.com/graze/queue
14+
*/
15+
16+
namespace Graze\Queue\Adapter;
17+
18+
use Aws\Firehose\FirehoseClient;
19+
use Graze\Queue\Adapter\Exception\MethodNotSupportedException;
20+
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
21+
use Graze\Queue\Message\MessageFactoryInterface;
22+
use Graze\Queue\Message\MessageInterface;
23+
24+
/**
25+
* Amazon AWS Kinesis Firehose Adapter.
26+
*
27+
* This method only supports the enqueue method to send messages to a Kinesiss
28+
* Firehose stream
29+
*
30+
* @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Firehose.FirehoseClient.html#putRecordBatch
31+
*/
32+
final class FirehoseAdapter implements AdapterInterface
33+
{
34+
const BATCHSIZE_SEND = 100;
35+
36+
/** @var FirehoseClient */
37+
protected $client;
38+
39+
/** @var array */
40+
protected $options;
41+
42+
/** @var string */
43+
protected $deliveryStreamName;
44+
45+
/**
46+
* @param FirehoseClient $client
47+
* @param string $deliveryStreamName
48+
* @param array $options - BatchSize <integer> The number of messages to send in each batch.
49+
*/
50+
public function __construct(FirehoseClient $client, $deliveryStreamName, array $options = [])
51+
{
52+
$this->client = $client;
53+
$this->deliveryStreamName = $deliveryStreamName;
54+
$this->options = $options;
55+
}
56+
57+
/**
58+
* @param MessageInterface[] $messages
59+
*
60+
* @throws MethodNotSupportedException
61+
*/
62+
public function acknowledge(array $messages)
63+
{
64+
throw new MethodNotSupportedException(
65+
__FUNCTION__,
66+
$this,
67+
$messages
68+
);
69+
}
70+
71+
/**
72+
* @param MessageFactoryInterface $factory
73+
* @param int $limit
74+
*
75+
* @throws MethodNotSupportedException
76+
*/
77+
public function dequeue(MessageFactoryInterface $factory, $limit)
78+
{
79+
throw new MethodNotSupportedException(
80+
__FUNCTION__,
81+
$this,
82+
[]
83+
);
84+
}
85+
86+
/**
87+
* @param MessageInterface[] $messages
88+
*
89+
* @throws FailedEnqueueException
90+
*/
91+
public function enqueue(array $messages)
92+
{
93+
$failed = [];
94+
$batches = array_chunk(
95+
$messages,
96+
$this->getOption('BatchSize', self::BATCHSIZE_SEND)
97+
);
98+
99+
foreach ($batches as $batch) {
100+
$requestRecords = array_map(function (MessageInterface $message) {
101+
return [
102+
'Data' => $message->getBody()
103+
];
104+
}, $batch);
105+
106+
$request = [
107+
'DeliveryStreamName' => $this->deliveryStreamName,
108+
'Records' => $requestRecords,
109+
];
110+
111+
$results = $this->client->putRecordBatch($request);
112+
113+
foreach ($results->get('RequestResponses') as $idx => $response) {
114+
if (isset($response['ErrorCode'])) {
115+
$failed[] = $batch[$idx];
116+
}
117+
}
118+
}
119+
120+
if (!empty($failed)) {
121+
throw new FailedEnqueueException($this, $failed);
122+
}
123+
}
124+
125+
/**
126+
* @param string $name
127+
* @param mixed $default
128+
*
129+
* @return mixed
130+
*/
131+
protected function getOption($name, $default = null)
132+
{
133+
return isset($this->options[$name]) ? $this->options[$name] : $default;
134+
}
135+
136+
/**
137+
* @throws MethodNotSupportedException
138+
*/
139+
public function purge()
140+
{
141+
throw new MethodNotSupportedException(
142+
__FUNCTION__,
143+
$this,
144+
[]
145+
);
146+
}
147+
148+
/**
149+
* @throws MethodNotSupportedException
150+
*/
151+
public function delete()
152+
{
153+
throw new MethodNotSupportedException(
154+
__FUNCTION__,
155+
$this,
156+
[]
157+
);
158+
}
159+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
/**
4+
* This file is part of graze/queue.
5+
*
6+
* Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*
11+
* @license https://github.com/graze/queue/blob/master/LICENSE MIT
12+
*
13+
* @link https://github.com/graze/queue
14+
*/
15+
16+
namespace Graze\Queue;
17+
18+
use Aws\ResultInterface;
19+
use Aws\Firehose\FirehoseClient;
20+
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
21+
use Graze\Queue\Adapter\FirehoseAdapter;
22+
use Mockery as m;
23+
use Mockery\MockInterface;
24+
use PHPUnit_Framework_TestCase as TestCase;
25+
26+
class FirehoseIntegrationTest extends TestCase
27+
{
28+
/** @var string */
29+
private $deliveryStreamName;
30+
/** @var FirehoseClient|MockInterface */
31+
private $firehoseClient;
32+
/** @var Client */
33+
private $client;
34+
35+
public function setUp()
36+
{
37+
$this->deliveryStreamName = 'delivery_stream_foo';
38+
$this->firehoseClient = m::mock(FirehoseClient::class);
39+
$this->client = new Client(new FirehoseAdapter($this->firehoseClient, 'delivery_stream_foo'));
40+
}
41+
42+
public function testSend()
43+
{
44+
$model = m::mock(ResultInterface::class);
45+
$model->shouldReceive('get')->once()->with('RequestResponses')->andReturn([]);
46+
47+
$this->firehoseClient->shouldReceive('putRecordBatch')->once()->with([
48+
'DeliveryStreamName' => $this->deliveryStreamName,
49+
'Records' => [
50+
['Data' => 'foo']
51+
]
52+
])->andReturn($model);
53+
54+
$this->client->send([$this->client->create('foo')]);
55+
}
56+
57+
/**
58+
* @expectedException \Graze\Queue\Adapter\Exception\FailedEnqueueException
59+
*/
60+
public function testSendError()
61+
{
62+
$model = m::mock(ResultInterface::class);
63+
$model->shouldReceive('get')->once()->with('RequestResponses')->andReturn([
64+
[
65+
'ErrorCode' => 'fooError',
66+
'ErrorMessage' => 'Some error message',
67+
'RecordId' => 'foo',
68+
]
69+
]);
70+
71+
$this->firehoseClient->shouldReceive('putRecordBatch')->once()->with([
72+
'DeliveryStreamName' => $this->deliveryStreamName,
73+
'Records' => [
74+
['Data' => 'foo'],
75+
],
76+
])->andReturn($model);
77+
78+
$this->client->send([$this->client->create('foo')]);
79+
}
80+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
<?php
2+
3+
/**
4+
* This file is part of graze/queue.
5+
*
6+
* Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*
11+
* @license https://github.com/graze/queue/blob/master/LICENSE MIT
12+
*
13+
* @link https://github.com/graze/queue
14+
*/
15+
16+
namespace Graze\Queue\Adapter;
17+
18+
use Aws\ResultInterface;
19+
use Aws\Firehose\FirehoseClient;
20+
use Graze\Queue\Adapter\Exception\MethodNotSupportedException;
21+
use Graze\Queue\Message\MessageFactoryInterface;
22+
use Graze\Queue\Message\MessageInterface;
23+
use Mockery as m;
24+
use Mockery\MockInterface;
25+
use PHPUnit_Framework_TestCase as TestCase;
26+
27+
class FirehoseAdapterTest extends TestCase
28+
{
29+
/** @var MessageInterface|MockInterface */
30+
private $messageA;
31+
/** @var MessageInterface|MockInterface */
32+
private $messageB;
33+
/** @var MessageInterface|MockInterface */
34+
private $messageC;
35+
/** @var MessageInterface[]|MockInterface[] */
36+
private $messages;
37+
/** @var ResultInterface|MockInterface */
38+
private $model;
39+
/** @var MessageFactoryInterface|MockInterface */
40+
private $factory;
41+
/** @var FirehoseClient */
42+
private $client;
43+
44+
public function setUp()
45+
{
46+
$this->client = m::mock(FirehoseClient::class);
47+
$this->model = m::mock(ResultInterface::class);
48+
$this->factory = m::mock(MessageFactoryInterface::class);
49+
50+
$this->messageA = $a = m::mock(MessageInterface::class);
51+
$this->messageB = $b = m::mock(MessageInterface::class);
52+
$this->messageC = $c = m::mock(MessageInterface::class);
53+
$this->messages = [$a, $b, $c];
54+
}
55+
56+
public function testInterface()
57+
{
58+
assertThat(new FirehoseAdapter($this->client, 'foo'), is(anInstanceOf('Graze\Queue\Adapter\AdapterInterface')));
59+
}
60+
61+
public function testEnqueue()
62+
{
63+
$adapter = new FirehoseAdapter($this->client, 'foo');
64+
65+
$this->messageA->shouldReceive('getBody')->once()->withNoArgs()->andReturn('foo');
66+
$this->messageB->shouldReceive('getBody')->once()->withNoArgs()->andReturn('bar');
67+
$this->messageC->shouldReceive('getBody')->once()->withNoArgs()->andReturn('baz');
68+
69+
$this->model->shouldReceive('get')->once()->with('RequestResponses')->andReturn([]);
70+
71+
$this->client->shouldReceive('putRecordBatch')->once()->with([
72+
'DeliveryStreamName' => 'foo',
73+
'Records' => [
74+
['Data' => 'foo'],
75+
['Data' => 'bar'],
76+
['Data' => 'baz'],
77+
],
78+
])->andReturn($this->model);
79+
80+
$adapter->enqueue($this->messages);
81+
}
82+
83+
/**
84+
* @expectedException \Graze\Queue\Adapter\Exception\MethodNotSupportedException
85+
*/
86+
public function testAcknowledge()
87+
{
88+
$adapter = new FirehoseAdapter($this->client, 'foo');
89+
$adapter->acknowledge($this->messages);
90+
}
91+
92+
/**
93+
* @expectedException \Graze\Queue\Adapter\Exception\MethodNotSupportedException
94+
*/
95+
public function testDequeue()
96+
{
97+
$adapter = new FirehoseAdapter($this->client, 'foo');
98+
$adapter->dequeue($this->factory, 10);
99+
}
100+
101+
/**
102+
* @expectedException \Graze\Queue\Adapter\Exception\MethodNotSupportedException
103+
*/
104+
public function testPurge()
105+
{
106+
$adapter = new FirehoseAdapter($this->client, 'foo');
107+
$adapter->purge();
108+
}
109+
110+
/**
111+
* @expectedException \Graze\Queue\Adapter\Exception\MethodNotSupportedException
112+
*/
113+
public function testDelete()
114+
{
115+
$adapter = new FirehoseAdapter($this->client, 'foo');
116+
$adapter->delete();
117+
}
118+
}

0 commit comments

Comments
 (0)