From 58fd90904e24beb1d0ebd494a14fe084071f3566 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Thu, 2 Jul 2026 16:50:02 +0300 Subject: [PATCH] IGNITE-627 Fixed inconsistent value in near cache on concurrent update Co-Authored-By: Claude Fable 5 --- .../cache/distributed/dht/GridDhtTxLocal.java | 4 +- .../dht/GridDhtTxLocalAdapter.java | 8 +- .../dht/GridDhtTxPrepareFuture.java | 2 + .../GridNearAtomicAbstractUpdateFuture.java | 103 ++++++++++++++++ .../GridNearAtomicSingleUpdateFuture.java | 4 +- .../atomic/GridNearAtomicUpdateFuture.java | 9 +- .../distributed/near/GridNearAtomicCache.java | 20 +++- .../distributed/near/GridNearCacheEntry.java | 4 +- .../cache/CacheNearReaderUpdateTest.java | 2 - ...CacheValueConsistencyAbstractSelfTest.java | 5 - ...iteCacheP2pUnmarshallingNearErrorTest.java | 2 +- .../atomic/IgniteCacheAtomicProtocolTest.java | 112 ++++++++++++++++++ 12 files changed, 251 insertions(+), 24 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 67b32d38f86b9..9e3ecb4836346 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -371,12 +371,12 @@ public final IgniteInternalFuture prepareAsync(GridNe try { if (req.reads() != null) { for (IgniteTxEntry e : req.reads()) - addEntry(req.messageId(), e); + addEntry(e); } if (req.writes() != null) { for (IgniteTxEntry e : req.writes()) - addEntry(req.messageId(), e); + addEntry(e); } userPrepare(null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 4c7e9313f5e16..d7ef37458a9bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -483,18 +483,16 @@ private void addMapping( } /** - * @param msgId Message ID. * @param e Entry to add. - * @return Future for active transactions for the time when reader was added. * @throws IgniteCheckedException If failed. */ - @Nullable public IgniteInternalFuture addEntry(long msgId, IgniteTxEntry e) throws IgniteCheckedException { + public void addEntry(IgniteTxEntry e) throws IgniteCheckedException { init(); TransactionState state = state(); assert state == PREPARING : "Invalid tx state for " + - "adding entry [msgId=" + msgId + ", e=" + e + ", tx=" + this + ']'; + "adding entry [e=" + e + ", tx=" + this + ']'; e.unmarshal(cctx, false, cctx.deploy().globalLoader()); @@ -544,8 +542,6 @@ private void addMapping( if (log.isDebugEnabled()) log.debug("Added entry to transaction: " + existing); } - - return addReader(msgId, dhtCache.entryExx(existing.key()), existing, topologyVersion()); } catch (GridDhtInvalidPartitionException ex) { throw new IgniteCheckedException(ex); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 7af5629a66818..c1e39c57e7f0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1615,6 +1615,8 @@ private void map(IgniteTxEntry entry) throws IgniteTxRollbackCheckedException { GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); + tx.addReader(req.messageId(), cached, entry, tx.topologyVersion()); + GridCacheContext cacheCtx = entry.context(); GridDhtCacheAdapter dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 21aa24e160865..43b0f5a05d8d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -41,12 +41,16 @@ import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; +import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -153,6 +157,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture /** Handle binary in interceptor operation flag. */ protected boolean keepBinaryInInterceptor; + /** Near entries reserved against eviction for the time of update. */ + protected Map reservedEntries; + /** * Constructor. * @@ -212,6 +219,8 @@ protected GridNearAtomicAbstractUpdateFuture( nearEnabled = CU.isNearEnabled(cctx); + reservedEntries = nearEnabled ? new GridLeanMap<>() : null; + this.remapCnt = remapCnt; this.appAttrs = appAttrs; this.keepBinaryInInterceptor = keepBinaryInInterceptor; @@ -404,6 +413,100 @@ final void completeFuture(@Nullable GridCacheReturn ret, Throwable err, @Nullabl return false; } + /** {@inheritDoc} */ + @Override protected boolean onDone(@Nullable Object res, @Nullable Throwable err, boolean cancel) { + if (super.onDone(res, err, cancel)) { + releaseNearCacheEntries(); + + return true; + } + + return false; + } + + /** + * Creates near entries for the request keys and reserves them against eviction, so that a concurrent + * DHT update and the reordered response always find the entry and are ordered by version. + * Reservations are released on future completion; must be called before the future is published via + * {@code addAtomicFuture} — a reservation taken after a concurrent completion would never be released. + * + * @param req Mapped update request. + * @param topVer Update topology version. + */ + protected final void reserveNearCacheEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) { + if (!nearEnabled) + return; + + for (int i = 0; i < req.size(); i++) + reserveNearCacheEntry(req.key(i), topVer); + } + + /** */ + private void reserveNearCacheEntry(KeyCacheObject key, AffinityTopologyVersion topVer) { + if (cctx.affinityNode() && cctx.affinity().keyLocalNode(key, topVer)) + return; + + GridNearAtomicCache nearCache = (GridNearAtomicCache)cctx.dht().near(); + + synchronized (this) { + if (reservedEntries.containsKey(key)) + return; + + while (true) { + try { + GridNearCacheEntry entry = nearCache.entryExx(key, topVer); + + entry.reserveEviction(); + + reservedEntries.put(key, entry); + + return; + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry while reserving near cache entry (will retry): " + key); + } + } + } + } + + /** + * Releases the reservation for the key once its near update response is applied, so that the entry is + * evictable again by the time response processing offers it to the eviction policy. + * + * @param key Key. + */ + public final void releaseNearCacheEntry(KeyCacheObject key) { + GridNearCacheEntry entry; + + synchronized (this) { + entry = reservedEntries != null ? reservedEntries.remove(key) : null; + } + + if (entry != null) + entry.releaseEviction(); + } + + /** */ + private void releaseNearCacheEntries() { + Map reservedEntries0; + + synchronized (this) { + if (reservedEntries == null || reservedEntries.isEmpty()) + return; + + reservedEntries0 = reservedEntries; + + reservedEntries = null; + } + + for (GridNearCacheEntry entry : reservedEntries0.values()) { + entry.releaseEviction(); + + entry.touch(); + } + } + /** * @param req Request. * @param res Response. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index cdbb251d049af..46f5ddd940896 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -381,7 +381,7 @@ private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicU GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); - near.processNearAtomicUpdateResponse(req, res); + near.processNearAtomicUpdateResponse(req, res, this); } /** {@inheritDoc} */ @@ -433,6 +433,8 @@ private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicU try { reqState0 = mapSingleUpdate(topVer, futId); + reserveNearCacheEntries(reqState0.req, topVer); + synchronized (this) { assert topVer.topologyVersion() > 0 : topVer; assert this.topVer == AffinityTopologyVersion.ZERO : this; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index a0714a324c968..b9692d5dc5910 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -609,7 +609,7 @@ private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicU GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); - near.processNearAtomicUpdateResponse(req, res); + near.processNearAtomicUpdateResponse(req, res, this); } /** {@inheritDoc} */ @@ -762,6 +762,13 @@ private void map(AffinityTopologyVersion topVer, @Nullable Collection 0 : topVer; assert this.topVer == AffinityTopologyVersion.ZERO : this; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 366f937e3801f..04ea56e81ba50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; @@ -131,10 +132,12 @@ public void dht(GridDhtAtomicCache dht) { /** * @param req Update request. * @param res Update response. + * @param fut Update future, releases the key reservation before its entry is offered to eviction. */ public void processNearAtomicUpdateResponse( GridNearAtomicAbstractUpdateRequest req, - GridNearAtomicUpdateResponse res + GridNearAtomicUpdateResponse res, + GridNearAtomicAbstractUpdateFuture fut ) { if (F.size(res.failedKeys()) == req.size()) return; @@ -171,6 +174,8 @@ public void processNearAtomicUpdateResponse( // Reader became backup. if (ctx.affinity().partitionBelongs(ctx.localNode(), ctx.affinity().partition(key), req.topologyVersion())) { + fut.releaseNearCacheEntry(key); + GridCacheEntryEx entry = peekEx(key); if (entry != null && entry.markObsolete(ver)) @@ -209,7 +214,8 @@ public void processNearAtomicUpdateResponse( req.keepBinaryInInterceptor(), req.nodeId(), taskName, - req.operation() == TRANSFORM); + req.operation() == TRANSFORM, + fut); } catch (IgniteCheckedException e) { res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e)); @@ -226,6 +232,7 @@ public void processNearAtomicUpdateResponse( * @param nodeId Node ID. * @param taskName Task name. * @param transformedValue {@code True} if transformed value. + * @param fut Update future. * @throws IgniteCheckedException If failed. */ private void processNearAtomicUpdateResponse( @@ -238,7 +245,8 @@ private void processNearAtomicUpdateResponse( boolean keepBinaryInInterceptor, UUID nodeId, String taskName, - boolean transformedValue) throws IgniteCheckedException { + boolean transformedValue, + GridNearAtomicAbstractUpdateFuture fut) throws IgniteCheckedException { try { while (true) { GridCacheEntryEx entry = null; @@ -294,8 +302,12 @@ private void processNearAtomicUpdateResponse( entry = null; } finally { - if (entry != null) + if (entry != null) { + // Release before touch, so that the eviction policy sees an evictable entry. + fut.releaseNearCacheEntry(key); + entry.touch(); + } } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 5e98920b60c81..bb47aeb5f5120 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -721,7 +721,7 @@ public boolean loadedValue(@Nullable IgniteInternalTx tx, /** * @throws GridCacheEntryRemovedException If entry was removed. */ - void reserveEviction() throws GridCacheEntryRemovedException { + public void reserveEviction() throws GridCacheEntryRemovedException { lockEntry(); try { @@ -737,7 +737,7 @@ void reserveEviction() throws GridCacheEntryRemovedException { /** * */ - void releaseEviction() { + public void releaseEviction() { lockEntry(); try { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java index d9165f87d137f..09a740d6bdc6e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java @@ -162,8 +162,6 @@ private void runTestGetUpdateMultithreaded(CacheConfiguration final List getNodes, final TransactionConcurrency concurrency, final TransactionIsolation isolation) throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-627"); - log.info("Execute updates [concurrency=" + concurrency + ", isolation=" + isolation + ']'); final Ignite ignite0 = ignite(0); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java index ccf17fc87196c..933afa9bd6427 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java @@ -27,7 +27,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.testframework.GridTestUtils.SF; -import org.junit.Assume; import org.junit.Test; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; @@ -226,8 +225,6 @@ public void testPutRemoveAll() throws Exception { */ @Test public void testPutConsistencyMultithreaded() throws Exception { - Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-627", nearEnabled()); - for (int i = 0; i < 20; i++) { log.info("Iteration: " + i); @@ -279,8 +276,6 @@ public void testPutConsistencyMultithreaded() throws Exception { */ @Test public void testPutRemoveConsistencyMultithreaded() throws Exception { - Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-627", nearEnabled()); - for (int i = 0; i < SF.applyLB(10, 2); i++) { log.info("Iteration: " + i); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java index 19b9f0c645854..d71c73b1e7871 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java @@ -47,7 +47,7 @@ public class IgniteCacheP2pUnmarshallingNearErrorTest extends IgniteCacheP2pUnma @Test @Override public void testResponseMessageOnUnmarshallingFailed() throws InterruptedException { //GridCacheEvictionRequest unmarshalling failed test. - readCnt.set(9); //4 for each put (near cache on client works!). + readCnt.set(11); //5 for each put: the near entry is now pre-created at update mapping (near cache on client works!). jcache(0).put(new TestKey(String.valueOf(++key)), ""); jcache(0).put(new TestKey(String.valueOf(++key)), ""); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java index 31e5aeae44606..5457e9669bfe5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java @@ -23,16 +23,20 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicyFactory; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; @@ -864,6 +868,114 @@ private void readerUpdateDhtFails(boolean updateNearEnabled, checkData(map); } + /** + * @throws Exception If failed. + */ + @Test + public void testNearEntryUpdateRacePut() throws Exception { + nearEntryUpdateRace(cache -> cache.put(0, 1), F.asMap(0, 2)); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testNearEntryUpdateRacePutIfAbsent() throws Exception { + nearEntryUpdateRace(cache -> assertTrue(cache.putIfAbsent(0, 1)), F.asMap(0, 2)); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testNearEntryUpdateRaceInvoke() throws Exception { + nearEntryUpdateRace(cache -> cache.invoke(0, new SetValueEntryProcessor(1)), F.asMap(0, 2)); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testNearEntryUpdateRacePutAll() throws Exception { + Map initVals = new HashMap<>(); + Map newVals = new HashMap<>(); + + for (int i = 0; i < 100; i++) { + initVals.put(i, i); + newVals.put(i, i + 10_000); + } + + nearEntryUpdateRace(cache -> cache.putAll(initVals), newVals); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testNearEvictionPolicyOnUpdates() throws Exception { + ccfg = cacheConfiguration(1, FULL_SYNC); + + startGrid(0); + + ccfg = null; + + Ignite client = startClientGrid(1); + + IgniteCache nearCache = client.createNearCache(TEST_CACHE, + new NearCacheConfiguration() + .setNearEvictionPolicyFactory(new FifoEvictionPolicyFactory<>(3, 1, 0))); + + Map vals = new HashMap<>(); + + for (int i = 0; i < 100; i++) + vals.put(i, i); + + nearCache.putAll(vals); + + assertEquals(3, nearCache.localSize(CachePeekMode.NEAR)); + + for (int i = 100; i < 200; i++) + nearCache.put(i, i); + + assertEquals(3, nearCache.localSize(CachePeekMode.NEAR)); + } + + /** + * @param nearOp Near cache update. + * @param newVals Concurrent update from the primary that must win. + * @throws Exception If failed. + */ + private void nearEntryUpdateRace(Consumer> nearOp, Map newVals) + throws Exception { + ccfg = cacheConfiguration(1, FULL_SYNC); + + Ignite srv0 = startGrid(0); + + IgniteCache srvCache = srv0.cache(TEST_CACHE); + + ccfg = null; + + Ignite client1 = startClientGrid(1); + + IgniteCache nearCache = client1.createNearCache(TEST_CACHE, new NearCacheConfiguration<>()); + + testSpi(srv0).blockMessages(GridNearAtomicUpdateResponse.class, client1.name()); + + IgniteInternalFuture nearOpFut = GridTestUtils.runAsync(() -> nearOp.accept(nearCache)); + + testSpi(srv0).waitForBlocked(); + + srvCache.putAll(newVals); + + assertFalse(nearOpFut.isDone()); + + testSpi(srv0).stopBlock(); + + nearOpFut.get(); + + checkData(newVals); + } + /** * @param expData Expected cache data. */