1818namespace MongoDB \Operation ;
1919
2020use MongoDB \ChangeStream ;
21+ use MongoDB \BSON \TimestampInterface ;
2122use MongoDB \Driver \Command ;
23+ use MongoDB \Driver \Cursor ;
2224use MongoDB \Driver \Manager ;
2325use MongoDB \Driver \ReadConcern ;
2426use MongoDB \Driver \ReadPreference ;
2527use MongoDB \Driver \Server ;
2628use MongoDB \Driver \Session ;
2729use MongoDB \Driver \Exception \RuntimeException ;
30+ use MongoDB \Driver \Monitoring \CommandFailedEvent ;
31+ use MongoDB \Driver \Monitoring \CommandSubscriber ;
32+ use MongoDB \Driver \Monitoring \CommandStartedEvent ;
33+ use MongoDB \Driver \Monitoring \CommandSucceededEvent ;
2834use MongoDB \Exception \InvalidArgumentException ;
2935use MongoDB \Exception \UnexpectedValueException ;
3036use MongoDB \Exception \UnsupportedException ;
3137
3238/**
3339 * Operation for creating a change stream with the aggregate command.
3440 *
41+ * Note: the implementation of CommandSubscriber is an internal implementation
42+ * detail and should not be considered part of the public API.
43+ *
3544 * @api
3645 * @see \MongoDB\Collection::watch()
3746 * @see https://docs.mongodb.com/manual/changeStreams/
3847 */
39- class Watch implements Executable
48+ class Watch implements Executable, /* @internal */ CommandSubscriber
4049{
50+ private static $ wireVersionForOperationTime = 7 ;
51+
4152 const FULL_DOCUMENT_DEFAULT = 'default ' ;
4253 const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup ' ;
4354
4455 private $ aggregate ;
45- private $ databaseName ;
56+ private $ aggregateOptions ;
57+ private $ changeStreamOptions ;
4658 private $ collectionName ;
59+ private $ databaseName ;
60+ private $ operationTime ;
4761 private $ pipeline ;
48- private $ options ;
4962 private $ resumeCallable ;
5063
5164 /**
@@ -79,22 +92,44 @@ class Watch implements Executable
7992 * * resumeAfter (document): Specifies the logical starting point for the
8093 * new change stream.
8194 *
95+ * Using this option in conjunction with "startAtOperationTime" will
96+ * result in a server error. The options are mutually exclusive.
97+ *
8298 * * session (MongoDB\Driver\Session): Client session.
8399 *
84100 * Sessions are not supported for server versions < 3.6.
85101 *
102+ * * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified,
103+ * the change stream will only provide changes that occurred at or after
104+ * the specified timestamp. Any command run against the server will
105+ * return an operation time that can be used here. Alternatively, an
106+ * operation time may be obtained from MongoDB\Driver\Server::getInfo().
107+ *
108+ * Using this option in conjunction with "resumeAfter" will result in a
109+ * server error. The options are mutually exclusive.
110+ *
111+ * This option is not supported for server versions < 4.0.
112+ *
86113 * * typeMap (array): Type map for BSON deserialization. This will be
87114 * applied to the returned Cursor (it is not sent to the server).
88115 *
89- * @param string $databaseName Database name
90- * @param string $collectionName Collection name
116+ * Note: A database-level change stream may be created by specifying null
117+ * for the collection name. A cluster-level change stream may be created by
118+ * specifying null for both the database and collection name.
119+ *
120+ * @param Manager $manager Manager instance from the driver
121+ * @param string|null $databaseName Database name
122+ * @param string|null $collectionName Collection name
91123 * @param array $pipeline List of pipeline operations
92124 * @param array $options Command options
93- * @param Manager $manager Manager instance from the driver
94125 * @throws InvalidArgumentException for parameter/option parsing errors
95126 */
96127 public function __construct (Manager $ manager , $ databaseName , $ collectionName , array $ pipeline , array $ options = [])
97128 {
129+ if (isset ($ collectionName ) && ! isset ($ databaseName )) {
130+ throw new InvalidArgumentException ('$collectionName should also be null if $databaseName is null ' );
131+ }
132+
98133 $ options += [
99134 'fullDocument ' => self ::FULL_DOCUMENT_DEFAULT ,
100135 'readPreference ' => new ReadPreference (ReadPreference::RP_PRIMARY ),
@@ -104,10 +139,12 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
104139 throw InvalidArgumentException::invalidType ('"fullDocument" option ' , $ options ['fullDocument ' ], 'string ' );
105140 }
106141
107- if (isset ($ options ['resumeAfter ' ])) {
108- if ( ! is_array ($ options ['resumeAfter ' ]) && ! is_object ($ options ['resumeAfter ' ])) {
109- throw InvalidArgumentException::invalidType ('"resumeAfter" option ' , $ options ['resumeAfter ' ], 'array or object ' );
110- }
142+ if (isset ($ options ['resumeAfter ' ]) && ! is_array ($ options ['resumeAfter ' ]) && ! is_object ($ options ['resumeAfter ' ])) {
143+ throw InvalidArgumentException::invalidType ('"resumeAfter" option ' , $ options ['resumeAfter ' ], 'array or object ' );
144+ }
145+
146+ if (isset ($ options ['startAtOperationTime ' ]) && ! $ options ['startAtOperationTime ' ] instanceof TimestampInterface) {
147+ throw InvalidArgumentException::invalidType ('"startAtOperationTime" option ' , $ options ['startAtOperationTime ' ], TimestampInterface::class);
111148 }
112149
113150 /* In the absence of an explicit session, create one to ensure that the
@@ -122,15 +159,47 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
122159 }
123160 }
124161
162+ $ this ->aggregateOptions = array_intersect_key ($ options , ['batchSize ' => 1 , 'collation ' => 1 , 'maxAwaitTimeMS ' => 1 , 'readConcern ' => 1 , 'readPreference ' => 1 , 'session ' => 1 , 'typeMap ' => 1 ]);
163+ $ this ->changeStreamOptions = array_intersect_key ($ options , ['fullDocument ' => 1 , 'resumeAfter ' => 1 , 'startAtOperationTime ' => 1 ]);
164+
165+ // Null database name implies a cluster-wide change stream
166+ if ($ databaseName === null ) {
167+ $ databaseName = 'admin ' ;
168+ $ this ->changeStreamOptions ['allChangesForCluster ' ] = true ;
169+ }
170+
125171 $ this ->databaseName = (string ) $ databaseName ;
126- $ this ->collectionName = ( string ) $ collectionName ;
172+ $ this ->collectionName = isset ( $ collectionName ) ? ( string ) $ collectionName : null ;
127173 $ this ->pipeline = $ pipeline ;
128- $ this ->options = $ options ;
129174
130175 $ this ->aggregate = $ this ->createAggregate ();
131176 $ this ->resumeCallable = $ this ->createResumeCallable ($ manager );
132177 }
133178
179+ /** @internal */
180+ final public function commandFailed (CommandFailedEvent $ event )
181+ {
182+ }
183+
184+ /** @internal */
185+ final public function commandStarted (CommandStartedEvent $ event )
186+ {
187+ }
188+
189+ /** @internal */
190+ final public function commandSucceeded (CommandSucceededEvent $ event )
191+ {
192+ if ($ event ->getCommandName () !== 'aggregate ' ) {
193+ return ;
194+ }
195+
196+ $ reply = $ event ->getReply ();
197+
198+ if (isset ($ reply ->operationTime ) && $ reply ->operationTime instanceof TimestampInterface) {
199+ $ this ->operationTime = $ reply ->operationTime ;
200+ }
201+ }
202+
134203 /**
135204 * Execute the operation.
136205 *
@@ -142,47 +211,74 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
142211 */
143212 public function execute (Server $ server )
144213 {
145- $ cursor = $ this ->aggregate ->execute ($ server );
146-
147- return new ChangeStream ($ cursor , $ this ->resumeCallable );
214+ return new ChangeStream ($ this ->executeAggregate ($ server ), $ this ->resumeCallable );
148215 }
149216
150217 /**
151218 * Create the aggregate command for creating a change stream.
152219 *
153- * This method is also used to recreate the aggregate command if a new
154- * resume token is provided while resuming.
220+ * This method is also used to recreate the aggregate command when resuming.
155221 *
156222 * @return Aggregate
157223 */
158224 private function createAggregate ()
159225 {
160- $ changeStreamOptions = array_intersect_key ($ this ->options , ['fullDocument ' => 1 , 'resumeAfter ' => 1 ]);
161- $ changeStream = ['$changeStream ' => (object ) $ changeStreamOptions ];
162-
163226 $ pipeline = $ this ->pipeline ;
164- array_unshift ($ pipeline , $ changeStream );
227+ array_unshift ($ pipeline , [ ' $changeStream ' => ( object ) $ this -> changeStreamOptions ] );
165228
166- $ aggregateOptions = array_intersect_key ($ this ->options , ['batchSize ' => 1 , 'collation ' => 1 , 'maxAwaitTimeMS ' => 1 , 'readConcern ' => 1 , 'readPreference ' => 1 , 'session ' => 1 , 'typeMap ' => 1 ]);
167-
168- return new Aggregate ($ this ->databaseName , $ this ->collectionName , $ pipeline , $ aggregateOptions );
229+ return new Aggregate ($ this ->databaseName , $ this ->collectionName , $ pipeline , $ this ->aggregateOptions );
169230 }
170231
171232 private function createResumeCallable (Manager $ manager )
172233 {
173234 return function ($ resumeToken = null ) use ($ manager ) {
174- /* If a resume token was provided, recreate the Aggregate operation
175- * using the new resume token . */
235+ /* If a resume token was provided, update the "resumeAfter" option
236+ * and ensure that "startAtOperationTime" is no longer set . */
176237 if ($ resumeToken !== null ) {
177- $ this ->options ['resumeAfter ' ] = $ resumeToken ;
178- $ this ->aggregate = $ this ->createAggregate ();
238+ $ this ->changeStreamOptions ['resumeAfter ' ] = $ resumeToken ;
239+ unset($ this ->changeStreamOptions ['startAtOperationTime ' ]);
240+ }
241+
242+ /* If we captured an operation time from the first aggregate command
243+ * and there is no "resumeAfter" option, set "startAtOperationTime"
244+ * so that we can resume from the original aggregate's time. */
245+ if ($ this ->operationTime !== null && ! isset ($ this ->changeStreamOptions ['resumeAfter ' ])) {
246+ $ this ->changeStreamOptions ['startAtOperationTime ' ] = $ this ->operationTime ;
179247 }
180248
249+ $ this ->aggregate = $ this ->createAggregate ();
250+
181251 /* Select a new server using the read preference, execute this
182252 * operation on it, and return the new ChangeStream. */
183- $ server = $ manager ->selectServer ($ this ->options ['readPreference ' ]);
253+ $ server = $ manager ->selectServer ($ this ->aggregateOptions ['readPreference ' ]);
184254
185255 return $ this ->execute ($ server );
186256 };
187257 }
258+
259+ /**
260+ * Execute the aggregate command and optionally capture its operation time.
261+ *
262+ * @param Server $server
263+ * @return Cursor
264+ */
265+ private function executeAggregate (Server $ server )
266+ {
267+ /* If we've already captured an operation time or the server does not
268+ * support returning an operation time (e.g. MongoDB 3.6), execute the
269+ * aggregation directly and return its cursor. */
270+ if ($ this ->operationTime !== null || ! \MongoDB \server_supports_feature ($ server , self ::$ wireVersionForOperationTime )) {
271+ return $ this ->aggregate ->execute ($ server );
272+ }
273+
274+ /* Otherwise, execute the aggregation using command monitoring so that
275+ * we can capture its operation time with commandSucceeded(). */
276+ \MongoDB \Driver \Monitoring \addSubscriber ($ this );
277+
278+ try {
279+ return $ this ->aggregate ->execute ($ server );
280+ } finally {
281+ \MongoDB \Driver \Monitoring \removeSubscriber ($ this );
282+ }
283+ }
188284}
0 commit comments