1717
1818class WatchFunctionalTest extends FunctionalTestCase
1919{
20+ private $ defaultOptions = ['maxAwaitTimeMS ' => 500 ];
21+
2022 public function setUp ()
2123 {
2224 parent ::setUp ();
@@ -34,7 +36,7 @@ public function testNextResumesAfterCursorNotFound()
3436 {
3537 $ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
3638
37- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
39+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
3840 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
3941
4042 $ changeStream ->rewind ();
@@ -81,7 +83,7 @@ public function testNextResumesAfterConnectionException()
8183 $ manager = new Manager ($ this ->getUri (), ['socketTimeoutMS ' => 50 ]);
8284 $ primaryServer = $ manager ->selectServer (new ReadPreference (ReadPreference::RP_PRIMARY ));
8385
84- $ operation = new Watch ($ manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
86+ $ operation = new Watch ($ manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
8587 $ changeStream = $ operation ->execute ($ primaryServer );
8688
8789 /* Note: we intentionally do not start iteration with rewind() to ensure
@@ -134,7 +136,7 @@ public function testRewindResumesAfterConnectionException()
134136 $ manager = new Manager ($ this ->getUri (), ['socketTimeoutMS ' => 50 ]);
135137 $ primaryServer = $ manager ->selectServer (new ReadPreference (ReadPreference::RP_PRIMARY ));
136138
137- $ operation = new Watch ($ manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
139+ $ operation = new Watch ($ manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
138140 $ changeStream = $ operation ->execute ($ primaryServer );
139141
140142 $ commands = [];
@@ -180,7 +182,7 @@ public function testNoChangeAfterResumeBeforeInsert()
180182 {
181183 $ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
182184
183- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
185+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
184186 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
185187
186188 $ changeStream ->rewind ();
@@ -225,7 +227,7 @@ public function testNoChangeAfterResumeBeforeInsert()
225227
226228 public function testKey ()
227229 {
228- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
230+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
229231 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
230232
231233 $ this ->assertFalse ($ changeStream ->valid ());
@@ -262,7 +264,7 @@ public function testNonEmptyPipeline()
262264 {
263265 $ pipeline = [['$project ' => ['foo ' => [0 ]]]];
264266
265- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , [ ' maxAwaitTimeMS ' => 100 ] );
267+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this -> defaultOptions );
266268 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
267269
268270 $ this ->insertDocument (['_id ' => 1 ]);
@@ -313,7 +315,7 @@ public function testNextResumeTokenNotFound()
313315 {
314316 $ pipeline = [['$project ' => ['_id ' => 0 ]]];
315317
316- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , [ ' maxAwaitTimeMS ' => 100 ] );
318+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this -> defaultOptions );
317319 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
318320
319321 /* Note: we intentionally do not start iteration with rewind() to ensure
@@ -331,7 +333,7 @@ public function testRewindResumeTokenNotFound()
331333 {
332334 $ pipeline = [['$project ' => ['_id ' => 0 ]]];
333335
334- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , [ ' maxAwaitTimeMS ' => 100 ] );
336+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this -> defaultOptions );
335337 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
336338
337339 $ this ->insertDocument (['x ' => 1 ]);
@@ -347,7 +349,7 @@ public function testNextResumeTokenInvalidType()
347349 {
348350 $ pipeline = [['$project ' => ['_id ' => ['$literal ' => 'foo ' ]]]];
349351
350- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , [ ' maxAwaitTimeMS ' => 100 ] );
352+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this -> defaultOptions );
351353 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
352354
353355 /* Note: we intentionally do not start iteration with rewind() to ensure
@@ -365,7 +367,7 @@ public function testRewindResumeTokenInvalidType()
365367 {
366368 $ pipeline = [['$project ' => ['_id ' => ['$literal ' => 'foo ' ]]]];
367369
368- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , [ ' maxAwaitTimeMS ' => 100 ] );
370+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this -> defaultOptions );
369371 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
370372
371373 $ this ->insertDocument (['x ' => 1 ]);
@@ -378,13 +380,18 @@ public function testMaxAwaitTimeMS()
378380 /* On average, an acknowledged write takes about 20 ms to appear in a
379381 * change stream on the server so we'll use a higher maxAwaitTimeMS to
380382 * ensure we see the write. */
381- $ maxAwaitTimeMS = 100 ;
383+ $ maxAwaitTimeMS = 500 ;
382384
383385 /* Calculate an approximate pivot to use for time assertions. We will
384386 * assert that the duration of blocking responses is greater than this
385387 * value, and vice versa. */
386388 $ pivot = ($ maxAwaitTimeMS * 0.001 ) * 0.9 ;
387389
390+ /* Calculate an approximate upper bound to use for time assertions. We
391+ * will assert that the duration of blocking responses is less than this
392+ * value. */
393+ $ upperBound = ($ maxAwaitTimeMS * 0.001 ) * 1.5 ;
394+
388395 $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], ['maxAwaitTimeMS ' => $ maxAwaitTimeMS ]);
389396 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
390397
@@ -397,7 +404,7 @@ public function testMaxAwaitTimeMS()
397404 $ changeStream ->rewind ();
398405 $ duration = microtime (true ) - $ startTime ;
399406 $ this ->assertGreaterThan ($ pivot , $ duration );
400- $ this ->assertLessThan (0.5 , $ duration );
407+ $ this ->assertLessThan ($ upperBound , $ duration );
401408
402409 $ this ->assertFalse ($ changeStream ->valid ());
403410
@@ -407,7 +414,7 @@ public function testMaxAwaitTimeMS()
407414 $ changeStream ->next ();
408415 $ duration = microtime (true ) - $ startTime ;
409416 $ this ->assertGreaterThan ($ pivot , $ duration );
410- $ this ->assertLessThan (0.5 , $ duration );
417+ $ this ->assertLessThan ($ upperBound , $ duration );
411418
412419 $ this ->assertFalse ($ changeStream ->valid ());
413420
@@ -424,7 +431,7 @@ public function testMaxAwaitTimeMS()
424431
425432 public function testRewindResumesAfterCursorNotFound ()
426433 {
427- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
434+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
428435 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
429436
430437 $ this ->killChangeStreamCursor ($ changeStream );
@@ -436,7 +443,7 @@ public function testRewindResumesAfterCursorNotFound()
436443
437444 public function testRewindExtractsResumeTokenAndNextResumes ()
438445 {
439- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
446+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
440447 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
441448
442449 $ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
@@ -473,7 +480,7 @@ public function testRewindExtractsResumeTokenAndNextResumes()
473480 */
474481 public function testTypeMapOption (array $ typeMap , $ expectedChangeDocument )
475482 {
476- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], ['maxAwaitTimeMS ' => 100 , ' typeMap ' => $ typeMap ]);
483+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], ['typeMap ' => $ typeMap ] + $ this -> defaultOptions );
477484 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
478485
479486 $ changeStream ->rewind ();
0 commit comments