Merge remote-tracking branch 'remotes/community/ignite-1.6.11' into ignite-4154-opt2
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index ec12973..f32aadd 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.cache.affinity.rendezvous;
 
-import java.io.ByteArrayOutputStream;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -355,29 +354,43 @@
     /**
      * Returns collection of nodes (primary first) for specified partition.
      */
-    public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups,
+    public List<ClusterNode> assignPartition(MessageDigest d,
+        int part,
+        List<ClusterNode> nodes,
+        Map<ClusterNode, byte[]> nodesHash,
+        int backups,
         @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
         if (nodes.size() <= 1)
             return nodes;
 
-        List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>();
+        if (d == null)
+            d = digest.get();
 
-        MessageDigest d = digest.get();
+        List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
 
-        for (ClusterNode node : nodes) {
-            Object nodeHash = resolveNodeHash(node);
+        try {
+            for (int i = 0; i < nodes.size(); i++) {
+                ClusterNode node = nodes.get(i);
 
-            try {
-                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                Object nodeHash = resolveNodeHash(node);
 
-                byte[] nodeHashBytes = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+                byte[] nodeHashBytes = nodesHash.get(node);
 
-                out.write(U.intToBytes(part), 0, 4); // Avoid IOException.
-                out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException.
+                if (nodeHashBytes == null) {
+                    byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+
+                    nodeHashBytes = new byte[nodeHashBytes0.length + 4];
+
+                    System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
+
+                    nodesHash.put(node, nodeHashBytes0);
+                }
+
+                U.intToBytes(part, nodeHashBytes, 0);
 
                 d.reset();
 
-                byte[] bytes = d.digest(out.toByteArray());
+                byte[] bytes = d.digest(nodeHashBytes);
 
                 long hash =
                       (bytes[0] & 0xFFL)
@@ -391,9 +404,9 @@
 
                 lst.add(F.t(hash, node));
             }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
         }
 
         Collections.sort(lst, COMPARATOR);
@@ -474,8 +487,18 @@
         Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
             GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
 
+        MessageDigest d = digest.get();
+
+        List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+        Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
+
         for (int i = 0; i < parts; i++) {
-            List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(),
+            List<ClusterNode> partAssignment = assignPartition(d,
+                i,
+                nodes,
+                nodesHash,
+                affCtx.backups(),
                 neighborhoodCache);
 
             assignments.add(partAssignment);
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index f65bf52..354ea10 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -2402,7 +2402,7 @@
     /**
      *  Filter that accepts all nodes.
      */
-    public static class IgniteAllNodesPredicate  implements IgnitePredicate<ClusterNode> {
+    public static class IgniteAllNodesPredicate implements IgnitePredicate<ClusterNode> {
         /** */
         private static final long serialVersionUID = 0L;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 345cfb4..640b7cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1621,14 +1621,12 @@
     }
 
     /**
-     * Gets cache remote nodes for cache with given name.
-     *
-     * @param cacheName Cache name.
      * @param topVer Topology version.
-     * @return Collection of cache nodes.
+     * @param node Node.
+     * @return DHT caches started on given node.
      */
-    public Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(cacheName, topVer).remoteCacheNodes(cacheName, topVer.topologyVersion());
+    public Collection<String> nodeCaches(AffinityTopologyVersion topVer, ClusterNode node) {
+        return resolveDiscoCache(null, topVer).nodeCaches(node);
     }
 
     /**
@@ -2742,6 +2740,17 @@
             this.nodeMap = nodeMap;
         }
 
+        Collection<String> nodeCaches(ClusterNode node) {
+            List<String> res = new ArrayList<>();
+
+            for (Map.Entry<String, Collection<ClusterNode>> e : affCacheNodes.entrySet()) {
+                if (F.contains(e.getValue(), node))
+                    res.add(e.getKey());
+            }
+
+            return res;
+        }
+
         /**
          * Adds node to map.
          *
@@ -2820,17 +2829,6 @@
         }
 
         /**
-         * Gets all remote nodes that have cache with given name.
-         *
-         * @param cacheName Cache name.
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) {
-            return filter(topVer, rmtCacheNodes.get(cacheName));
-        }
-
-        /**
          * Gets all remote nodes that have at least one cache configured.
          *
          * @param topVer Topology version.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
new file mode 100644
index 0000000..45758b0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class AffinityCalculateCache {
+    /** */
+    private final Map<Object, List<List<ClusterNode>>> assignCache = new HashMap<>();
+
+    /** */
+    private final AffinityTopologyVersion topVer;
+
+    /** */
+    private final DiscoveryEvent discoEvt;
+
+    /** */
+    private Map<Integer, List<List<ClusterNode>>> grpAssign;
+
+    /** */
+    private int calcCnt;
+
+    /** */
+    private long lateAffTime;
+
+    public AffinityCalculateCache(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) {
+        this.topVer = topVer;
+        this.discoEvt = discoEvt;
+    }
+
+    public int calculateCount() {
+        return calcCnt;
+    }
+
+    public void addLateAffinityCalculateTime(long time) {
+        lateAffTime += time;
+    }
+
+    public long lateAffinityCalculateTime() {
+        return lateAffTime;
+    }
+
+    public List<List<ClusterNode>> assignPartitions(AffinityFunction aff,
+        int backups,
+        List<ClusterNode> nodes,
+        List<List<ClusterNode>> prevAssignment,
+        @Nullable Integer affGrp,
+        Object affKey) {
+        if (affGrp != null && grpAssign != null) {
+            List<List<ClusterNode>> calcAssign = grpAssign.get(affGrp);
+
+            if (calcAssign != null) {
+                assert calcAssign.size() == aff.partitions() : calcAssign.size();
+
+                return calcAssign;
+            }
+        }
+
+        calcCnt++;
+
+        AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(nodes,
+            prevAssignment,
+            discoEvt,
+            topVer,
+            backups);
+
+        List<List<ClusterNode>> assign = aff.assignPartitions(ctx);
+
+        List<List<ClusterNode>> assign0 = assignCache.get(affKey);
+
+        if (assign0 != null && assign0.equals(assign))
+            assign = assign0;
+        else
+            assignCache.put(affKey, assign);
+
+        if (affGrp != null) {
+            if (grpAssign == null)
+                grpAssign = new HashMap<>();
+
+            grpAssign.put(affGrp, assign);
+        }
+
+        assert assign.size() == aff.partitions() : assign.size();
+
+        return assign;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java
new file mode 100644
index 0000000..c9ebbbc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ *
+ */
+public class AffinityConfiguration {
+    /** */
+    private final AffinityFunction aff;
+
+    /** */
+    private final IgnitePredicate<ClusterNode> nodeFilter;
+
+    /** */
+    private final int backups;
+
+    /**
+     * @param aff
+     * @param nodeFilter
+     * @param backups
+     */
+    public AffinityConfiguration(AffinityFunction aff, IgnitePredicate<ClusterNode> nodeFilter, int backups) {
+        this.aff = aff;
+        this.nodeFilter = nodeFilter;
+        this.backups = backups;
+    }
+
+    public AffinityFunction affinityFunction() {
+        return aff;
+    }
+
+    public IgnitePredicate<ClusterNode> nodeFilter() {
+        return nodeFilter;
+    }
+
+    public int backups() {
+        return backups;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a388c7a..c77124b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -34,6 +34,7 @@
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
 import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.GridKernalContext;
@@ -110,35 +111,35 @@
     /** */
     private final Object similarAffKey;
 
+    /** */
+    private final Integer affGrp;
+
     /**
      * Constructs affinity cached calculations.
      *
      * @param ctx Kernal context.
      * @param cacheName Cache name.
-     * @param aff Affinity function.
-     * @param nodeFilter Node filter.
-     * @param backups Number of backups.
+     * @param affCfg Affinity configuration.
      * @param locCache Local cache flag.
      */
     @SuppressWarnings("unchecked")
     public GridAffinityAssignmentCache(GridKernalContext ctx,
         String cacheName,
-        AffinityFunction aff,
-        IgnitePredicate<ClusterNode> nodeFilter,
-        int backups,
+        AffinityConfiguration affCfg,
         boolean locCache)
     {
         assert ctx != null;
-        assert aff != null;
-        assert nodeFilter != null;
 
         this.ctx = ctx;
-        this.aff = aff;
-        this.nodeFilter = nodeFilter;
         this.cacheName = cacheName;
-        this.backups = backups;
+        this.aff = affCfg.affinityFunction();
+        this.nodeFilter = affCfg.nodeFilter();
+        this.backups = affCfg.backups();
         this.locCache = locCache;
 
+        assert aff != null;
+        assert nodeFilter != null;
+
         cacheId = CU.cacheId(cacheName);
 
         log = ctx.log(GridAffinityAssignmentCache.class);
@@ -148,8 +149,11 @@
         head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
 
         similarAffKey = ctx.affinity().similaryAffinityKey(aff, nodeFilter, backups, partsCnt);
-
         assert similarAffKey != null;
+
+        affGrp = locCache ? null : ctx.cache().context().affinity().equalAffinityGroup(cacheId, affCfg);
+
+        log.info("Initialized cache affinity group [cache=" + cacheName + ", grp=" + affGrp + ']');
     }
 
     /**
@@ -205,6 +209,8 @@
      * @param assignment Assignment.
      */
     public void idealAssignment(List<List<ClusterNode>> assignment) {
+        assert assignment == null || assignment.size() == partsCnt : partsCnt;
+
         this.idealAssignment = assignment;
     }
 
@@ -255,7 +261,9 @@
      * @return Affinity assignments.
      */
     @SuppressWarnings("IfMayBeConditional")
-    public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) {
+    public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer,
+        DiscoveryEvent discoEvt,
+        @Nullable AffinityCalculateCache cache) {
         if (log.isDebugEnabled())
             log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
                 ", discoEvt=" + discoEvt + ']');
@@ -273,20 +281,28 @@
         else
             sorted = Collections.singletonList(ctx.discovery().localNode());
 
-        List<List<ClusterNode>> assignment;
+        List<List<ClusterNode>> assignment = null;
 
         if (prevAssignment != null && discoEvt != null) {
             boolean affNode = CU.affinityNode(discoEvt.eventNode(), nodeFilter);
 
             if (!affNode)
                 assignment = prevAssignment;
-            else
-                assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment,
-                    discoEvt, topVer, backups));
         }
-        else
-            assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt,
-                topVer, backups));
+
+        if (assignment == null) {
+            if (cache != null)
+                assignment = cache.assignPartitions(aff, backups, sorted, prevAssignment, affGrp, similarAffKey);
+            else {
+                AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(sorted,
+                    prevAssignment,
+                    discoEvt,
+                    topVer,
+                    backups);
+
+                assignment = aff.assignPartitions(ctx);
+            }
+        }
 
         assert assignment != null;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 2890887..9727905 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -30,12 +30,18 @@
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityNodeAddressHashResolver;
+import org.apache.ignite.cache.affinity.AffinityNodeIdHashResolver;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityCalculateCache;
+import org.apache.ignite.internal.processors.affinity.AffinityConfiguration;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
@@ -97,6 +103,115 @@
     private final ConcurrentMap<T2<Integer, AffinityTopologyVersion>, GridDhtAssignmentFetchFuture>
         pendingAssignmentFetchFuts = new ConcurrentHashMap8<>();
 
+    /** */
+    private int eqAffGrp;
+
+    /** */
+    private final Map<Integer, EqualAffinityCacheGroup> eqAffCacheGroups = new HashMap<>();
+
+    /**
+     *
+     */
+    private class EqualAffinityCacheGroup {
+        /** */
+        private final Map<Integer, AffinityConfiguration> caches = new HashMap<>();
+
+        private EqualAffinityCacheGroup(Integer cacheId, AffinityConfiguration cfg) {
+            caches.put(cacheId, cfg);
+        }
+
+        void add(Integer cacheId, AffinityConfiguration cfg) {
+            caches.put(cacheId, cfg);
+        }
+
+        /**
+         * @param cfg Affinity configuration.
+         * @return {@code True} if cache configurations have exactly the same affinity configuration.
+         */
+        boolean equalAffinity(AffinityConfiguration cfg) {
+            assert !caches.isEmpty();
+
+            AffinityConfiguration cfg0 = F.firstValue(caches);
+
+            assert cfg0 != null;
+
+            if (cfg0.backups() != cfg.backups())
+                return false;
+
+            if (!cfg0.nodeFilter().equals(cfg.nodeFilter()))
+                return false;
+
+            if (cfg0.affinityFunction().getClass() != cfg.affinityFunction().getClass())
+                return false;
+
+            if (cfg0.affinityFunction() == cfg.affinityFunction())
+                return true;
+
+            if (cfg0.affinityFunction() instanceof RendezvousAffinityFunction) {
+                RendezvousAffinityFunction f1 = (RendezvousAffinityFunction)cfg0.affinityFunction();
+                RendezvousAffinityFunction f2 = (RendezvousAffinityFunction)cfg.affinityFunction();
+
+                if (f1.getHashIdResolver() != f2.getHashIdResolver()) {
+                    if (f1.getHashIdResolver() == null || f2.getHashIdResolver() == null)
+                        return false;
+
+                    boolean eqRslvr = (f1.getHashIdResolver().getClass() == f2.getHashIdResolver().getClass()) &&
+                        (f1.getHashIdResolver().getClass() == AffinityNodeAddressHashResolver.class ||
+                            f1.getHashIdResolver().getClass() == AffinityNodeIdHashResolver.class);
+
+                    if (!eqRslvr)
+                        return false;
+                }
+
+                return f1.partitions() == f2.partitions() &&
+                    f1.isExcludeNeighbors() == f2.isExcludeNeighbors() &&
+                    f1.getBackupFilter() == f2.getBackupFilter() &&
+                    f1.getAffinityBackupFilter() == f2.getAffinityBackupFilter();
+            }
+            else if (cfg0.affinityFunction() instanceof FairAffinityFunction) {
+                FairAffinityFunction f1 = (FairAffinityFunction)cfg0.affinityFunction();
+                FairAffinityFunction f2 = (FairAffinityFunction)cfg.affinityFunction();
+
+                return f1.partitions() == f2.partitions() &&
+                    f1.isExcludeNeighbors() == f2.isExcludeNeighbors() &&
+                    f1.getBackupFilter() == f2.getBackupFilter() &&
+                    f1.getAffinityBackupFilter() == f2.getAffinityBackupFilter();
+            }
+            else
+                return false;
+        }
+    }
+
+    @Nullable public Integer equalAffinityGroup(Integer cacheId, AffinityConfiguration cfg) {
+        if (!(cfg.affinityFunction().getClass() == RendezvousAffinityFunction.class ||
+            cfg.affinityFunction().getClass() == FairAffinityFunction.class))
+            return null;
+
+        synchronized (eqAffCacheGroups) {
+            for (Map.Entry<Integer, EqualAffinityCacheGroup> e : eqAffCacheGroups.entrySet()) {
+                EqualAffinityCacheGroup grp = e.getValue();
+
+                if (grp.caches.containsKey(cacheId)) {
+                    assert grp.equalAffinity(cfg);
+
+                    return e.getKey();
+                }
+
+                if (grp.equalAffinity(cfg)) {
+                    grp.add(cacheId, cfg);
+
+                    return e.getKey();
+                }
+            }
+
+            Integer grp = eqAffGrp++;
+
+            eqAffCacheGroups.put(grp, new EqualAffinityCacheGroup(cacheId, cfg));
+
+            return grp;
+        }
+    }
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -372,6 +487,8 @@
 
         Set<Integer> stoppedCaches = null;
 
+        AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent());
+
         for (DynamicCacheChangeRequest req : reqs) {
             if (!(req.clientStartOnly() || req.close()))
                 clientOnly = false;
@@ -394,7 +511,7 @@
                             req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
 
                         if (clientCacheStarted)
-                            initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
+                            initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign, affCache);
                         else if (!req.clientStartOnly()) {
                             assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
@@ -403,7 +520,8 @@
                             assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
 
                             List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
-                                fut.discoveryEvent());
+                                fut.discoveryEvent(),
+                                affCache);
 
                             aff.initialize(fut.topologyVersion(), assignment);
                         }
@@ -415,6 +533,21 @@
             else if (req.stop() || req.close()) {
                 cctx.cache().blockGateway(req);
 
+                if (req.stop()) {
+                    synchronized (eqAffCacheGroups) {
+                        for (Iterator<EqualAffinityCacheGroup> it = eqAffCacheGroups.values().iterator(); it.hasNext();) {
+                            EqualAffinityCacheGroup grp = it.next();
+
+                            if (grp.caches.remove(cacheId) != null) {
+                                if (grp.caches.isEmpty())
+                                    it.remove();
+
+                                break;
+                            }
+                        }
+                    }
+                }
+
                 if (crd) {
                     boolean rmvCache = false;
 
@@ -753,7 +886,9 @@
 
             assert old == null : old;
 
-            List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+            List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(),
+                fut.discoveryEvent(),
+                null);
 
             cache.affinity().initialize(fut.topologyVersion(), newAff);
         }
@@ -785,13 +920,16 @@
         }
 
         if (crd && lateAffAssign) {
+            final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
+                fut.discoveryEvent());
+
             forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
                 @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException {
                     CacheHolder cache = cache(fut, desc);
 
                     if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
                         List<List<ClusterNode>> assignment =
-                            cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+                            cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache);
 
                         cache.affinity().initialize(fut.topologyVersion(), assignment);
                     }
@@ -799,10 +937,13 @@
             });
         }
         else {
+            final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
+                fut.discoveryEvent());
+
             forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                     if (aff.lastVersion().equals(AffinityTopologyVersion.NONE))
-                        initAffinity(aff, fut, false);
+                        initAffinity(aff, fut, false, affCache);
                 }
             });
         }
@@ -814,10 +955,13 @@
      * @param fetch Force fetch flag.
      * @throws IgniteCheckedException If failed.
      */
-    private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch)
+    private void initAffinity(GridAffinityAssignmentCache aff,
+        GridDhtPartitionsExchangeFuture fut,
+        boolean fetch,
+        AffinityCalculateCache affCache)
         throws IgniteCheckedException {
         if (!fetch && canCalculateAffinity(aff, fut)) {
-            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent());
+            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache);
 
             aff.initialize(fut.topologyVersion(), assignment);
         }
@@ -872,13 +1016,18 @@
         if (lateAffAssign) {
             if (locJoin) {
                 if (crd) {
+                    final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
+                        fut.discoveryEvent());
+
                     forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
                         @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException {
                             AffinityTopologyVersion topVer = fut.topologyVersion();
 
                             CacheHolder cache = cache(fut, cacheDesc);
 
-                            List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent());
+                            List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer,
+                                fut.discoveryEvent(),
+                                affCache);
 
                             cache.affinity().initialize(topVer, newAff);
                         }
@@ -933,10 +1082,14 @@
      * @throws IgniteCheckedException If failed.
      */
     private void fetchAffinityOnJoin(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
+        long start = System.currentTimeMillis();
+
         AffinityTopologyVersion topVer = fut.topologyVersion();
 
         List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>();
 
+        AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent());
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal())
                 continue;
@@ -944,8 +1097,9 @@
             DynamicCacheDescriptor cacheDesc = registeredCaches.get(cacheCtx.cacheId());
 
             if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) {
-                List<List<ClusterNode>> assignment =
-                    cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent());
+                List<List<ClusterNode>> assignment = cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(),
+                    fut.discoveryEvent(),
+                    affCache);
 
                 cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment);
             }
@@ -967,6 +1121,8 @@
 
             fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
         }
+
+        log.info("Affinity fetch time [topVer=" + topVer + ", time=" + (System.currentTimeMillis() - start) + ']');
     }
 
     /**
@@ -986,7 +1142,7 @@
         GridDhtAffinityAssignmentResponse res = fetchFut.get();
 
         if (res == null) {
-            List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent());
+            List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), null);
 
             affCache.initialize(topVer, aff);
         }
@@ -998,7 +1154,7 @@
             else {
                 assert !affCache.centralizedAffinityFunction() || !lateAffAssign;
 
-                affCache.calculate(topVer, fut.discoveryEvent());
+                affCache.calculate(topVer, fut.discoveryEvent(), null);
             }
 
             List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery());
@@ -1024,14 +1180,20 @@
         boolean centralizedAff;
 
         if (lateAffAssign) {
+            AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
+                fut.discoveryEvent());
+
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 if (cacheCtx.isLocal())
                     continue;
 
-                cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent());
+                cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache);
             }
 
             centralizedAff = true;
+
+            log.info("Initialized affinity on node left [topVer=" + fut.topologyVersion() +
+                ", calcCnt=" + affCache.calculateCount() + ']');
         }
         else {
             initCachesAffinity(fut);
@@ -1055,12 +1217,18 @@
     private void initCachesAffinity(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
         assert !lateAffAssign;
 
+        long start = System.currentTimeMillis();
+
+        AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent());
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal())
                 continue;
 
-            initAffinity(cacheCtx.affinity().affinityCache(), fut, false);
+            initAffinity(cacheCtx.affinity().affinityCache(), fut, false, affCache);
         }
+
+        log.info("Affinity init time [topVer=" + fut.topologyVersion() + ", time=" + (System.currentTimeMillis() - start) + ']');
     }
 
     /**
@@ -1072,13 +1240,15 @@
         throws IgniteCheckedException {
         final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>();
 
+        final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent());
+
         forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
             @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException {
                 CacheHolder cache = caches.get(desc.cacheId());
 
                 if (cache != null) {
                     if (cache.client())
-                        cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+                        cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache);
 
                     return;
                 }
@@ -1129,7 +1299,7 @@
                             throws IgniteCheckedException {
                             fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut);
 
-                            aff.calculate(fut.topologyVersion(), fut.discoveryEvent());
+                            aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache);
 
                             affFut.onDone(fut.topologyVersion());
                         }
@@ -1210,34 +1380,40 @@
         throws IgniteCheckedException {
         AffinityTopologyVersion topVer = fut.topologyVersion();
 
-        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+        final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent());
 
-        if (!crd) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
-                    continue;
+        try {
+            if (!crd) {
+                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                    if (cacheCtx.isLocal())
+                        continue;
 
-                boolean latePrimary = cacheCtx.rebalanceEnabled();
+                    boolean latePrimary = cacheCtx.rebalanceEnabled();
 
-                initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
-            }
-
-            return null;
-        }
-        else {
-            final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
-
-            forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
-                @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException {
-                    CacheHolder cache = cache(fut, cacheDesc);
-
-                    boolean latePrimary = cache.rebalanceEnabled;
-
-                    initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache);
+                    initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
                 }
-            });
 
-            return waitRebalanceInfo;
+                return null;
+            }
+            else {
+                final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
+
+                forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
+                    @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException {
+                        CacheHolder cache = cache(fut, cacheDesc);
+
+                        boolean latePrimary = cache.rebalanceEnabled;
+
+                        initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache);
+                    }
+                });
+
+                return waitRebalanceInfo;
+            }
+        }
+        finally {
+            log.info("Initialized affinity on node join [topVer=" + topVer +
+                ", calcCnt=" + affCache.calculateCount() + ", lateAffCalcTime=" + affCache.lateAffinityCalculateTime() + ']');
         }
     }
 
@@ -1253,7 +1429,7 @@
         GridAffinityAssignmentCache aff,
         WaitRebalanceInfo rebalanceInfo,
         boolean latePrimary,
-        Map<Object, List<List<ClusterNode>>> affCache)
+        AffinityCalculateCache affCache)
         throws IgniteCheckedException
     {
         assert lateAffAssign;
@@ -1265,14 +1441,16 @@
         assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [cache=" + aff.cacheName() +
             ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']';
 
-        List<List<ClusterNode>> curAff = aff.assignments(affTopVer);
-
-        assert aff.idealAssignment() != null : "Previous assignment is not available.";
-
-        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent());
+        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), affCache);
         List<List<ClusterNode>> newAssignment = null;
 
         if (latePrimary) {
+            long start = U.currentTimeMillis();
+
+            List<List<ClusterNode>> curAff = aff.assignments(affTopVer);
+
+            assert aff.idealAssignment() != null : "Previous assignment is not available.";
+
             for (int p = 0; p < idealAssignment.size(); p++) {
                 List<ClusterNode> newNodes = idealAssignment.get(p);
                 List<ClusterNode> curNodes = curAff.get(p);
@@ -1295,12 +1473,15 @@
                     newAssignment.set(p, nodes0);
                 }
             }
+
+            if (affCache != null)
+                affCache.addLateAffinityCalculateTime(U.currentTimeMillis() - start);
         }
 
         if (newAssignment == null)
             newAssignment = idealAssignment;
 
-        aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
+        aff.initialize(fut.topologyVersion(), newAssignment);
     }
 
     /**
@@ -1517,6 +1698,14 @@
         return assignment;
     }
 
+    public GridAffinityAssignmentCache cacheAssignment(Integer cacheId) {
+        CacheHolder holder = caches.get(cacheId);
+
+        assert holder != null;
+
+        return holder.affinity();
+    }
+
     /**
      *
      */
@@ -1716,9 +1905,7 @@
 
             GridAffinityAssignmentCache aff = new GridAffinityAssignmentCache(cctx.kernalContext(),
                 ccfg.getName(),
-                affFunc,
-                ccfg.getNodeFilter(),
-                ccfg.getBackups(),
+                new AffinityConfiguration(affFunc, ccfg.getNodeFilter(), ccfg.getBackups()),
                 ccfg.getCacheMode() == LOCAL);
 
             return new CacheHolder2(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff);
@@ -1730,7 +1917,7 @@
          * @param aff Affinity.
          * @param initAff Current affinity.
          */
-        public CacheHolder2(
+        CacheHolder2(
             boolean rebalanceEnabled,
             GridCacheSharedContext cctx,
             GridAffinityAssignmentCache aff,
@@ -1754,7 +1941,7 @@
     /**
      *
      */
-    class WaitRebalanceInfo {
+    private class WaitRebalanceInfo {
         /** */
         private final AffinityTopologyVersion topVer;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index c6e7ee6..6b47f82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -26,6 +26,7 @@
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityConfiguration;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.util.GridLeanSet;
@@ -67,9 +68,7 @@
 
         aff = new GridAffinityAssignmentCache(cctx.kernalContext(),
             cctx.namex(),
-            affFunction,
-            cctx.config().getNodeFilter(),
-            cctx.config().getBackups(),
+            new AffinityConfiguration(affFunction, cctx.config().getNodeFilter(), cctx.config().getBackups()),
             cctx.isLocal());
     }
 
@@ -77,7 +76,7 @@
     @Override protected void onKernalStart0() throws IgniteCheckedException {
         if (cctx.isLocal())
             // No discovery event needed for local affinity.
-            aff.calculate(LOC_CACHE_TOP_VER, null);
+            aff.calculate(LOC_CACHE_TOP_VER, null, null);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 503b334..3e1e31f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -102,6 +102,7 @@
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.IgniteSystemProperties.getLong;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -119,6 +120,9 @@
     /** Exchange history size. */
     private static final int EXCHANGE_HISTORY_SIZE = 1000;
 
+    /** */
+    private boolean skipFirstExchangeMsg;
+
     /** Atomic reference for pending timeout object. */
     private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
 
@@ -300,10 +304,24 @@
         }
     };
 
+    public boolean skipFirstExchangeMessage() {
+        return skipFirstExchangeMsg;
+    }
+
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
+        if (getBoolean("SKIP_FIRST_EXCHANGE_MSG", false)) {
+            if (cctx.kernalContext().config().isLateAffinityAssignment()) {
+                skipFirstExchangeMsg = true;
+
+                cctx.kernalContext().addNodeAttribute("SKIP_FIRST_EXCHANGE_MSG", true);
+            }
+            else
+                U.warn(log, "Can not use SKIP_FIRST_EXCHANGE_MSG optimization when late affinity assignment disabled.");
+        }
+
         exchWorker = new ExchangeWorker();
 
         cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
@@ -1105,6 +1123,17 @@
         if (log.isDebugEnabled())
             log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
 
+        if (exchFut.singleMsgUpdateCnt > 0) {
+            log.info("Exchange done [topVer=" + topVer +
+                ", singleMsgUpdateTime=" + exchFut.singleMsgUpdateTime +
+                ", singleMsgUpdateCnt=" + exchFut.singleMsgUpdateCnt +
+                ", singleMsgUpdateMaxTime=" + exchFut.singleMsgUpdateMaxTime +
+                ", singleMsgUpdateMinTime=" + exchFut.singleMsgUpdateMinTime +
+                ", err=" + err + ']');
+        }
+        else
+            log.info("Exchange done [topVer=" + topVer + ", err=" + err + ']');
+
         IgniteProductVersion minVer = cctx.localNode().version();
         IgniteProductVersion maxVer = cctx.localNode().version();
 
@@ -1224,7 +1253,7 @@
                         top = cacheCtx.topology();
 
                     if (top != null)
-                        updated |= top.update(null, entry.getValue(), null) != null;
+                        updated |= top.update(null, entry.getValue(), null);
                 }
 
                 if (!cctx.kernalContext().clientNode() && updated)
@@ -1273,7 +1302,7 @@
                         top = cacheCtx.topology();
 
                     if (top != null) {
-                        updated |= top.update(null, entry.getValue(), null) != null;
+                        updated |= top.update(null, entry.getValue(), null);
 
                         cctx.affinity().checkRebalanceState(top, cacheId);
                     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5efb317..4ddb0f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -549,7 +549,7 @@
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionFullMap partMap,
         Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
@@ -563,7 +563,7 @@
                     log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ']');
 
-                return null;
+                return false;
             }
 
             if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -571,7 +571,7 @@
                     log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
 
-                return null;
+                return false;
             }
 
             updateSeq.incrementAndGet();
@@ -634,7 +634,7 @@
             if (log.isDebugEnabled())
                 log.debug("Partition map after full update: " + fullMapString());
 
-            return null;
+            return false;
         }
         finally {
             lock.writeLock().unlock();
@@ -643,7 +643,7 @@
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionMap2 parts,
         Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
@@ -654,21 +654,21 @@
                 log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
                     ", parts=" + parts + ']');
 
-            return null;
+            return false;
         }
 
         lock.writeLock().lock();
 
         try {
             if (stopping)
-                return null;
+                return false;
 
             if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
                 if (log.isDebugEnabled())
                     log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ']');
 
-                return null;
+                return false;
             }
 
             if (exchId != null)
@@ -688,7 +688,7 @@
                     log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
                         ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
 
-                return null;
+                return false;
             }
 
             long updateSeq = this.updateSeq.incrementAndGet();
@@ -740,7 +740,7 @@
             if (log.isDebugEnabled())
                 log.debug("Partition map after single update: " + fullMapString());
 
-            return changed ? localPartitionMap() : null;
+            return changed;
         }
         finally {
             lock.writeLock().unlock();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index cad1617..af66a82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -142,9 +142,11 @@
                 List<ClusterNode> nodes = new ArrayList<>(ids.size());
 
                 for (int j = 0; j < ids.size(); j++) {
-                    ClusterNode node = disco.node(topVer, ids.get(j));
+                    UUID nodeId = ids.get(j);
 
-                    assert node != null;
+                    ClusterNode node = disco.node(topVer, nodeId);
+
+                    assert node != null : nodeId;
 
                     nodes.add(node);
                 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 4ae4e47..b0a4183 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -205,7 +205,7 @@
      * @param cntrMap Partition update counters.
      * @return Local partition map if there were evictions or {@code null} otherwise.
      */
-    public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, Long> cntrMap);
 
@@ -215,7 +215,7 @@
      * @param cntrMap Partition update counters.
      * @return Local partition map if there were evictions or {@code null} otherwise.
      */
-    @Nullable public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionMap2 parts,
         @Nullable Map<Integer, Long> cntrMap);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index f3751ac..12a0d43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -350,7 +350,9 @@
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            initPartitions0(exchFut, updateSeq);
+            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+            initPartitions0(oldest, exchFut, updateSeq);
 
             consistencyCheck();
         }
@@ -363,11 +365,9 @@
      * @param exchFut Exchange future.
      * @param updateSeq Update sequence.
      */
-    private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
+    private void initPartitions0(ClusterNode oldest, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
         ClusterNode loc = cctx.localNode();
 
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
         assert oldest != null || cctx.kernalContext().clientNode();
 
         GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
@@ -407,12 +407,12 @@
                         if (log.isDebugEnabled())
                             log.debug("Owned partition for oldest node: " + locPart);
 
-                        updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                        updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
                     }
                 }
             }
             else
-                createPartitions(aff, updateSeq);
+                createPartitions(oldest, aff, updateSeq);
         }
         else {
             // If preloader is disabled, then we simply clear out
@@ -429,7 +429,7 @@
                         if (state.active()) {
                             locPart.rent(false);
 
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                            updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
 
                             if (log.isDebugEnabled())
                                 log.debug("Evicting partition with rebalancing disabled " +
@@ -443,7 +443,7 @@
         }
 
         if (node2part != null && node2part.valid())
-            checkEvictions(updateSeq, aff);
+            checkEvictions(oldest, updateSeq, aff);
 
         updateRebalanceVersion(aff);
     }
@@ -452,7 +452,7 @@
      * @param aff Affinity assignments.
      * @param updateSeq Update sequence.
      */
-    private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) {
+    private void createPartitions(ClusterNode oldest, List<List<ClusterNode>> aff, long updateSeq) {
         ClusterNode loc = cctx.localNode();
 
         int num = cctx.affinity().partitions();
@@ -464,7 +464,7 @@
                     // will be created in MOVING state.
                     GridDhtLocalPartition locPart = createPartition(p);
 
-                    updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                    updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
                 }
             }
             // If this node's map is empty, we pre-create local partitions,
@@ -533,11 +533,11 @@
             }
 
             if (affReady)
-                initPartitions0(exchFut, updateSeq);
+                initPartitions0(oldest, exchFut, updateSeq);
             else {
                 List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
 
-                createPartitions(aff, updateSeq);
+                createPartitions(oldest, aff, updateSeq);
             }
 
             consistencyCheck();
@@ -584,6 +584,8 @@
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
+            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
             for (int p = 0; p < num; p++) {
                 GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
 
@@ -610,7 +612,7 @@
                                 assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" +
                                     locPart + ']';
 
-                                updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                                updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
 
                                 changed = true;
 
@@ -630,7 +632,7 @@
                                     locPart + ", owners = " + owners + ']');
                         }
                         else
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                            updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
                     }
                 }
                 else {
@@ -640,7 +642,7 @@
                         if (state == MOVING) {
                             locPart.rent(false);
 
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                            updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
 
                             changed = true;
 
@@ -1014,7 +1016,7 @@
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
@@ -1026,7 +1028,7 @@
 
         try {
             if (stopping)
-                return null;
+                return false;
 
             if (cntrMap != null) {
                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
@@ -1054,7 +1056,7 @@
                     log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ']');
 
-                return null;
+                return false;
             }
 
             if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -1062,7 +1064,7 @@
                     log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
 
-                return null;
+                return false;
             }
 
             long updateSeq = this.updateSeq.incrementAndGet();
@@ -1129,7 +1131,9 @@
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
                 List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
 
-                changed = checkEvictions(updateSeq, aff);
+                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+                changed = checkEvictions(oldest, updateSeq, aff);
 
                 updateRebalanceVersion(aff);
             }
@@ -1139,7 +1143,7 @@
             if (log.isDebugEnabled())
                 log.debug("Partition map after full update: " + fullMapString());
 
-            return changed ? localPartitionMap() : null;
+            return changed;
         }
         finally {
             lock.writeLock().unlock();
@@ -1148,7 +1152,7 @@
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionMap2 parts,
         @Nullable Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
@@ -1159,33 +1163,28 @@
                 log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
                     ", parts=" + parts + ']');
 
-            return null;
+            return false;
         }
 
         lock.writeLock().lock();
 
         try {
             if (stopping)
-                return null;
+                return false;
 
             if (cntrMap != null) {
                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
-                    Long cntr = this.cntrMap.get(e.getKey());
+                    Integer p = e.getKey();
+
+                    Long cntr = this.cntrMap.get(p);
 
                     if (cntr == null || cntr < e.getValue())
-                        this.cntrMap.put(e.getKey(), e.getValue());
-                }
+                        this.cntrMap.put(p, e.getValue());
 
-                for (int i = 0; i < locParts.length; i++) {
-                    GridDhtLocalPartition part = locParts[i];
+                    GridDhtLocalPartition part = locParts[p];
 
-                    if (part == null)
-                        continue;
-
-                    Long cntr = cntrMap.get(part.id());
-
-                    if (cntr != null)
-                        part.updateCounter(cntr);
+                    if (part != null)
+                        part.updateCounter(e.getValue());
                 }
             }
 
@@ -1194,7 +1193,7 @@
                     log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ']');
 
-                return null;
+                return false;
             }
 
             if (exchId != null)
@@ -1211,7 +1210,7 @@
                     log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
                         ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
 
-                return null;
+                return false;
             }
 
             long updateSeq = this.updateSeq.incrementAndGet();
@@ -1254,7 +1253,9 @@
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
                 List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
 
-                changed |= checkEvictions(updateSeq, aff);
+                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+                changed |= checkEvictions(oldest, updateSeq, aff);
 
                 updateRebalanceVersion(aff);
             }
@@ -1264,7 +1265,7 @@
             if (log.isDebugEnabled())
                 log.debug("Partition map after single update: " + fullMapString());
 
-            return changed ? localPartitionMap() : null;
+            return changed;
         }
         finally {
             lock.writeLock().unlock();
@@ -1276,7 +1277,7 @@
      * @param aff Affinity assignments.
      * @return Checks if any of the local partitions need to be evicted.
      */
-    private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
+    private boolean checkEvictions(ClusterNode oldest, long updateSeq, List<List<ClusterNode>> aff) {
         boolean changed = false;
 
         UUID locId = cctx.nodeId();
@@ -1299,7 +1300,7 @@
                     if (nodeIds.containsAll(F.nodeIds(affNodes))) {
                         part.rent(false);
 
-                        updateLocal(part.id(), locId, part.state(), updateSeq);
+                        updateLocal(oldest, part.id(), locId, part.state(), updateSeq);
 
                         changed = true;
 
@@ -1324,7 +1325,7 @@
                                 if (locId.equals(n.id())) {
                                     part.rent(false);
 
-                                    updateLocal(part.id(), locId, part.state(), updateSeq);
+                                    updateLocal(oldest, part.id(), locId, part.state(), updateSeq);
 
                                     changed = true;
 
@@ -1353,12 +1354,9 @@
      * @param updateSeq Update sequence.
      */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
+    private void updateLocal(ClusterNode oldest, int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
         assert nodeId.equals(cctx.nodeId());
 
-        // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
         assert oldest != null || cctx.kernalContext().clientNode();
 
         // If this node became the oldest node.
@@ -1453,7 +1451,9 @@
 
         try {
             if (part.own()) {
-                updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
+                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+                updateLocal(oldest, part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
 
                 consistencyCheck();
 
@@ -1481,7 +1481,9 @@
 
             long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get();
 
-            updateLocal(part.id(), cctx.localNodeId(), part.state(), seq);
+            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+            updateLocal(oldest, part.id(), cctx.localNodeId(), part.state(), seq);
 
             consistencyCheck();
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f391265..66f3ed1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -65,7 +65,6 @@
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -81,6 +80,7 @@
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 
 /**
  * Future for exchanging partition maps.
@@ -112,6 +112,22 @@
 
     /** */
     @GridToStringExclude
+    private int pendingSingleUpdates;
+
+    /** */
+    public int singleMsgUpdateCnt;
+
+    /** */
+    public long singleMsgUpdateTime;
+
+    /** */
+    public long singleMsgUpdateMaxTime;
+
+    /** */
+    public long singleMsgUpdateMinTime = Long.MAX_VALUE;
+
+    /** */
+    @GridToStringExclude
     private List<ClusterNode> srvNodes;
 
     /** */
@@ -431,6 +447,10 @@
         assert !dummy && !forcePreload : this;
 
         try {
+            long initStart = System.currentTimeMillis();
+
+            log.info("Start exchange init [topVer=" + topologyVersion() + ']');
+
             srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topologyVersion()));
 
             remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
@@ -462,14 +482,22 @@
                         cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
                 }
 
+                long affStart = System.currentTimeMillis();
+
                 if (CU.clientNode(discoEvt.eventNode()))
                     exchange = onClientNodeEvent(crdNode);
                 else
                     exchange = onServerNodeEvent(crdNode);
+
+                log.info("Affinity call time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - affStart) + ']');
             }
 
+            long topUpdateStart = System.currentTimeMillis();
+
             updateTopologies(crdNode);
 
+            log.info("Top update time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - topUpdateStart) + ']');
+
             switch (exchange) {
                 case ALL: {
                     distributedExchange();
@@ -495,7 +523,10 @@
 
                 default:
                     assert false;
+
             }
+
+            log.info("Finish exchange init [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - initStart) + ']');
         }
         catch (IgniteInterruptedCheckedException e) {
             onDone(e);
@@ -719,6 +750,8 @@
 
         boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
 
+        long beforeExchStart = System.currentTimeMillis();
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId()))
                 continue;
@@ -733,12 +766,68 @@
             cacheCtx.topology().beforeExchange(this, !centralizedAff);
         }
 
+        log.info("Before exchange time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - beforeExchStart) + ']');
+
         if (crd.isLocal()) {
+            ClusterNode node = discoEvt.eventNode();
+
+            Object attr = node.attribute("SKIP_FIRST_EXCHANGE_MSG");
+
+            boolean skipFirstExchange = Boolean.TRUE.equals(attr) || ((attr instanceof String) && "true".equalsIgnoreCase((String)attr));
+
+            if (discoEvt.type() == EVT_NODE_JOINED && !node.isLocal() && skipFirstExchange) {
+                assert !CU.clientNode(node) : discoEvt;
+                assert srvNodes.contains(node);
+
+                boolean rmv = remaining.remove(node.id());
+                assert rmv;
+
+                Collection<String> caches = cctx.discovery().nodeCaches(topologyVersion(), node);
+
+                if (!caches.isEmpty()) {
+                    GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage(exchangeId(),
+                        false,
+                        null,
+                        false);
+
+                    for (String cache : caches) {
+                        Map<Integer, GridDhtPartitionState> m = new HashMap<>();
+
+                        GridDhtPartitionMap2 partMap = new GridDhtPartitionMap2(node.id(), 1, topologyVersion(), m, true);
+
+                        GridAffinityAssignmentCache assign = cctx.affinity().cacheAssignment(CU.cacheId(cache));
+
+                        for (Integer part : assign.primaryPartitions(node.id(), topologyVersion()))
+                            partMap.put(part, MOVING);
+                        for (Integer part : assign.backupPartitions(node.id(), topologyVersion()))
+                            partMap.put(part, MOVING);
+
+                        msg.addLocalPartitionMap(CU.cacheId(cache), partMap, null);
+                    }
+
+                    updatePartitionSingleMap(msg);
+                }
+            }
+
             if (remaining.isEmpty())
                 onAllReceived(false);
         }
-        else
-            sendPartitions(crd);
+        else {
+            boolean skipSnd = false;
+
+            if (cctx.exchange().skipFirstExchangeMessage() && discoEvt.type() == EVT_NODE_JOINED && discoEvt.eventNode().isLocal())
+                skipSnd = true;
+
+            if (!skipSnd) {
+                long sndStart = System.currentTimeMillis();
+
+                sendPartitions(crd);
+
+                log.info("Send parts time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - sndStart) + ']');
+            }
+            else
+                log.info("Skip first exchange message [topVer=" + topologyVersion() + ']');
+        }
 
         initDone();
     }
@@ -1162,23 +1251,66 @@
      */
     private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
         boolean allReceived = false;
+        boolean updateSingleMap = false;
 
         synchronized (mux) {
             assert crd != null;
 
             if (crd.isLocal()) {
                 if (remaining.remove(node.id())) {
-                    updatePartitionSingleMap(msg);
+                    updateSingleMap = true;
+
+                    pendingSingleUpdates++;
 
                     allReceived = remaining.isEmpty();
                 }
+
+                singleMsgUpdateCnt++;
             }
             else
                 singleMsgs.put(node, msg);
         }
 
-        if (allReceived)
+        if (updateSingleMap) {
+            long start = U.currentTimeMillis();
+
+            try {
+                updatePartitionSingleMap(msg);
+            }
+            finally {
+                synchronized (mux) {
+                    long time = U.currentTimeMillis() - start;
+
+                    if (time > singleMsgUpdateMaxTime)
+                        singleMsgUpdateMaxTime = time;
+                    if (time < singleMsgUpdateMinTime)
+                        singleMsgUpdateMinTime = time;
+
+                    singleMsgUpdateTime += time;
+
+                    assert pendingSingleUpdates > 0;
+
+                    pendingSingleUpdates--;
+
+                    if (pendingSingleUpdates == 0)
+                        mux.notifyAll();
+                }
+            }
+        }
+
+        if (allReceived) {
+            synchronized (mux) {
+                try {
+                    while (pendingSingleUpdates > 0)
+                        U.wait(mux);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    U.warn(log, "Failed to wait for partition map updates.");
+                }
+            }
+
             onAllReceived(false);
+        }
     }
 
     /**
@@ -1211,6 +1343,8 @@
         try {
             assert crd.isLocal();
 
+            log.info("Coordinator exchange all received [topVer=" + topologyVersion() + ']');
+
             if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                     if (!cacheCtx.isLocal())
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8814745..67bd252 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -129,6 +129,7 @@
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -136,6 +137,7 @@
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -174,8 +176,10 @@
         IgniteProductVersion.fromString("1.5.0");
 
     /** */
-    private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<Runnable>());
+    private static final boolean SEND_JOIN_REQ_DIRECTLY = getBoolean("SEND_JOIN_REQ_DIRECTLY", false);
+
+    /** */
+    private IgniteThreadPoolExecutor utilityPool;
 
     /** Nodes ring. */
     @GridToStringExclude
@@ -297,6 +301,13 @@
             spiState = DISCONNECTED;
         }
 
+        utilityPool = new IgniteThreadPoolExecutor("disco-pool",
+            spi.ignite().name(),
+            0,
+            1,
+            2000,
+            new LinkedBlockingQueue<Runnable>());
+
         if (debugMode) {
             if (!log.isInfoEnabled())
                 throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
@@ -918,6 +929,8 @@
 
         if (log.isDebugEnabled())
             log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder());
+
+        log.info("Node joined topology: " + locNode);
     }
 
     /**
@@ -974,7 +987,7 @@
 
             for (InetSocketAddress addr : addrs) {
                 try {
-                    Integer res = sendMessageDirectly(joinReq, addr);
+                    Integer res = sendMessageDirectly(joinReq, addr, true);
 
                     assert res != null;
 
@@ -1087,13 +1100,15 @@
      *
      * @param msg Message to send.
      * @param addr Address to send message to.
+     * @param join {@code True} if sends initial node join request.
      * @return Response read from the recipient or {@code null} if no response is supposed.
      * @throws IgniteSpiException If an error occurs.
      */
-    @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr)
+    @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr, boolean join)
         throws IgniteSpiException {
         assert msg != null;
         assert addr != null;
+        assert !join || msg instanceof TcpDiscoveryJoinRequestMessage;
 
         Collection<Throwable> errs = null;
 
@@ -1180,7 +1195,7 @@
                 // Connection has been established, but
                 // join request may not be unmarshalled on remote host.
                 // E.g. due to class not found issue.
-                joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
+                joinReqSent = join;
 
                 int receipt = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
@@ -2403,9 +2418,12 @@
         /** Connection check threshold. */
         private long connCheckThreshold;
 
+        /** */
+        private long lastRingMsgTime;
+
         /**
          */
-        protected RingMessageWorker() {
+        RingMessageWorker() {
             super("tcp-disco-msg-worker", 10);
 
             initConnectionCheckFrequency();
@@ -2500,6 +2518,8 @@
          * @param msg Message to process.
          */
         @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+            sendHeartbeatMessage();
+
             DebugLogger log = messageLogger(msg);
 
             if (log.isDebugEnabled())
@@ -2508,6 +2528,11 @@
             if (debugMode)
                 debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
 
+            boolean ensured = spi.ensured(msg);
+
+            if (!locNode.id().equals(msg.senderNodeId()) && ensured)
+                lastRingMsgTime = U.currentTimeMillis();
+
             if (locNode.internalOrder() == 0) {
                 boolean proc = false;
 
@@ -2564,7 +2589,7 @@
             else
                 assert false : "Unknown message type: " + msg.getClass().getSimpleName();
 
-            if (spi.ensured(msg) && redirectToClients(msg))
+            if (ensured && redirectToClients(msg))
                 msgHist.add(msg);
 
             if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {
@@ -3744,8 +3769,79 @@
                 if (nodeAddedMsg.verified())
                     msgHist.add(nodeAddedMsg);
             }
-            else if (sendMessageToRemotes(msg))
-                sendMessageAcrossRing(msg);
+            else {
+                if (sendMessageToRemotes(msg)) {
+                    if (SEND_JOIN_REQ_DIRECTLY && !msg.directSendFailed()) {
+                        final TcpDiscoveryNode crd = resolveCoordinator();
+
+                        Collection<TcpDiscoveryNode> failedNodes;
+
+                        synchronized (mux) {
+                            failedNodes = U.arrayList(ServerImpl.this.failedNodes.keySet());
+                        }
+
+                        TcpDiscoveryNode next = ring.nextNode(failedNodes);
+
+                        if (crd != null && !crd.equals(next)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Will send join request directly to coordinator " +
+                                    "[msg=" + msg + ", crd=" + crd + ", next=" + next + ']');
+                            }
+
+                            log.info("Will send join request directly to coordinator " +
+                                "[cnt=" + joiningNodes.size() + ", msg=" + msg + ", crd=" + crd + ", next=" + next + ']');
+
+                            utilityPool.submit(new Runnable() {
+                                @Override public void run() {
+                                    IgniteSpiException sndErr = null;
+                                    Integer res = null;
+
+                                    TcpDiscoveryJoinRequestMessage msg0 =
+                                        new TcpDiscoveryJoinRequestMessage(msg.node(), msg.discoveryData());
+
+                                    try {
+                                        res = trySendMessageDirectly(crd, msg0);
+
+                                        if (F.eq(RES_OK, res)) {
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("Sent join request directly to coordinator " +
+                                                    "[msg=" + msg0 + ", crd=" + crd + ']');
+                                            }
+
+                                            log.info("Sent join request directly to coordinator " +
+                                                "[msg=" + msg0 + ", crd=" + crd + ']');
+
+                                            return;
+                                        }
+                                    }
+                                    catch (IgniteSpiException e) {
+                                        sndErr = e;
+                                    }
+
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Failed to send join request to coordinator, will process from " +
+                                            "message worker [msg=" + msg0 + ", crd=" + crd + ", err=" + sndErr +
+                                            ", res=" + res + ']');
+                                    }
+
+                                    log.info("Failed to send join request to coordinator, will process from " +
+                                        "message worker [msg=" + msg0 + ", crd=" + crd + ", err=" + sndErr +
+                                        ", res=" + res + ']');
+
+                                    msg.directSendFailed(true);
+
+                                    msgWorker.addMessage(msg);
+                                }
+                            });
+
+                            return;
+                        }
+                    }
+
+                    sendMessageAcrossRing(msg);
+                }
+
+            }
         }
 
         /**
@@ -3780,7 +3876,7 @@
          * @param msg Message.
          * @throws IgniteSpiException Last failure if all attempts failed.
          */
-        private void trySendMessageDirectly(TcpDiscoveryNode node, TcpDiscoveryAbstractMessage msg)
+        private Integer trySendMessageDirectly(TcpDiscoveryNode node, TcpDiscoveryAbstractMessage msg)
             throws IgniteSpiException {
             if (node.isClient()) {
                 TcpDiscoveryNode routerNode = ring.node(node.clientRouterNodeId());
@@ -3801,25 +3897,21 @@
 
                     worker.addMessage(msg);
 
-                    return;
+                    return null;
                 }
 
-                trySendMessageDirectly(routerNode, msg);
-
-                return;
+                return trySendMessageDirectly(routerNode, msg);
             }
 
             IgniteSpiException ex = null;
 
             for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) {
                 try {
-                    sendMessageDirectly(msg, addr);
+                    Integer res = sendMessageDirectly(msg, addr, false);
 
                     node.lastSuccessfulAddress(addr);
 
-                    ex = null;
-
-                    break;
+                    return res;
                 }
                 catch (IgniteSpiException e) {
                     ex = e;
@@ -3828,6 +3920,8 @@
 
             if (ex != null)
                 throw ex;
+
+            return null;
         }
 
         /**
@@ -5336,12 +5430,9 @@
          * Sends heartbeat message if needed.
          */
         private void sendHeartbeatMessage() {
-            if (!isLocalNodeCoordinator())
-                return;
-
             long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis();
 
-            if (elapsed > 0)
+            if (elapsed > 0 || !isLocalNodeCoordinator())
                 return;
 
             TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
@@ -5361,7 +5452,9 @@
             if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
                 lastTimeStatusMsgSent = locNode.lastUpdateTime();
 
-            long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis();
+            long updateTime = Math.max(lastTimeStatusMsgSent, lastRingMsgTime);
+
+            long elapsed = (updateTime + hbCheckFreq) - U.currentTimeMillis();
 
             if (elapsed > 0)
                 return;
@@ -5544,6 +5637,9 @@
 
             ClientMessageWorker clientMsgWrk = null;
 
+            TcpDiscoveryAbstractMessage msg = null;
+            Exception sockE = null;
+
             try {
                 InputStream in;
 
@@ -5604,7 +5700,7 @@
                     // Restore timeout.
                     sock.setSoTimeout(timeout);
 
-                    TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);
+                    msg = spi.readMessage(sock, in, spi.netTimeout);
 
                     // Ping.
                     if (msg instanceof TcpDiscoveryPingRequest) {
@@ -5709,6 +5805,8 @@
                     }
                 }
                 catch (IOException e) {
+                    sockE = e;
+
                     if (log.isDebugEnabled())
                         U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
 
@@ -5736,6 +5834,8 @@
                     return;
                 }
                 catch (IgniteCheckedException e) {
+                    sockE = e;
+
                     if (log.isDebugEnabled())
                         U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
 
@@ -5765,8 +5865,7 @@
 
                 while (!isInterrupted()) {
                     try {
-                        TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
-                            U.resolveClassLoader(spi.ignite().configuration()));
+                        msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration()));
 
                         msg.senderNodeId(nodeId);
 
@@ -5793,9 +5892,12 @@
 
                                 if (clientMsgWrk != null && ok)
                                     continue;
-                                else
+                                else {
+                                    log.info("Processed join request, close connection [msg=" + msg + ']');
+
                                     // Direct join request - no need to handle this socket anymore.
                                     break;
+                                }
                             }
                         }
                         else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
@@ -5968,6 +6070,8 @@
                             processClientHeartbeatMessage(heartbeatMsg);
                     }
                     catch (IgniteCheckedException e) {
+                        sockE = e;
+
                         if (log.isDebugEnabled())
                             U.error(log, "Caught exception on message read [sock=" + sock +
                                 ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e);
@@ -5995,6 +6099,8 @@
                         return;
                     }
                     catch (IOException e) {
+                        sockE = e;
+
                         if (log.isDebugEnabled())
                             U.error(log, "Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId +
                                 ", rmtNodeId=" + nodeId + ']', e);
@@ -6018,6 +6124,8 @@
                 }
             }
             finally {
+                log.info("Close sock [readers=" + spi.stats.socketReaders() + ", msg=" + msg + ", err=" + sockE + ']');
+
                 if (clientMsgWrk != null) {
                     if (log.isDebugEnabled())
                         log.debug("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId +
@@ -6062,11 +6170,11 @@
 
             TcpDiscoverySpiState state = spiStateCopy();
 
-            long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+            long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
                 spi.getSocketTimeout();
 
             if (state == CONNECTED) {
-                spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+                spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
@@ -6103,7 +6211,7 @@
                     // Local node is stopping. Remote node should try next one.
                     res = RES_CONTINUE_JOIN;
 
-                spi.writeToSocket(msg, sock, res, socketTimeout);
+                spi.writeToSocket(msg, sock, res, sockTimeout);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
index 9e73632..b434c04 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
@@ -628,6 +628,10 @@
         return sockReadersCreated;
     }
 
+    public synchronized int socketReaders() {
+        return sockReadersCreated - sockReadersRmv;
+    }
+
     /**
      * Gets socket readers removed count.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
index 22ffae8..4422919 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp.messages;
 
 import java.util.Map;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
@@ -36,6 +37,10 @@
     /** Discovery data. */
     private final Map<Integer, byte[]> discoData;
 
+    /** */
+    @GridToStringExclude
+    private transient boolean directSndFailed;
+
     /**
      * Constructor.
      *
@@ -79,6 +84,14 @@
         setFlag(RESPONDED_FLAG_POS, responded);
     }
 
+    public boolean directSendFailed() {
+        return directSndFailed;
+    }
+
+    public void directSendFailed(boolean directSndFailed) {
+        this.directSndFailed = directSndFailed;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean equals(Object obj) {
         // NOTE!
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
index a59ca8b..2d46cf4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
@@ -76,7 +76,12 @@
         Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size());
 
         for (int part = 0; part < aff.getPartitions(); part++) {
-            Collection<ClusterNode> affNodes = aff.assignPartition(part, new ArrayList(nodes), 0, null);
+            Collection<ClusterNode> affNodes = aff.assignPartition(null,
+                part,
+                new ArrayList<>(nodes),
+                new HashMap<ClusterNode, byte[]>(),
+                0,
+                null);
 
             assertEquals(1, affNodes.size());
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 7e37450..643aa86 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -2581,6 +2581,17 @@
 
             return !excludeNodes.contains(name);
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+
+            if (obj == null || !(getClass() == obj.getClass()))
+                return false;
+
+            return F.eq(excludeNodes, ((CacheNodeFilter)obj).excludeNodes);
+        }
     }
 
     /**