@@ -16,7 +16,7 @@ operations, but keeping thousands of jobs in memory at once may easily take up
1616all resources on your side.
1717Instead, you can use this library to stream your arbitrarily large input list
1818as individual records to a non-blocking (async) transformation handler. It uses
19- [ ReactPHP] ( https://reactphp.org ) to enable you to concurrently process multiple
19+ [ ReactPHP] ( https://reactphp.org/ ) to enable you to concurrently process multiple
2020records at once. You can control the concurrency limit, so that by allowing
2121it to process 10 operations at the same time, you can thus process this large
2222input list around 10 times faster and at the same time you're no longer limited
@@ -72,21 +72,25 @@ Once [installed](#install), you can use the following code to process an example
7272user lists by sending a (RESTful) HTTP API request for each user record:
7373
7474``` php
75+ <?php
76+
77+ require __DIR__ . '/vendor/autoload.php';
78+
7579$browser = new React\Http\Browser();
7680
7781$concurrency = isset($argv[1]) ? $argv[1] : 3;
7882
7983// each job should use the browser to GET a certain URL
8084// limit number of concurrent jobs here
81- $transformer = new Transformer($concurrency, function ($user) use ($browser) {
85+ $transformer = new Clue\React\Flux\ Transformer($concurrency, function ($user) use ($browser) {
8286 // skip users that do not have an IP address listed
8387 if (!isset($user['ip'])) {
8488 return React\Promise\resolve($user);
8589 }
8690
8791 // look up country for this IP
8892 return $browser->get("https://ipapi.co/$user[ip]/country_name/")->then(
89- function (ResponseInterface $response) use ($user) {
93+ function (Psr\Http\Message\ ResponseInterface $response) use ($user) {
9094 // response successfully received
9195 // add country to user array and return updated user
9296 $user['country'] = (string)$response->getBody();
@@ -411,6 +415,10 @@ $transformer = new Transformer(10, function ($data) use ($http) {
411415});
412416
413417$source->pipe($gunzip)->pipe($ndjson)->pipe($transformer)->pipe($dest);
418+
419+ $transformer->on('error', function (Exception $e) {
420+ echo 'Error: ' . $e->getMessage() . PHP_EOL;
421+ });
414422```
415423
416424Keep in mind that the transformation handler may return a rejected promise.
@@ -456,6 +464,8 @@ $promise = Transformer::all($input, 3, function ($data) use ($browser, $url) {
456464
457465$promise->then(function ($count) {
458466 echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
467+ }, function (Exception $e) {
468+ echo 'Error: ' . $e->getMessage() . PHP_EOL;
459469});
460470```
461471
@@ -561,6 +571,8 @@ $promise = Transformer::any($input, 3, function ($data) use ($browser, $url) {
561571
562572$promise->then(function (ResponseInterface $response) {
563573 echo 'First successful job: ' . $response->getBody() . PHP_EOL;
574+ }, function (Exception $e) {
575+ echo 'Error: ' . $e->getMessage() . PHP_EOL;
564576});
565577```
566578
0 commit comments