@@ -121,6 +121,14 @@ class Http
121121 */
122122 protected $ queue ;
123123
124+ /**
125+ * Request queue to prevent API
126+ * overload.
127+ *
128+ * @var SplQueue
129+ */
130+ protected $ interactionQueue ;
131+
124132 /**
125133 * Number of requests that are waiting for a response.
126134 *
@@ -148,6 +156,7 @@ public function __construct(string $token, LoopInterface $loop, LoggerInterface
148156 $ this ->logger = $ logger ;
149157 $ this ->driver = $ driver ;
150158 $ this ->queue = new SplQueue ;
159+ $ this ->interactionQueue = new SplQueue ;
151160
152161 $ this ->promiseV3 = str_starts_with (InstalledVersions::getVersion ('react/promise ' ), '3. ' );
153162 }
@@ -451,7 +460,9 @@ protected function getBucket(string $key): Bucket
451460 if (! isset ($ this ->buckets [$ key ])) {
452461 $ bucket = new Bucket ($ key , $ this ->loop , $ this ->logger , function (Request $ request ) {
453462 $ deferred = new Deferred ();
454- $ this ->queue ->enqueue ([$ request , $ deferred ]);
463+ self ::isInteractionEndpoint ($ request )
464+ ? $ this ->interactionQueue ->enqueue ([$ request , $ deferred ])
465+ : $ this ->queue ->enqueue ([$ request , $ deferred ]);
455466 $ this ->checkQueue ();
456467
457468 return $ deferred ->promise ();
@@ -469,8 +480,11 @@ protected function getBucket(string $key): Bucket
469480 */
470481 protected function checkQueue (): void
471482 {
472- if ($ this ->queue ->isEmpty ()) {
473- $ this ->logger ->debug ('http not checking ' , ['waiting ' => $ this ->waiting , 'empty ' => $ this ->queue ->isEmpty ()]);
483+ $ this ->checkInteractionQueue ();
484+
485+ if ($ this ->waiting >= static ::CONCURRENT_REQUESTS || $ this ->queue ->isEmpty ()) {
486+ $ this ->logger ->debug ('http not checking queue ' , ['waiting ' => $ this ->waiting , 'empty ' => $ this ->queue ->isEmpty ()]);
487+
474488 return ;
475489 }
476490
@@ -479,30 +493,42 @@ protected function checkQueue(): void
479493 * @var Deferred $deferred
480494 */
481495 [$ request , $ deferred ] = $ this ->queue ->dequeue ();
496+ ++$ this ->waiting ;
497+
498+ $ this ->executeRequest ($ request )->then (function ($ result ) use ($ deferred ) {
499+ --$ this ->waiting ;
500+ $ this ->checkQueue ();
501+ $ deferred ->resolve ($ result );
502+ }, function ($ e ) use ($ deferred ) {
503+ --$ this ->waiting ;
504+ $ this ->checkQueue ();
505+ $ deferred ->reject ($ e );
506+ });
507+ }
508+
509+ /**
510+ * Checks the interaction queue to see if more requests can be
511+ * sent out.
512+ */
513+ protected function checkInteractionQueue (): void
514+ {
515+ if ($ this ->interactionQueue ->isEmpty ()) {
516+ $ this ->logger ->debug ('http not checking interaction queue ' , ['waiting ' => $ this ->waiting , 'empty ' => $ this ->interactionQueue ->isEmpty ()]);
482517
483- // Allow interaction endpoints to bypass the concurrent request limit
484- if (!(($ is_interaction_endpoint = $ this ->isInteractionEndpoint ($ request )) || $ this ->waiting < static ::CONCURRENT_REQUESTS )) {
485- // If not allowed, re-queue and exit
486- $ this ->queue ->enqueue ([$ request , $ deferred ]);
487- $ this ->logger ->debug ('http not checking ' , ['waiting ' => $ this ->waiting , 'empty ' => $ this ->queue ->isEmpty ()]);
488518 return ;
489519 }
490520
491- if (!$ is_interaction_endpoint ) {
492- ++$ this ->waiting ;
493- }
521+ /**
522+ * @var Request $request
523+ * @var Deferred $deferred
524+ */
525+ [$ request , $ deferred ] = $ this ->interactionQueue ->dequeue ();
494526
495- $ this ->executeRequest ($ request )->then (function ($ result ) use ($ deferred , $ request ) {
496- if (!$ this ->isInteractionEndpoint ($ request )) {
497- --$ this ->waiting ;
498- }
499- $ this ->checkQueue ();
527+ $ this ->executeRequest ($ request )->then (function ($ result ) use ($ deferred ) {
528+ $ this ->checkInteractionQueue ();
500529 $ deferred ->resolve ($ result );
501- }, function ($ e ) use ($ deferred , $ request ) {
502- if (!$ this ->isInteractionEndpoint ($ request )) {
503- --$ this ->waiting ;
504- }
505- $ this ->checkQueue ();
530+ }, function ($ e ) use ($ deferred ) {
531+ $ this ->checkInteractionQueue ();
506532 $ deferred ->reject ($ e );
507533 });
508534 }
@@ -513,7 +539,7 @@ protected function checkQueue(): void
513539 * @param Request $request
514540 * @return bool
515541 */
516- protected function isInteractionEndpoint (Request $ request ): bool
542+ public static function isInteractionEndpoint (Request $ request ): bool
517543 {
518544 // Adjust this check if you support more interaction endpoints
519545 $ endpoint = (string ) $ request ->getUrl ();
0 commit comments