ignite-627
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 983b094..1fd3385 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -37,16 +37,25 @@
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -56,7 +65,10 @@
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
/**
* Base for near atomic update futures.
@@ -143,6 +155,9 @@
/** Operation result. */
protected GridCacheReturn opRes;
+ /**
+ * Near cache entries on which this future has called {@code reserveEviction()} and which it has not released
+ * yet, keyed by cache key. Read and modified under {@code synchronized (this)} by the reserve, update and
+ * release paths of this class; {@code releaseNearCacheEntries()} sets the field to {@code null} when it takes
+ * a non-empty map over for release.
+ */
+ protected Map<KeyCacheObject, GridNearCacheEntry> reservedEntries;
+
/**
* Constructor.
*
@@ -243,29 +258,40 @@
* Performs future mapping.
*/
public final void map() {
+ map(false);
+ }
+
+ /**
+ * Performs future mapping: maps on the locked topology version if the shared context reports one,
+ * otherwise maps on a ready topology.
+ *
+ * @param remap Remap flag; {@link #map()} passes {@code false}. The value is forwarded unchanged to
+ * {@code mapOnTopology(boolean)} or {@code map(AffinityTopologyVersion, boolean)}.
+ */
+ protected final void map(boolean remap) {
AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
if (topVer == null)
- mapOnTopology();
+ mapOnTopology(remap);
else {
topLocked = true;
// Cannot remap.
remapCnt = 1;
- map(topVer);
+ map(topVer, remap);
}
}
/**
* @param topVer Topology version.
+ * @param remap Remap flag; implementations in this change reserve near cache entries only when it is {@code false}.
*/
- protected abstract void map(AffinityTopologyVersion topVer);
+ protected abstract void map(AffinityTopologyVersion topVer, boolean remap);
/**
* Maps future on ready topology.
+ *
+ * @param remap Remap flag; passed through to {@code map(AffinityTopologyVersion, boolean)} by implementations.
*/
- protected abstract void mapOnTopology();
+ protected abstract void mapOnTopology(boolean remap);
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
@@ -358,7 +384,8 @@
if (futId != null)
cctx.mvcc().removeAtomicFuture(futId);
- super.onDone(retval, err);
+ if (super.onDone(retval, err) && nearEnabled)
+ releaseNearCacheEntries();
}
/** {@inheritDoc} */
@@ -380,6 +407,9 @@
if (futId != null)
cctx.mvcc().removeAtomicFuture(futId);
+ if (nearEnabled)
+ releaseNearCacheEntries();
+
return true;
}
@@ -387,6 +417,25 @@
}
/**
+ *
+ */
+ private void releaseNearCacheEntries() {
+ Map<KeyCacheObject, GridNearCacheEntry> reservedEntries0;
+
+ synchronized (this) {
+ if (reservedEntries == null|| reservedEntries.isEmpty())
+ return;
+
+ reservedEntries0 = reservedEntries;
+
+ reservedEntries = null;
+ }
+
+ for (GridNearCacheEntry entry : reservedEntries0.values())
+ entry.releaseEviction();
+ }
+
+ /**
* @param req Request.
* @param res Response.
*/
@@ -507,6 +556,240 @@
}
/**
+ * @return Near cache.
+ */
+ protected final GridNearAtomicCache nearCache() {
+ return (GridNearAtomicCache)cctx.dht().near();
+ }
+
+ /**
+ * Reserves the near cache entry for the given key from eviction and records it in {@code reservedEntries}.
+ * Nothing is reserved when the local node is an affinity node and {@code partitionBelongs} reports the key's
+ * partition as belonging to it on {@code topVer}, or when this future already recorded an entry for the key.
+ *
+ * NOTE(review): {@code nearCache.entryExx()} is invoked while this future's monitor is held, and
+ * {@code GridDhtInvalidPartitionException} is not handled here although the near cache update path of this
+ * class ignores it - confirm both are intended.
+ *
+ * @param key Key.
+ * @param topVer Update topology version.
+ */
+ protected final void reserveNearCacheEntry(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ assert nearEnabled;
+ assert reservedEntries != null;
+
+ if (cctx.affinityNode() && cctx.affinity().partitionBelongs(cctx.localNode(), cctx.affinity().partition(key), topVer))
+ return;
+
+ GridNearAtomicCache nearCache = nearCache();
+
+ synchronized (this) {
+ if (reservedEntries.containsKey(key))
+ return;
+
+ while (true) { // Loop exits only through the return below; a removed entry triggers another attempt.
+ try {
+ GridNearCacheEntry entry = nearCache.entryExx(key, topVer);
+
+ entry.reserveEviction();
+
+ reservedEntries.put(key, entry);
+
+ return;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry while reserving near cache entry (will retry): " + key);
+ }
+ }
+ }
+ }
+
+ /**
+ * Applies a primary node response to the local near cache. Returns immediately when the number of failed
+ * keys equals the request size. Otherwise, for every request key that is neither skipped nor failed, the
+ * near entry is either marked obsolete and removed (when {@code partitionBelongs} reports the key's
+ * partition as belonging to the local node on the request topology version) or updated with the near value
+ * from the response, falling back to the value sent in the request. A failure to update a key is recorded
+ * in the response as a failed key.
+ *
+ * NOTE(review): the per-key update is passed {@code topVer}, which is neither a parameter nor a local of
+ * this method (presumably a field of this future), while the ownership check uses
+ * {@code req.topologyVersion()} - confirm both denote the same version after a remap.
+ *
+ * @param req Update request.
+ * @param res Update response.
+ */
+ protected final void processNearAtomicUpdateResponse(
+ GridNearAtomicAbstractUpdateRequest req,
+ GridNearAtomicUpdateResponse res
+ ) {
+ if (F.size(res.failedKeys()) == req.size())
+ return;
+
+ GridNearAtomicCache nearCache = nearCache();
+
+ /*
+ * Choose value to be stored in near cache: first check key is not in failed and not in skipped list,
+ * then check if value was generated on primary node, if not then use value sent in request.
+ */
+
+ Collection<KeyCacheObject> failed = res.failedKeys();
+ List<Integer> nearValsIdxs = res.nearValuesIndexes();
+ List<Integer> skipped = res.skippedIndexes();
+
+ GridCacheVersion ver = res.nearVersion();
+
+ assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']';
+
+ int nearValIdx = 0;
+
+ String taskName = cctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+
+ for (int i = 0; i < req.size(); i++) {
+ if (F.contains(skipped, i))
+ continue;
+
+ KeyCacheObject key = req.key(i);
+
+ if (F.contains(failed, key))
+ continue;
+
+ if (cctx.affinity().partitionBelongs(cctx.localNode(), cctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup.
+ GridCacheEntryEx entry = nearCache.peekEx(key);
+
+ if (entry != null && entry.markObsolete(ver))
+ nearCache.removeEntry(entry);
+
+ continue;
+ }
+
+ CacheObject val = null;
+
+ if (F.contains(nearValsIdxs, i)) {
+ val = res.nearValue(nearValIdx);
+
+ nearValIdx++; // Advanced only when a near value from the response was consumed.
+ }
+ else {
+ assert req.operation() != TRANSFORM;
+
+ if (req.operation() != DELETE)
+ val = req.value(i);
+ }
+
+ long ttl = res.nearTtl(i);
+ long expireTime = res.nearExpireTime(i);
+
+ if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE)
+ expireTime = CU.toExpireTime(ttl);
+
+ try {
+ processNearAtomicUpdateResponse(
+ nearCache,
+ topVer,
+ ver,
+ key,
+ val,
+ ttl,
+ expireTime,
+ req.keepBinary(),
+ req.nodeId(),
+ req.subjectId(),
+ taskName,
+ req.operation() == TRANSFORM);
+ }
+ catch (IgniteCheckedException e) {
+ res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e));
+ }
+ }
+ }
+
+    /**
+     * Updates a single near cache entry with the outcome of an atomic update and drops the eviction
+     * reservation this future recorded for the key, if there is one. A {@code null} value is applied as
+     * {@code DELETE}, any other value as {@code UPDATE}. The update is retried when the entry turns out to be
+     * removed; {@code GridDhtInvalidPartitionException} is ignored.
+     *
+     * @param nearCache Near cache.
+     * @param topVer Update topology version.
+     * @param ver Version.
+     * @param key Key.
+     * @param val Value.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
+     * @param keepBinary Keep binary flag.
+     * @param nodeId Node ID.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param transformedValue {@code True} if transformed value.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void processNearAtomicUpdateResponse(
+        GridNearAtomicCache nearCache,
+        AffinityTopologyVersion topVer,
+        GridCacheVersion ver,
+        KeyCacheObject key,
+        @Nullable CacheObject val,
+        long ttl,
+        long expireTime,
+        boolean keepBinary,
+        UUID nodeId,
+        UUID subjId,
+        String taskName,
+        boolean transformedValue) throws IgniteCheckedException {
+        try {
+            while (true) {
+                GridNearCacheEntry entry = null;
+
+                try {
+                    entry = nearCache.entryExx(key, topVer);
+
+                    GridCacheOperation op = val != null ? UPDATE : DELETE;
+
+                    GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+                        ver,
+                        nodeId,
+                        nodeId,
+                        op,
+                        val,
+                        null,
+                        /*write-through*/false,
+                        /*read-through*/false,
+                        /*retval*/false,
+                        keepBinary,
+                        /*expiry policy*/null,
+                        /*event*/true,
+                        /*metrics*/true,
+                        /*primary*/false,
+                        /*check version*/true,
+                        topVer,
+                        CU.empty0(),
+                        DR_NONE,
+                        ttl,
+                        expireTime,
+                        null,
+                        false,
+                        false,
+                        subjId,
+                        taskName,
+                        null,
+                        null,
+                        null,
+                        transformedValue);
+
+                    GridNearCacheEntry reserved;
+
+                    synchronized (this) {
+                        // releaseNearCacheEntries() nulls the map once the future is done, which may happen
+                        // while a response is still being applied here; the reservation was released there.
+                        reserved = reservedEntries != null ? reservedEntries.remove(key) : null;
+
+                        assert reserved == null || reserved == entry;
+                    }
+
+                    // Release the instance that was actually reserved, outside of the monitor.
+                    if (reserved != null)
+                        reserved.releaseEviction();
+
+                    if (updRes.removeVersion() != null)
+                        nearCache.context().onDeferredDelete(entry, updRes.removeVersion());
+
+                    break; // While.
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry while updating near cache value (will retry): " + key);
+
+                    // Do not touch the removed entry in the finally block below.
+                    entry = null;
+                }
+                finally {
+                    if (entry != null)
+                        entry.touch();
+                }
+            }
+        }
+        catch (GridDhtInvalidPartitionException ignored) {
+            // Ignore.
+        }
+    }
+
+ /**
*
*/
static class NodeResult {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 264dbe7..1ee4686 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -40,7 +40,7 @@
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
@@ -125,6 +125,8 @@
this.key = key;
this.val = val;
+
+ reservedEntries = new GridLeanMap<>(1);
}
/** {@inheritDoc} */
@@ -375,7 +377,7 @@
@Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- mapOnTopology();
+ mapOnTopology(true);
}
});
}
@@ -394,13 +396,11 @@
if (res.remapTopologyVersion() != null)
return;
- GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
- near.processNearAtomicUpdateResponse(req, res);
+ processNearAtomicUpdateResponse(req, res);
}
/** {@inheritDoc} */
- @Override protected void mapOnTopology() {
+ @Override protected void mapOnTopology(boolean remap) {
AffinityTopologyVersion topVer;
if (cache.topology().stopping()) {
@@ -434,7 +434,7 @@
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- mapOnTopology();
+ mapOnTopology(remap);
}
});
}
@@ -443,11 +443,11 @@
return;
}
- map(topVer);
+ map(topVer, remap);
}
/** {@inheritDoc} */
- @Override protected void map(AffinityTopologyVersion topVer) {
+ @Override protected void map(AffinityTopologyVersion topVer, boolean remap) {
long futId = cctx.mvcc().nextAtomicId();
Exception err = null;
@@ -482,6 +482,9 @@
return;
}
+ if (!remap && nearEnabled)
+ reserveNearCacheEntry(reqState0.req.key(0), topVer);
+
// Optimize mapping for single key.
sendSingleRequest(reqState0.req.nodeId(), reqState0.req);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 3835d6a..8298574 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -44,7 +44,6 @@
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -160,6 +159,8 @@
this.vals = vals;
this.conflictPutVals = conflictPutVals;
this.conflictRmvVals = conflictRmvVals;
+
+ reservedEntries = nearEnabled ? U.newHashMap(keys.size()) : null;
}
/** {@inheritDoc} */
@@ -470,6 +471,9 @@
completeFuture(opRes0, err0, res.futureId());
}
+ /**
+ * @param remapTopVer New topology version.
+ */
private void waitAndRemap(AffinityTopologyVersion remapTopVer) {
assert remapTopVer != null;
@@ -500,7 +504,7 @@
@Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- mapOnTopology();
+ mapOnTopology(true);
}
});
}
@@ -614,13 +618,11 @@
if (res.remapTopologyVersion() != null)
return;
- GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
- near.processNearAtomicUpdateResponse(req, res);
+ processNearAtomicUpdateResponse(req, res);
}
/** {@inheritDoc} */
- @Override protected void mapOnTopology() {
+ @Override protected void mapOnTopology(boolean remap) {
AffinityTopologyVersion topVer;
if (cache.topology().stopping()) {
@@ -654,7 +656,7 @@
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- mapOnTopology();
+ mapOnTopology(remap);
}
});
}
@@ -663,7 +665,7 @@
return;
}
- map(topVer, remapKeys);
+ map(topVer, remap, remapKeys);
}
/**
@@ -727,15 +729,16 @@
}
/** {@inheritDoc} */
- @Override protected void map(AffinityTopologyVersion topVer) {
- map(topVer, null);
+ @Override protected void map(AffinityTopologyVersion topVer, boolean remap) {
+ map(topVer, remap, null);
}
/**
* @param topVer Topology version.
+ * @param remap Remap flag.
* @param remapKeys Keys to remap.
*/
- private void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+ private void map(AffinityTopologyVersion topVer, boolean remap, @Nullable Collection<KeyCacheObject> remapKeys) {
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
if (F.isEmpty(topNodes)) {
@@ -760,12 +763,14 @@
if (size == 1) {
assert remapKeys == null || remapKeys.size() == 1;
- singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown);
+ singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown, remap);
}
else {
- Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes,
+ Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(
+ topNodes,
topVer,
futId,
+ remap,
remapKeys,
mappingKnown);
@@ -913,13 +918,16 @@
* @param topNodes Cache nodes.
* @param topVer Topology version.
* @param futId Future ID.
+ * @param remap Remap flag.
* @param remapKeys Keys to remap.
* @return Mapping.
* @throws Exception If failed.
*/
- private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes,
+ private Map<UUID, PrimaryRequestState> mapUpdate(
+ Collection<ClusterNode> topNodes,
AffinityTopologyVersion topVer,
Long futId,
+ boolean remap,
@Nullable Collection<KeyCacheObject> remapKeys,
boolean mappingKnown) throws Exception {
Iterator<?> it = null;
@@ -1045,6 +1053,9 @@
mapped.addMapping(nodes);
mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
+
+ if (!remap && nearEnabled)
+ reserveNearCacheEntry(cacheKey, topVer);
}
return pendingMappings;
@@ -1054,10 +1065,15 @@
* @param topVer Topology version.
* @param futId Future ID.
* @param mappingKnown {@code True} if update mapping is known locally.
+ * @param remap Remap flag.
* @return Request.
* @throws Exception If failed.
*/
- private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean mappingKnown)
+ private PrimaryRequestState mapSingleUpdate(
+ AffinityTopologyVersion topVer,
+ Long futId,
+ boolean mappingKnown,
+ boolean remap)
throws Exception {
Object key = F.first(keys);
@@ -1156,6 +1172,9 @@
conflictExpireTime,
conflictVer);
+ if (!remap && nearEnabled)
+ reserveNearCacheEntry(cacheKey, topVer);
+
return new PrimaryRequestState(req, nodes, true);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 211ae12..4ce55fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -17,17 +17,6 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
-import java.io.Externalizable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -41,12 +30,12 @@
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -61,6 +50,18 @@
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import java.io.Externalizable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index a52e23e..bf52d55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -715,7 +715,7 @@
/**
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- void reserveEviction() throws GridCacheEntryRemovedException {
+ public void reserveEviction() throws GridCacheEntryRemovedException {
lockEntry();
try {
@@ -731,7 +731,7 @@
/**
*
*/
- void releaseEviction() {
+ public void releaseEviction() {
lockEntry();
try {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
index c4f4d9c..4ea8819 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
@@ -238,8 +238,6 @@
*/
@Test
public void testPutConsistencyMultithreaded() throws Exception {
- Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-627", nearEnabled());
-
for (int i = 0; i < 20; i++) {
log.info("Iteration: " + i);
@@ -291,8 +289,6 @@
*/
@Test
public void testPutRemoveConsistencyMultithreaded() throws Exception {
- Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-627", nearEnabled());
-
for (int i = 0; i < SF.applyLB(10, 2); i++) {
log.info("Iteration: " + i);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 34809b6..a1bc173 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -33,6 +33,7 @@
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
@@ -901,6 +902,140 @@
}
/**
+     * Runs the near entry update race scenario with {@code put} as the near cache operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNearEntryUpdateRace_Put() throws Exception {
+        nearEntryUpdateRace("put");
+    }
+
+    /**
+     * Runs the near entry update race scenario with {@code putIfAbsent} as the near cache operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNearEntryUpdateRace_PutIfAbsent() throws Exception {
+        nearEntryUpdateRace("putIfAbsent");
+    }
+
+    /**
+     * Runs the near entry update race scenario with {@code invoke} as the near cache operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNearEntryUpdateRace_Invoke() throws Exception {
+        nearEntryUpdateRace("invoke");
+    }
+
+ /**
+ * Starts node 0 with {@code client = false} and node 1 with {@code client = true}, creates a near cache on
+ * node 1 and blocks {@code GridNearAtomicUpdateResponse} messages sent from node 0 to node 1. The given
+ * operation is then run asynchronously on the near cache and, once its response is blocked, value {@code 2}
+ * is put for the same key through the cache of node 0. After the block is lifted the near cache is expected
+ * to return {@code 2}.
+ *
+ * @param cacheOp Cache operation: {@code "put"}, {@code "putIfAbsent"} or {@code "invoke"}.
+ * @throws Exception If failed.
+ */
+ private void nearEntryUpdateRace(String cacheOp) throws Exception {
+ ccfg = cacheConfiguration(1, FULL_SYNC);
+
+ client = false;
+
+ Ignite srv0 = startGrid(0);
+
+ IgniteCache<Integer, Integer> srvCache = srv0.cache(TEST_CACHE);
+
+ int key = 0;
+
+ ccfg = null;
+
+ client = true;
+
+ Ignite client1 = startGrid(1);
+
+ IgniteCache<Integer, Integer> nearCache = client1.createNearCache(TEST_CACHE, new NearCacheConfiguration<>());
+
+ testSpi(srv0).blockMessages(GridNearAtomicUpdateResponse.class, client1.name());
+
+ IgniteInternalFuture<?> nearPutFut = GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ switch (cacheOp) {
+ case "put":
+ nearCache.put(key, 1);
+ break;
+
+ case "putIfAbsent":
+ assertTrue(nearCache.putIfAbsent(key, 1));
+ break;
+
+ case "invoke":
+ nearCache.invoke(key, new SetValueEntryProcessor(1));
+ break;
+
+ default:
+ fail("Invalid operation: " + cacheOp);
+ }
+ }
+ });
+
+ testSpi(srv0).waitForBlocked();
+
+ srvCache.put(key, 2); // Second update issued while the response to the first one is still blocked.
+
+ assertFalse(nearPutFut.isDone());
+
+ testSpi(srv0).stopBlock();
+
+ nearPutFut.get();
+
+ assertEquals((Integer)2, nearCache.get(key)); // The later value written through node 0 must be visible.
+ }
+
+    /**
+     * Checks the near entry update race for {@code putAll}: while the response to a near cache
+     * {@code putAll} of 100 keys is blocked, the same keys are overwritten through the cache of node 0 with
+     * values shifted by 10_000. After the block is lifted the near cache is expected to return the shifted
+     * values for every key.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNearEntryUpdateRace_PutAll() throws Exception {
+        ccfg = cacheConfiguration(1, FULL_SYNC);
+
+        client = false;
+
+        Ignite srv0 = startGrid(0);
+
+        IgniteCache<Integer, Integer> srvCache = srv0.cache(TEST_CACHE);
+
+        final int keys = 100;
+
+        ccfg = null;
+
+        client = true;
+
+        Ignite client1 = startGrid(1);
+
+        IgniteCache<Integer, Integer> nearCache = client1.createNearCache(TEST_CACHE, new NearCacheConfiguration<>());
+
+        testSpi(srv0).blockMessages(GridNearAtomicUpdateResponse.class, client1.name());
+
+        IgniteInternalFuture<?> nearPutFut = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                Map<Integer, Integer> map = new HashMap<>();
+
+                for (int i = 0; i < keys; i++)
+                    map.put(i, i);
+
+                nearCache.putAll(map);
+            }
+        });
+
+        testSpi(srv0).waitForBlocked();
+
+        Map<Integer, Integer> map = new HashMap<>();
+
+        for (int i = 0; i < keys; i++)
+            map.put(i, i + 10_000);
+
+        // Second update issued while the response to the first one is still blocked.
+        srvCache.putAll(map);
+
+        assertFalse(nearPutFut.isDone());
+
+        testSpi(srv0).stopBlock();
+
+        nearPutFut.get();
+
+        for (int i = 0; i < keys; i++)
+            assertEquals((Integer)(i + 10_000), nearCache.get(i));
+    }
+
+ /**
* @param expData Expected cache data.
*/
private void checkData(Map<Integer, Integer> expData) {