Skip to content

Commit 1a89e8b

Browse files
committed
Merge branch 'v1.3'
2 parents 42f741d + 5d2a8a4 commit 1a89e8b

File tree

3 files changed

+105
-0
lines changed

3 files changed

+105
-0
lines changed

src/ChangeStream.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,15 @@ public function next()
101101
$this->hasAdvanced = true;
102102
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
103103
}
104+
/* If the cursorId is 0, the server has invalidated the cursor so we
105+
* will never perform another getMore. This means that we cannot
106+
* resume and we can therefore unset the resumeCallable, which will
107+
* free any reference to Watch. This will also free the only
108+
* reference to an implicit session, since any such reference
109+
* belongs to Watch. */
110+
if ((string)$this->getCursorId() === '0') {
111+
$this->resumeCallable = null;
112+
}
104113
} catch (RuntimeException $e) {
105114
if (strpos($e->getMessage(), "not master") !== false) {
106115
$resumable = true;
@@ -130,6 +139,10 @@ public function rewind()
130139
$this->hasAdvanced = true;
131140
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
132141
}
142+
// As with next(), free the callable once we know it will never be used.
143+
if ((string)$this->getCursorId() === '0') {
144+
$this->resumeCallable = null;
145+
}
133146
} catch (RuntimeException $e) {
134147
if (strpos($e->getMessage(), "not master") !== false) {
135148
$resumable = true;

src/Operation/Watch.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
110110
}
111111
}
112112

113+
if ( ! isset($options['session'])) {
114+
try {
115+
$options['session'] = $manager->startSession();
116+
} catch (DriverRuntimeException $e) {}
117+
}
118+
113119
$this->databaseName = (string) $databaseName;
114120
$this->collectionName = (string) $collectionName;
115121
$this->pipeline = $pipeline;

tests/Operation/WatchFunctionalTest.php

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
use MongoDB\Driver\Server;
99
use MongoDB\Driver\Exception\ConnectionTimeoutException;
1010
use MongoDB\Exception\ResumeTokenException;
11+
use MongoDB\Operation\CreateCollection;
1112
use MongoDB\Operation\DatabaseCommand;
13+
use MongoDB\Operation\DropCollection;
1214
use MongoDB\Operation\InsertOne;
1315
use MongoDB\Operation\Watch;
1416
use MongoDB\Tests\CommandObserver;
@@ -580,6 +582,90 @@ public function testResumeTokenNotFoundAdvancesKey()
580582
$this->assertSame(2, $changeStream->key());
581583
}
582584

585+
public function testSessionPersistsAfterResume()
586+
{
587+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
588+
589+
$changeStream = null;
590+
$originalSession = null;
591+
$sessionAfterResume = [];
592+
$commands = [];
593+
594+
/* We want to ensure that the lsid of the initial aggregate matches the
595+
* lsid of any aggregates after the change stream resumes. After
596+
* PHPC-1152 is complete, we will ensure that the lsid of the initial
597+
* aggregate matches the lsid of any subsequent aggregates and getMores.
598+
*/
599+
(new CommandObserver)->observe(
600+
function() use ($operation, &$changeStream) {
601+
$changeStream = $operation->execute($this->getPrimaryServer());
602+
},
603+
function($changeStream) use (&$originalSession) {
604+
if (isset($changeStream->aggregate)) {
605+
$originalSession = bin2hex((string) $changeStream->lsid->id);
606+
}
607+
}
608+
);
609+
610+
$changeStream->rewind();
611+
$this->killChangeStreamCursor($changeStream);
612+
613+
(new CommandObserver)->observe(
614+
function() use (&$changeStream) {
615+
$changeStream->next();
616+
},
617+
function ($changeStream) use (&$sessionAfterResume, &$commands) {
618+
$commands[] = key((array) $changeStream);
619+
$sessionAfterResume[] = bin2hex((string) $changeStream->lsid->id);
620+
}
621+
);
622+
623+
$expectedCommands = [
624+
/* We expect a getMore to be issued because we are calling next(). */
625+
'getMore',
626+
/* Since we have killed the cursor, ChangeStream will resume by
627+
* issuing a new aggregate commmand. */
628+
'aggregate',
629+
/* When ChangeStream resumes, it overwrites its original cursor with
630+
* the new cursor resulting from the last aggregate command. This
631+
* removes the last reference to the old cursor, which causes the
632+
* driver to kill it (via mongoc_cursor_destroy()). */
633+
'killCursors',
634+
/* Finally, ChangeStream will rewind the new cursor as the last step
635+
* of the resume process. This results in one last getMore. */
636+
'getMore',
637+
];
638+
639+
$this->assertSame($expectedCommands, $commands);
640+
641+
foreach ($sessionAfterResume as $session) {
642+
$this->assertEquals($session, $originalSession);
643+
}
644+
}
645+
646+
public function testSessionFreed()
647+
{
648+
$operation = new CreateCollection($this->getDatabaseName(), $this->getCollectionName());
649+
$operation->execute($this->getPrimaryServer());
650+
651+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
652+
$changeStream = $operation->execute($this->getPrimaryServer());
653+
654+
$rc = new ReflectionClass($changeStream);
655+
$rp = $rc->getProperty('resumeCallable');
656+
$rp->setAccessible(true);
657+
658+
$this->assertNotNull($rp->getValue($changeStream));
659+
660+
// Invalidate the cursor to verify that resumeCallable is unset when the cursor is exhausted.
661+
$operation = new DropCollection($this->getDatabaseName(), $this->getCollectionName());
662+
$operation->execute($this->getPrimaryServer());
663+
664+
$changeStream->next();
665+
666+
$this->assertNull($rp->getValue($changeStream));
667+
}
668+
583669
private function insertDocument($document)
584670
{
585671
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);

0 commit comments

Comments
 (0)