@@ -102,22 +102,7 @@ public function next()
102102 {
103103 try {
104104 $ this ->csIt ->next ();
105- if ($ this ->valid ()) {
106- if ($ this ->hasAdvanced ) {
107- $ this ->key ++;
108- }
109- $ this ->hasAdvanced = true ;
110- $ this ->resumeToken = $ this ->extractResumeToken ($ this ->csIt ->current ());
111- }
112- /* If the cursorId is 0, the server has invalidated the cursor so we
113- * will never perform another getMore. This means that we cannot
114- * resume and we can therefore unset the resumeCallable, which will
115- * free any reference to Watch. This will also free the only
116- * reference to an implicit session, since any such reference
117- * belongs to Watch. */
118- if ((string ) $ this ->getCursorId () === '0 ' ) {
119- $ this ->resumeCallable = null ;
120- }
105+ $ this ->onIteration (true );
121106 } catch (RuntimeException $ e ) {
122107 if ($ this ->isResumableError ($ e )) {
123108 $ this ->resume ();
@@ -133,14 +118,7 @@ public function rewind()
133118 {
134119 try {
135120 $ this ->csIt ->rewind ();
136- if ($ this ->valid ()) {
137- $ this ->hasAdvanced = true ;
138- $ this ->resumeToken = $ this ->extractResumeToken ($ this ->csIt ->current ());
139- }
140- // As with next(), free the callable once we know it will never be used.
141- if ((string ) $ this ->getCursorId () === '0 ' ) {
142- $ this ->resumeCallable = null ;
143- }
121+ $ this ->onIteration (false );
144122 } catch (RuntimeException $ e ) {
145123 if ($ this ->isResumableError ($ e )) {
146124 $ this ->resume ();
@@ -160,7 +138,7 @@ public function valid()
160138 /**
161139 * Extracts the resume token (i.e. "_id" field) from the change document.
162140 *
163- * @param array|document $document Change document
141+ * @param array|object $document Change document
164142 * @return mixed
165143 * @throws InvalidArgumentException
166144 * @throws ResumeTokenException if the resume token is not found or invalid
@@ -214,6 +192,38 @@ private function isResumableError(RuntimeException $exception)
214192 return true ;
215193 }
216194
195+ /**
196+ * Perform housekeeping after an iteration event (i.e. next or rewind).
197+ *
198+ * @param boolean $isNext Whether the iteration event was a call to next()
199+ * @throws ResumeTokenException
200+ */
201+ private function onIteration ($ isNext )
202+ {
203+ /* If the cursorId is 0, the server has invalidated the cursor and we
204+ * will never perform another getMore nor need to resume since any
205+ * remaining results (up to and including the invalidate event) will
206+ * have been received in the last response. Therefore, we can unset the
207+ * resumeCallable. This will free any reference to Watch as well as the
208+ * only reference to any implicit session created therein. */
209+ if ((string ) $ this ->getCursorId () === '0 ' ) {
210+ $ this ->resumeCallable = null ;
211+ }
212+
213+ if (!$ this ->valid ()) {
214+ return ;
215+ }
216+
217+ /* Increment the key if the iteration event was a call to next() and we
218+ * have already advanced past the first result. */
219+ if ($ isNext && $ this ->hasAdvanced ) {
220+ $ this ->key ++;
221+ }
222+
223+ $ this ->hasAdvanced = true ;
224+ $ this ->resumeToken = $ this ->extractResumeToken ($ this ->csIt ->current ());
225+ }
226+
217227 /**
218228 * Creates a new changeStream after a resumable server error.
219229 *
@@ -224,5 +234,6 @@ private function resume()
224234 $ newChangeStream = call_user_func ($ this ->resumeCallable , $ this ->resumeToken );
225235 $ this ->csIt = $ newChangeStream ->csIt ;
226236 $ this ->csIt ->rewind ();
237+ $ this ->onIteration (false );
227238 }
228239}
0 commit comments