ignite-627
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 983b094..1fd3385 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -37,16 +37,25 @@
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -56,7 +65,10 @@
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 
 /**
  * Base for near atomic update futures.
@@ -143,6 +155,9 @@
     /** Operation result. */
     protected GridCacheReturn opRes;
 
+    /** */
+    protected Map<KeyCacheObject, GridNearCacheEntry> reservedEntries;
+
     /**
      * Constructor.
      *
@@ -243,29 +258,40 @@
      * Performs future mapping.
      */
     public final void map() {
+        map(false);
+    }
+
+    /**
+     * Performs future mapping.
+     *
+     * @param remap Remap flag.
+     */
+    protected final void map(boolean remap) {
         AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
 
         if (topVer == null)
-            mapOnTopology();
+            mapOnTopology(remap);
         else {
             topLocked = true;
 
             // Cannot remap.
             remapCnt = 1;
 
-            map(topVer);
+            map(topVer, remap);
         }
     }
 
     /**
      * @param topVer Topology version.
+     * @param remap Remap flag.
      */
-    protected abstract void map(AffinityTopologyVersion topVer);
+    protected abstract void map(AffinityTopologyVersion topVer, boolean remap);
 
     /**
      * Maps future on ready topology.
+     * @param remap Remap flag.
      */
-    protected abstract void mapOnTopology();
+    protected abstract void mapOnTopology(boolean remap);
 
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
@@ -358,7 +384,8 @@
         if (futId != null)
             cctx.mvcc().removeAtomicFuture(futId);
 
-        super.onDone(retval, err);
+        if (super.onDone(retval, err) && nearEnabled)
+            releaseNearCacheEntries();
     }
 
     /** {@inheritDoc} */
@@ -380,6 +407,9 @@
             if (futId != null)
                 cctx.mvcc().removeAtomicFuture(futId);
 
+            if (nearEnabled)
+                releaseNearCacheEntries();
+
             return true;
         }
 
@@ -387,6 +417,25 @@
     }
 
     /**
+     *
+     */
+    private void releaseNearCacheEntries() {
+        Map<KeyCacheObject, GridNearCacheEntry> reservedEntries0;
+
+        synchronized (this) {
+            if (reservedEntries == null|| reservedEntries.isEmpty())
+                return;
+
+            reservedEntries0 = reservedEntries;
+
+            reservedEntries = null;
+        }
+
+        for (GridNearCacheEntry entry : reservedEntries0.values())
+            entry.releaseEviction();
+    }
+
+    /**
      * @param req Request.
      * @param res Response.
      */
@@ -507,6 +556,240 @@
     }
 
     /**
+     * @return Near cache.
+     */
+    protected final GridNearAtomicCache nearCache() {
+        return (GridNearAtomicCache)cctx.dht().near();
+    }
+
+    /**
+     * @param key Key,
+     * @param topVer Update topology version.
+     */
+    protected final void reserveNearCacheEntry(KeyCacheObject key, AffinityTopologyVersion topVer) {
+        assert nearEnabled;
+        assert reservedEntries != null;
+
+        if (cctx.affinityNode() && cctx.affinity().partitionBelongs(cctx.localNode(), cctx.affinity().partition(key), topVer))
+            return;
+
+        GridNearAtomicCache nearCache = nearCache();
+
+        synchronized (this) {
+            if (reservedEntries.containsKey(key))
+                return;
+
+            while (true) {
+                try {
+                    GridNearCacheEntry entry =  nearCache.entryExx(key, topVer);
+
+                    entry.reserveEviction();
+
+                    reservedEntries.put(key, entry);
+
+                    return;
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry while reserving near cache entry (will retry): " + key);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param req Update request.
+     * @param res Update response.
+     */
+    protected final void processNearAtomicUpdateResponse(
+            GridNearAtomicAbstractUpdateRequest req,
+            GridNearAtomicUpdateResponse res
+    ) {
+        if (F.size(res.failedKeys()) == req.size())
+            return;
+
+        GridNearAtomicCache nearCache = nearCache();
+
+        /*
+         * Choose value to be stored in near cache: first check key is not in failed and not in skipped list,
+         * then check if value was generated on primary node, if not then use value sent in request.
+         */
+
+        Collection<KeyCacheObject> failed = res.failedKeys();
+        List<Integer> nearValsIdxs = res.nearValuesIndexes();
+        List<Integer> skipped = res.skippedIndexes();
+
+        GridCacheVersion ver = res.nearVersion();
+
+        assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']';
+
+        int nearValIdx = 0;
+
+        String taskName = cctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+
+        for (int i = 0; i < req.size(); i++) {
+            if (F.contains(skipped, i))
+                continue;
+
+            KeyCacheObject key = req.key(i);
+
+            if (F.contains(failed, key))
+                continue;
+
+            if (cctx.affinity().partitionBelongs(cctx.localNode(), cctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup.
+                GridCacheEntryEx entry = nearCache.peekEx(key);
+
+                if (entry != null && entry.markObsolete(ver))
+                    nearCache.removeEntry(entry);
+
+                continue;
+            }
+
+            CacheObject val = null;
+
+            if (F.contains(nearValsIdxs, i)) {
+                val = res.nearValue(nearValIdx);
+
+                nearValIdx++;
+            }
+            else {
+                assert req.operation() != TRANSFORM;
+
+                if (req.operation() != DELETE)
+                    val = req.value(i);
+            }
+
+            long ttl = res.nearTtl(i);
+            long expireTime = res.nearExpireTime(i);
+
+            if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE)
+                expireTime = CU.toExpireTime(ttl);
+
+            try {
+                processNearAtomicUpdateResponse(
+                    nearCache,
+                    topVer,
+                    ver,
+                    key,
+                    val,
+                    ttl,
+                    expireTime,
+                    req.keepBinary(),
+                    req.nodeId(),
+                    req.subjectId(),
+                    taskName,
+                    req.operation() == TRANSFORM);
+            }
+            catch (IgniteCheckedException e) {
+                res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e));
+            }
+        }
+    }
+
+    /**
+     * @param nearCache Near cache.
+     * @param topVer Update topology version.
+     * @param ver Version.
+     * @param key Key.
+     * @param val Value.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
+     * @param keepBinary Keep binary flag.
+     * @param nodeId Node ID.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param transformedValue {@code True} if transformed value.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void processNearAtomicUpdateResponse(
+        GridNearAtomicCache nearCache,
+        AffinityTopologyVersion topVer,
+        GridCacheVersion ver,
+        KeyCacheObject key,
+        @Nullable CacheObject val,
+        long ttl,
+        long expireTime,
+        boolean keepBinary,
+        UUID nodeId,
+        UUID subjId,
+        String taskName,
+        boolean transformedValue) throws IgniteCheckedException {
+        try {
+            while (true) {
+                GridNearCacheEntry entry = null;
+
+                try {
+                    entry = nearCache.entryExx(key, topVer);
+
+                    GridCacheOperation op = val != null ? UPDATE : DELETE;
+
+                    GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+                            ver,
+                            nodeId,
+                            nodeId,
+                            op,
+                            val,
+                            null,
+                            /*write-through*/false,
+                            /*read-through*/false,
+                            /*retval*/false,
+                            keepBinary,
+                            /*expiry policy*/null,
+                            /*event*/true,
+                            /*metrics*/true,
+                            /*primary*/false,
+                            /*check version*/true,
+                            topVer,
+                            CU.empty0(),
+                            DR_NONE,
+                            ttl,
+                            expireTime,
+                            null,
+                            false,
+                            false,
+                            subjId,
+                            taskName,
+                            null,
+                            null,
+                            null,
+                            transformedValue);
+
+                    boolean release;
+
+                    synchronized (this) {
+                        GridNearCacheEntry reserved = reservedEntries.remove(key);
+
+                        assert reserved == null || reserved == entry;
+
+                        release = reserved != null;
+                    }
+
+                    if (release)
+                        entry.releaseEviction();
+
+                    if (updRes.removeVersion() != null)
+                        nearCache.context().onDeferredDelete(entry, updRes.removeVersion());
+
+                    break; // While.
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry while updating near cache value (will retry): " + key);
+
+                    entry = null;
+                }
+                finally {
+                    if (entry != null)
+                        entry.touch();
+                }
+            }
+        }
+        catch (GridDhtInvalidPartitionException ignored) {
+            // Ignore.
+        }
+    }
+
+    /**
      *
      */
     static class NodeResult {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 264dbe7..1ee4686 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -40,7 +40,7 @@
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -125,6 +125,8 @@
 
         this.key = key;
         this.val = val;
+
+        reservedEntries = new GridLeanMap<>(1);
     }
 
     /** {@inheritDoc} */
@@ -375,7 +377,7 @@
             @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
                 cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                     @Override public void run() {
-                        mapOnTopology();
+                        mapOnTopology(true);
                     }
                 });
             }
@@ -394,13 +396,11 @@
         if (res.remapTopologyVersion() != null)
             return;
 
-        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
-        near.processNearAtomicUpdateResponse(req, res);
+        processNearAtomicUpdateResponse(req, res);
     }
 
     /** {@inheritDoc} */
-    @Override protected void mapOnTopology() {
+    @Override protected void mapOnTopology(boolean remap) {
         AffinityTopologyVersion topVer;
 
         if (cache.topology().stopping()) {
@@ -434,7 +434,7 @@
                 @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                     cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                         @Override public void run() {
-                            mapOnTopology();
+                            mapOnTopology(remap);
                         }
                     });
                 }
@@ -443,11 +443,11 @@
             return;
         }
 
-        map(topVer);
+        map(topVer, remap);
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer) {
+    @Override protected void map(AffinityTopologyVersion topVer, boolean remap) {
         long futId = cctx.mvcc().nextAtomicId();
 
         Exception err = null;
@@ -482,6 +482,9 @@
             return;
         }
 
+        if (!remap && nearEnabled)
+            reserveNearCacheEntry(reqState0.req.key(0), topVer);
+
         // Optimize mapping for single key.
         sendSingleRequest(reqState0.req.nodeId(), reqState0.req);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 3835d6a..8298574 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -44,7 +44,6 @@
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -160,6 +159,8 @@
         this.vals = vals;
         this.conflictPutVals = conflictPutVals;
         this.conflictRmvVals = conflictRmvVals;
+
+        reservedEntries = nearEnabled ? U.newHashMap(keys.size()) : null;
     }
 
     /** {@inheritDoc} */
@@ -470,6 +471,9 @@
             completeFuture(opRes0, err0, res.futureId());
     }
 
+    /**
+     * @param remapTopVer New topology version.
+     */
     private void waitAndRemap(AffinityTopologyVersion remapTopVer) {
         assert remapTopVer != null;
 
@@ -500,7 +504,7 @@
             @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
                 cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                     @Override public void run() {
-                        mapOnTopology();
+                        mapOnTopology(true);
                     }
                 });
             }
@@ -614,13 +618,11 @@
         if (res.remapTopologyVersion() != null)
             return;
 
-        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
-        near.processNearAtomicUpdateResponse(req, res);
+        processNearAtomicUpdateResponse(req, res);
     }
 
     /** {@inheritDoc} */
-    @Override protected void mapOnTopology() {
+    @Override protected void mapOnTopology(boolean remap) {
         AffinityTopologyVersion topVer;
 
         if (cache.topology().stopping()) {
@@ -654,7 +656,7 @@
                 @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                     cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                         @Override public void run() {
-                            mapOnTopology();
+                            mapOnTopology(remap);
                         }
                     });
                 }
@@ -663,7 +665,7 @@
             return;
         }
 
-        map(topVer, remapKeys);
+        map(topVer, remap, remapKeys);
     }
 
     /**
@@ -727,15 +729,16 @@
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer) {
-        map(topVer, null);
+    @Override protected void map(AffinityTopologyVersion topVer, boolean remap) {
+        map(topVer, remap, null);
     }
 
     /**
      * @param topVer Topology version.
+     * @param remap Remap flag.
      * @param remapKeys Keys to remap.
      */
-    private void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+    private void map(AffinityTopologyVersion topVer, boolean remap, @Nullable Collection<KeyCacheObject> remapKeys) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -760,12 +763,14 @@
             if (size == 1) {
                 assert remapKeys == null || remapKeys.size() == 1;
 
-                singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown);
+                singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown, remap);
             }
             else {
-                Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes,
+                Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(
+                    topNodes,
                     topVer,
                     futId,
+                    remap,
                     remapKeys,
                     mappingKnown);
 
@@ -913,13 +918,16 @@
      * @param topNodes Cache nodes.
      * @param topVer Topology version.
      * @param futId Future ID.
+     * @param remap Remap flag.
      * @param remapKeys Keys to remap.
      * @return Mapping.
      * @throws Exception If failed.
      */
-    private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes,
+    private Map<UUID, PrimaryRequestState> mapUpdate(
+        Collection<ClusterNode> topNodes,
         AffinityTopologyVersion topVer,
         Long futId,
+        boolean remap,
         @Nullable Collection<KeyCacheObject> remapKeys,
         boolean mappingKnown) throws Exception {
         Iterator<?> it = null;
@@ -1045,6 +1053,9 @@
                 mapped.addMapping(nodes);
 
             mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
+
+            if (!remap && nearEnabled)
+                reserveNearCacheEntry(cacheKey, topVer);
         }
 
         return pendingMappings;
@@ -1054,10 +1065,15 @@
      * @param topVer Topology version.
      * @param futId Future ID.
      * @param mappingKnown {@code True} if update mapping is known locally.
+     * @param remap Remap flag.
      * @return Request.
      * @throws Exception If failed.
      */
-    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean mappingKnown)
+    private PrimaryRequestState mapSingleUpdate(
+        AffinityTopologyVersion topVer,
+        Long futId,
+        boolean mappingKnown,
+        boolean remap)
         throws Exception {
         Object key = F.first(keys);
 
@@ -1156,6 +1172,9 @@
             conflictExpireTime,
             conflictVer);
 
+        if (!remap && nearEnabled)
+            reserveNearCacheEntry(cacheKey, topVer);
+
         return new PrimaryRequestState(req, nodes, true);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 211ae12..4ce55fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -17,17 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
-import java.io.Externalizable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -41,12 +30,12 @@
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -61,6 +50,18 @@
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import java.io.Externalizable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index a52e23e..bf52d55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -715,7 +715,7 @@
     /**
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    void reserveEviction() throws GridCacheEntryRemovedException {
+    public void reserveEviction() throws GridCacheEntryRemovedException {
         lockEntry();
 
         try {
@@ -731,7 +731,7 @@
     /**
      *
      */
-    void releaseEviction() {
+    public void releaseEviction() {
         lockEntry();
 
         try {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
index c4f4d9c..4ea8819 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
@@ -238,8 +238,6 @@
      */
     @Test
     public void testPutConsistencyMultithreaded() throws Exception {
-        Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-627", nearEnabled());
-
         for (int i = 0; i < 20; i++) {
             log.info("Iteration: " + i);
 
@@ -291,8 +289,6 @@
      */
     @Test
     public void testPutRemoveConsistencyMultithreaded() throws Exception {
-        Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-627", nearEnabled());
-
        for (int i = 0; i < SF.applyLB(10, 2); i++) {
            log.info("Iteration: " + i);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 34809b6..a1bc173 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -33,6 +33,7 @@
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
@@ -901,6 +902,140 @@
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testNearEntryUpdateRace_Put() throws Exception {
+        nearEntryUpdateRace("put");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearEntryUpdateRace_PutIfAbsent() throws Exception {
+        nearEntryUpdateRace("putIfAbsent");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearEntryUpdateRace_Invoke() throws Exception {
+        nearEntryUpdateRace("invoke");
+    }
+
+    /**
+     * @param cacheOp Cache operation.
+     * @throws Exception If failed.
+     */
+    private void nearEntryUpdateRace(String cacheOp) throws Exception {
+        ccfg = cacheConfiguration(1, FULL_SYNC);
+
+        client = false;
+
+        Ignite srv0 = startGrid(0);
+
+        IgniteCache<Integer, Integer> srvCache = srv0.cache(TEST_CACHE);
+
+        int key = 0;
+
+        ccfg = null;
+
+        client = true;
+
+        Ignite client1 = startGrid(1);
+
+        IgniteCache<Integer, Integer> nearCache = client1.createNearCache(TEST_CACHE, new NearCacheConfiguration<>());
+
+        testSpi(srv0).blockMessages(GridNearAtomicUpdateResponse.class, client1.name());
+
+        IgniteInternalFuture<?> nearPutFut = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                switch (cacheOp) {
+                    case "put":
+                        nearCache.put(key, 1);
+                        break;
+
+                    case "putIfAbsent":
+                        assertTrue(nearCache.putIfAbsent(key, 1));
+                        break;
+
+                    case "invoke":
+                        nearCache.invoke(key, new SetValueEntryProcessor(1));
+                        break;
+
+                    default:
+                        fail("Invalid operation: " + cacheOp);
+                }
+            }
+        });
+
+        testSpi(srv0).waitForBlocked();
+
+        srvCache.put(key, 2);
+
+        assertFalse(nearPutFut.isDone());
+
+        testSpi(srv0).stopBlock();
+
+        nearPutFut.get();
+
+        assertEquals((Integer)2, nearCache.get(key));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearEntryUpdateRace_PutAll() throws Exception {
+        ccfg = cacheConfiguration(1, FULL_SYNC);
+
+        client = false;
+
+        Ignite srv0 = startGrid(0);
+
+        IgniteCache<Integer, Integer> srvCache = srv0.cache(TEST_CACHE);
+
+        final int keys = 100;
+
+        ccfg = null;
+
+        client = true;
+
+        Ignite client1 = startGrid(1);
+
+        IgniteCache<Integer, Integer> nearCache = client1.createNearCache(TEST_CACHE, new NearCacheConfiguration<>());
+
+        testSpi(srv0).blockMessages(GridNearAtomicUpdateResponse.class, client1.name());
+
+        IgniteInternalFuture<?> nearPutFut = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                Map<Integer, Integer> map = new HashMap<>();
+
+                for (int i = 0; i < keys; i++)
+                    map.put(i, i);
+
+                nearCache.putAll(map);
+            }
+        });
+
+        testSpi(srv0).waitForBlocked();
+
+        Map<Integer, Integer> map = new HashMap<>();
+
+        for (int i = 0; i < keys; i++)
+            map.put(i, i + 10_000);
+
+        srvCache.putAll(map);
+
+        assertFalse(nearPutFut.isDone());
+
+        testSpi(srv0).stopBlock();
+
+        nearPutFut.get();
+
+        for (int i = 0; i < keys; i++)
+            assertEquals((Integer)(i + 10_000), nearCache.get(i));
+    }
+
+    /**
      * @param expData Expected cache data.
      */
     private void checkData(Map<Integer, Integer> expData) {