diff --git a/src/GuzzleAdapter.php b/src/GuzzleAdapter.php index 37bf5e9..1aa9288 100644 --- a/src/GuzzleAdapter.php +++ b/src/GuzzleAdapter.php @@ -3,6 +3,7 @@ namespace TraderInteractive\Api; use ArrayObject; +use GuzzleHttp\Pool; use TraderInteractive\Util; use GuzzleHttp\Client as GuzzleClient; use GuzzleHttp\ClientInterface as GuzzleClientInterface; @@ -22,11 +23,11 @@ final class GuzzleAdapter implements AdapterInterface const DEFAULT_CONCURRENCY_LIMIT = PHP_INT_MAX; /** - * Collection of Promise\PromiseInterface instances with keys matching what was given from start(). + * Collection of RequestInterface instances with keys matching what was given from start(). * * @var array */ - private $promises = []; + private $requests = []; /** * Collection of Api\Response with keys matching what was given from start(). @@ -72,7 +73,7 @@ public function __construct( public function start(RequestInterface $request) : string { $handle = uniqid(); - $this->promises[$handle] = $this->client->sendAsync($request); + $this->requests[$handle] = $request; return $handle; } @@ -83,7 +84,7 @@ public function start(RequestInterface $request) : string */ public function end(string $endHandle) : ResponseInterface { - $results = $this->fulfillPromises($this->promises, $this->exceptions); + $results = $this->fulfillPromises($this->requests, $this->exceptions); foreach ($results as $handle => $response) { try { $contents = (string)$response->getBody(); @@ -103,7 +104,7 @@ public function end(string $endHandle) : ResponseInterface } } - $this->promises = []; + $this->requests = []; if ($this->exceptions->offsetExists($endHandle)) { $exception = $this->exceptions[$endHandle]; @@ -121,31 +122,31 @@ public function end(string $endHandle) : ResponseInterface } /** - * Helper method to execute all guzzle promises. - * - * @param array $promises - * @param array $exceptions - * - * @return array Array of fulfilled PSR7 responses. + * @return ResponseInterface[] */ - private function fulfillPromises(array $promises, ArrayObject $exceptions) : array + private function fulfillPromises(array $requests, ArrayObject $exceptions) : array { - if (empty($promises)) { + if (empty($requests)) { return []; } - $results = new ArrayObject(); - Promise\Each::ofLimit( - $this->promises, - $this->concurrencyLimit, - function (ResponseInterface $response, $index) use ($results) { - $results[$index] = $response; - }, - function (RequestException $e, $index) use ($exceptions) { - $exceptions[$index] = $e; - } - )->wait(); + $responses = new ArrayObject(); + $pool = new Pool( + $this->client, + $requests, + [ + 'concurrency' => $this->concurrencyLimit, + Promise\Promise::FULFILLED => function (ResponseInterface $response, $index) use ($responses) { + $responses[$index] = $response; + }, + Promise\Promise::REJECTED => function (RequestException $e, $index) use ($exceptions) { + $exceptions[$index] = $e; + } + ] + ); + $promise = $pool->promise(); + $promise->wait(); - return $results->getArrayCopy(); + return $responses->getArrayCopy(); } }