@@ -16,7 +16,7 @@ operations, but keeping thousands of jobs in memory at once may easily take up
1616all resources on your side.
1717Instead, you can use this library to stream your arbitrarily large input list
1818as individual records to a non-blocking (async) transformation handler. It uses
19- [ ReactPHP] ( https://reactphp.org ) to enable you to concurrently process multiple
19+ [ ReactPHP] ( https://reactphp.org/ ) to enable you to concurrently process multiple
2020records at once. You can control the concurrency limit, so that by allowing
2121it to process 10 operations at the same time, you can thus process this large
2222input list around 10 times faster and at the same time you're no longer limited
@@ -72,21 +72,25 @@ Once [installed](#install), you can use the following code to process an example
7272user lists by sending a (RESTful) HTTP API request for each user record:
7373
7474``` php
75+ <?php
76+
77+ require __DIR__ . '/vendor/autoload.php';
78+
7579$browser = new React\Http\Browser();
7680
7781$concurrency = isset($argv[1]) ? $argv[1] : 3;
7882
7983// each job should use the browser to GET a certain URL
8084// limit number of concurrent jobs here
81- $transformer = new Transformer($concurrency, function ($user) use ($browser) {
85+ $transformer = new Clue\React\Flux\ Transformer($concurrency, function ($user) use ($browser) {
8286 // skip users that do not have an IP address listed
8387 if (!isset($user['ip'])) {
8488 return React\Promise\resolve($user);
8589 }
8690
8791 // look up country for this IP
8892 return $browser->get("https://ipapi.co/$user[ip]/country_name/")->then(
89- function (ResponseInterface $response) use ($user) {
93+ function (Psr\Http\Message\ ResponseInterface $response) use ($user) {
9094 // response successfully received
9195 // add country to user array and return updated user
9296 $user['country'] = (string)$response->getBody();
@@ -114,7 +118,9 @@ $transformer->on('data', function ($user) {
114118$transformer->on('end', function () {
115119 echo '[DONE]' . PHP_EOL;
116120});
117- $transformer->on('error', 'printf');
121+ $transformer->on('error', function (Exception $e) {
122+ echo 'Error: ' . $e->getMessage() . PHP_EOL;
123+ });
118124
119125```
120126
@@ -241,7 +247,7 @@ $transformer = new Transformer(10, function ($url) use ($browser) {
241247 return json_decode($response->getBody());
242248 },
243249 function (Exception $error) {
244- var_dump('There was an error', $error ->getMessage()) ;
250+ echo 'Error: ' . $e ->getMessage() . PHP_EOL ;
245251
246252 throw $error;
247253 }
@@ -411,6 +417,10 @@ $transformer = new Transformer(10, function ($data) use ($http) {
411417});
412418
413419$source->pipe($gunzip)->pipe($ndjson)->pipe($transformer)->pipe($dest);
420+
421+ $transformer->on('error', function (Exception $e) {
422+ echo 'Error: ' . $e->getMessage() . PHP_EOL;
423+ });
414424```
415425
416426Keep in mind that the transformation handler may return a rejected promise.
@@ -456,6 +466,8 @@ $promise = Transformer::all($input, 3, function ($data) use ($browser, $url) {
456466
457467$promise->then(function ($count) {
458468 echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
469+ }, function (Exception $e) {
470+ echo 'Error: ' . $e->getMessage() . PHP_EOL;
459471});
460472```
461473
@@ -561,6 +573,8 @@ $promise = Transformer::any($input, 3, function ($data) use ($browser, $url) {
561573
562574$promise->then(function (ResponseInterface $response) {
563575 echo 'First successful job: ' . $response->getBody() . PHP_EOL;
576+ }, function (Exception $e) {
577+ echo 'Error: ' . $e->getMessage() . PHP_EOL;
564578});
565579```
566580
0 commit comments