Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,12 @@ public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(GridNe
try {
if (req.reads() != null) {
for (IgniteTxEntry e : req.reads())
addEntry(req.messageId(), e);
addEntry(e);
}

if (req.writes() != null) {
for (IgniteTxEntry e : req.writes())
addEntry(req.messageId(), e);
addEntry(e);
}

userPrepare(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,18 +483,16 @@ private void addMapping(
}

/**
* @param msgId Message ID.
* @param e Entry to add.
* @return Future for active transactions for the time when reader was added.
* @throws IgniteCheckedException If failed.
*/
@Nullable public IgniteInternalFuture<Boolean> addEntry(long msgId, IgniteTxEntry e) throws IgniteCheckedException {
public void addEntry(IgniteTxEntry e) throws IgniteCheckedException {
init();

TransactionState state = state();

assert state == PREPARING : "Invalid tx state for " +
"adding entry [msgId=" + msgId + ", e=" + e + ", tx=" + this + ']';
"adding entry [e=" + e + ", tx=" + this + ']';

e.unmarshal(cctx, false, cctx.deploy().globalLoader());

Expand Down Expand Up @@ -544,8 +542,6 @@ private void addMapping(
if (log.isDebugEnabled())
log.debug("Added entry to transaction: " + existing);
}

return addReader(msgId, dhtCache.entryExx(existing.key()), existing, topologyVersion());
}
catch (GridDhtInvalidPartitionException ex) {
throw new IgniteCheckedException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,8 @@ private void map(IgniteTxEntry entry) throws IgniteTxRollbackCheckedException {

GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();

tx.addReader(req.messageId(), cached, entry, tx.topologyVersion());

GridCacheContext cacheCtx = entry.context();

GridDhtCacheAdapter<?, ?> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
Expand Down Expand Up @@ -153,6 +157,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
/** Handle binary in interceptor operation flag. */
protected boolean keepBinaryInInterceptor;

/** Near entries reserved against eviction for the time of update. */
protected Map<KeyCacheObject, GridNearCacheEntry> reservedEntries;

/**
* Constructor.
*
Expand Down Expand Up @@ -212,6 +219,8 @@ protected GridNearAtomicAbstractUpdateFuture(

nearEnabled = CU.isNearEnabled(cctx);

reservedEntries = nearEnabled ? new GridLeanMap<>() : null;

this.remapCnt = remapCnt;
this.appAttrs = appAttrs;
this.keepBinaryInInterceptor = keepBinaryInInterceptor;
Expand Down Expand Up @@ -404,6 +413,80 @@ final void completeFuture(@Nullable GridCacheReturn ret, Throwable err, @Nullabl
return false;
}

/** {@inheritDoc} */
@Override protected boolean onDone(@Nullable Object res, @Nullable Throwable err, boolean cancel) {
if (super.onDone(res, err, cancel)) {
releaseNearCacheEntries();

return true;
}

return false;
}

/**
* Creates near entries for the request keys and reserves them against eviction, so that a concurrent
* DHT update and the reordered response always find the entry and are ordered by version.
* Reservations are released on future completion; must be called before the future is published via
* {@code addAtomicFuture} — a reservation taken after a concurrent completion would never be released.
*
* @param req Mapped update request.
* @param topVer Update topology version.
*/
protected final void reserveNearCacheEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) {
if (!nearEnabled)
return;

for (int i = 0; i < req.size(); i++)
reserveNearCacheEntry(req.key(i), topVer);
}

/** */
private void reserveNearCacheEntry(KeyCacheObject key, AffinityTopologyVersion topVer) {
if (cctx.affinityNode() && cctx.affinity().keyLocalNode(key, topVer))
return;

GridNearAtomicCache nearCache = (GridNearAtomicCache)cctx.dht().near();

synchronized (this) {
if (reservedEntries.containsKey(key))
return;

while (true) {
try {
GridNearCacheEntry entry = nearCache.entryExx(key, topVer);

entry.reserveEviction();

reservedEntries.put(key, entry);

return;
}
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Got removed entry while reserving near cache entry (will retry): " + key);
}
}
}
}

/** */
private void releaseNearCacheEntries() {
Map<KeyCacheObject, GridNearCacheEntry> reservedEntries0;

synchronized (this) {
if (reservedEntries == null || reservedEntries.isEmpty())
return;

reservedEntries0 = reservedEntries;

reservedEntries = null;
}

for (GridNearCacheEntry entry : reservedEntries0.values())
entry.releaseEviction();
}

/**
* @param req Request.
* @param res Response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,8 @@ private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicU
try {
reqState0 = mapSingleUpdate(topVer, futId);

reserveNearCacheEntries(reqState0.req, topVer);

synchronized (this) {
assert topVer.topologyVersion() > 0 : topVer;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,13 @@ private void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheOb
}
}

if (singleReq0 != null)
reserveNearCacheEntries(singleReq0.req, topVer);
else {
for (PrimaryRequestState reqState : mappings0.values())
reserveNearCacheEntries(reqState.req, topVer);
}

synchronized (this) {
assert topVer.topologyVersion() > 0 : topVer;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ public boolean loadedValue(@Nullable IgniteInternalTx tx,
/**
* @throws GridCacheEntryRemovedException If entry was removed.
*/
void reserveEviction() throws GridCacheEntryRemovedException {
public void reserveEviction() throws GridCacheEntryRemovedException {
lockEntry();

try {
Expand All @@ -737,7 +737,7 @@ void reserveEviction() throws GridCacheEntryRemovedException {
/**
*
*/
void releaseEviction() {
public void releaseEviction() {
lockEntry();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,6 @@ private void runTestGetUpdateMultithreaded(CacheConfiguration<Integer, Integer>
final List<Ignite> getNodes,
final TransactionConcurrency concurrency,
final TransactionIsolation isolation) throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-627");

log.info("Execute updates [concurrency=" + concurrency + ", isolation=" + isolation + ']');

final Ignite ignite0 = ignite(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.junit.Assume;
import org.junit.Test;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
Expand Down Expand Up @@ -226,8 +225,6 @@ public void testPutRemoveAll() throws Exception {
*/
@Test
public void testPutConsistencyMultithreaded() throws Exception {
Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-627", nearEnabled());

for (int i = 0; i < 20; i++) {
log.info("Iteration: " + i);

Expand Down Expand Up @@ -279,8 +276,6 @@ public void testPutConsistencyMultithreaded() throws Exception {
*/
@Test
public void testPutRemoveConsistencyMultithreaded() throws Exception {
Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-627", nearEnabled());

for (int i = 0; i < SF.applyLB(10, 2); i++) {
log.info("Iteration: " + i);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
Expand All @@ -33,6 +34,7 @@
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
Expand Down Expand Up @@ -864,6 +866,82 @@ private void readerUpdateDhtFails(boolean updateNearEnabled,
checkData(map);
}

/**
* @throws Exception If failed.
*/
@Test
public void testNearEntryUpdateRacePut() throws Exception {
nearEntryUpdateRace(cache -> cache.put(0, 1), F.asMap(0, 2));
}

/**
* @throws Exception If failed.
*/
@Test
public void testNearEntryUpdateRacePutIfAbsent() throws Exception {
nearEntryUpdateRace(cache -> assertTrue(cache.putIfAbsent(0, 1)), F.asMap(0, 2));
}

/**
* @throws Exception If failed.
*/
@Test
public void testNearEntryUpdateRaceInvoke() throws Exception {
nearEntryUpdateRace(cache -> cache.invoke(0, new SetValueEntryProcessor(1)), F.asMap(0, 2));
}

/**
* @throws Exception If failed.
*/
@Test
public void testNearEntryUpdateRacePutAll() throws Exception {
Map<Integer, Integer> initVals = new HashMap<>();
Map<Integer, Integer> newVals = new HashMap<>();

for (int i = 0; i < 100; i++) {
initVals.put(i, i);
newVals.put(i, i + 10_000);
}

nearEntryUpdateRace(cache -> cache.putAll(initVals), newVals);
}

/**
* @param nearOp Near cache update.
* @param newVals Concurrent update from the primary that must win.
* @throws Exception If failed.
*/
private void nearEntryUpdateRace(Consumer<IgniteCache<Integer, Integer>> nearOp, Map<Integer, Integer> newVals)
throws Exception {
ccfg = cacheConfiguration(1, FULL_SYNC);

Ignite srv0 = startGrid(0);

IgniteCache<Integer, Integer> srvCache = srv0.cache(TEST_CACHE);

ccfg = null;

Ignite client1 = startClientGrid(1);

IgniteCache<Integer, Integer> nearCache = client1.createNearCache(TEST_CACHE, new NearCacheConfiguration<>());

testSpi(srv0).blockMessages(GridNearAtomicUpdateResponse.class, client1.name());

IgniteInternalFuture<?> nearOpFut = GridTestUtils.runAsync(() -> nearOp.accept(nearCache));

testSpi(srv0).waitForBlocked();

srvCache.putAll(newVals);

assertFalse(nearOpFut.isDone());

testSpi(srv0).stopBlock();

nearOpFut.get();

checkData(newVals);
}

/**
* @param expData Expected cache data.
*/
Expand Down
Loading