2727import java .util .function .Predicate ;
2828
2929import org .apache .geode .annotations .VisibleForTesting ;
30- import org .apache .geode .cache .CacheException ;
3130import org .apache .geode .cache .CacheListener ;
32- import org .apache .geode .cache .EntryNotFoundException ;
3331import org .apache .geode .cache .TransactionId ;
3432import org .apache .geode .cache .asyncqueue .AsyncEvent ;
3533import org .apache .geode .internal .cache .wan .AbstractGatewaySender ;
@@ -198,37 +196,17 @@ protected List<KeyAndEventPair> getElementsMatching(Predicate<GatewaySenderEvent
198196 }
199197
200198 @ Override
201- public synchronized void remove () throws CacheException {
202- if (peekedIds .isEmpty ()) {
203- return ;
204- }
205- boolean wasEmpty = lastDispatchedKey == lastDestroyedKey ;
206- Long key = peekedIds .remove ();
199+ protected void preProcessRemovedKey (Long key ) {
207200 boolean isExtraPeekedId = extraPeekedIds .contains (key );
208201 if (!isExtraPeekedId ) {
209- updateHeadKey (key );
210- lastDispatchedKey = key ;
202+ super .preProcessRemovedKey (key );
211203 } else {
212204 extraPeekedIdsRemovedButPreviousIdNotRemoved .add (key );
213205 }
214- removeIndex (key );
215- // Remove the entry at that key with a callback arg signifying it is
216- // a WAN queue so that AbstractRegionEntry.destroy can get the value
217- // even if it has been evicted to disk. In the normal case, the
218- // AbstractRegionEntry.destroy only gets the value in the VM.
219- try {
220- this .region .localDestroy (key , WAN_QUEUE_TOKEN );
221- this .stats .decQueueSize ();
222- } catch (EntryNotFoundException ok ) {
223- // this is acceptable because the conflation can remove entries
224- // out from underneath us.
225- if (logger .isDebugEnabled ()) {
226- logger .debug (
227- "{}: Did not destroy entry at {} it was not there. It should have been removed by conflation." ,
228- this , key );
229- }
230- }
206+ }
231207
208+ @ Override
209+ protected void postProcessRemovedKey () {
232210 // For those extraPeekedIds removed that are consecutive to lastDispatchedKey:
233211 // - Update lastDispatchedKey with them so that they are removed
234212 // by the batch removal thread.
@@ -238,20 +216,7 @@ public synchronized void remove() throws CacheException {
238216 while (extraPeekedIdsRemovedButPreviousIdNotRemoved .contains (tmpKey = inc (tmpKey ))) {
239217 extraPeekedIdsRemovedButPreviousIdNotRemoved .remove (tmpKey );
240218 extraPeekedIds .remove (tmpKey );
241- updateHeadKey (tmpKey );
242- lastDispatchedKey = tmpKey ;
243- }
244-
245- if (wasEmpty ) {
246- synchronized (this ) {
247- notifyAll ();
248- }
249- }
250-
251- if (logger .isDebugEnabled ()) {
252- logger .debug (
253- "{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}" ,
254- this , key , this .lastDispatchedKey , this .lastDestroyedKey );
219+ super .preProcessRemovedKey (tmpKey );
255220 }
256221 }
257222
0 commit comments