IGNITE-5398 - Reuse sort container across multiple affinity assignment calculations
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunctionContext.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunctionContext.java
index fd071cb..d5c021a 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunctionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunctionContext.java
@@ -68,4 +68,18 @@
      *      not available.
      */
     @Nullable public DiscoveryEvent discoveryEvent();
+
+    /**
+     * Gets an optional context attachment that will be passed across different cache affinity calculations.
+     *
+     * @return Optional user attachment, or {@code null} if there is none.
+     */
+    @Nullable public <T> T attachment();
+
+    /**
+     * Sets an optional context attachment that will be passed across different cache affinity calculations.
+     *
+     * @param attachment Attachment to set; replaces any previously set attachment.
+     */
+    public <T> void attachment(T attachment);
 }
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index 021f4e2..7a741cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -40,12 +40,10 @@
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.Nullable;
 
@@ -79,7 +77,7 @@
     public static final int DFLT_PARTITION_COUNT = 1024;
 
     /** Comparator. */
-    private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator();
+    private static final Comparator<NodeWithHash> COMPARATOR = new HashComparator();
 
     /** Number of partitions. */
     private int parts;
@@ -336,26 +334,27 @@
     public List<ClusterNode> assignPartition(int part,
         List<ClusterNode> nodes,
         int backups,
-        @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
+        @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache,
+        NodeWithHash[] sortContainer
+    ) {
         if (nodes.size() <= 1)
             return nodes;
 
-        IgniteBiTuple<Long, ClusterNode> [] hashArr =
-            (IgniteBiTuple<Long, ClusterNode> [])new IgniteBiTuple[nodes.size()];
+        assert sortContainer.length == nodes.size() : "Invalid sort container [part=" + part +
+            ", nodes=" + nodes + ", container=" + Arrays.toString(sortContainer) + ']';
 
         for (int i = 0; i < nodes.size(); i++) {
             ClusterNode node = nodes.get(i);
 
             Object nodeHash = resolveNodeHash(node);
 
-            long hash = hash(nodeHash.hashCode(), part);
-
-            hashArr[i] = F.t(hash, node);
+            sortContainer[i].node = node;
+            sortContainer[i].hash = hash(nodeHash.hashCode(), part);
         }
 
         final int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
 
-        Iterable<ClusterNode> sortedNodes = new LazyLinearSortedContainer(hashArr, primaryAndBackups);
+        Iterable<ClusterNode> sortedNodes = new LazyLinearSortedContainer(sortContainer, primaryAndBackups);
 
         Iterator<ClusterNode> it = sortedNodes.iterator();
 
@@ -474,8 +473,10 @@
 
         List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
 
+        NodeWithHash[] container = getOrCreateSortContainer(affCtx);
+
         for (int i = 0; i < parts; i++) {
-            List<ClusterNode> partAssignment = assignPartition(i, nodes, affCtx.backups(), neighborhoodCache);
+            List<ClusterNode> partAssignment = assignPartition(i, nodes, affCtx.backups(), neighborhoodCache, container);
 
             assignments.add(partAssignment);
         }
@@ -488,6 +489,26 @@
         // No-op.
     }
 
+    /**
+     * @param ctx Affinity function context.
+     * @return Array to use for partition assignment; the attachment is reused only if it is a fitting array.
+     */
+    private NodeWithHash[] getOrCreateSortContainer(AffinityFunctionContext ctx) {
+        Object att = ctx.attachment(); // Untyped slot: check the type instead of relying on an implicit cast.
+        int size = ctx.currentTopologySnapshot().size();
+
+        if (att instanceof NodeWithHash[] && ((NodeWithHash[])att).length == size)
+            return (NodeWithHash[])att;
+
+        NodeWithHash[] container = new NodeWithHash[size];
+        for (int i = 0; i < size; i++)
+            container[i] = new NodeWithHash();
+
+        ctx.attachment(container);
+
+        return container;
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeInt(parts);
@@ -508,14 +529,14 @@
     /**
      *
      */
-    private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {
+    private static class HashComparator implements Comparator<NodeWithHash>, Serializable {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** {@inheritDoc} */
-        @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) {
-            return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 :
-                o1.get2().id().compareTo(o2.get2().id());
+        @Override public int compare(NodeWithHash o1, NodeWithHash o2) {
+            return o1.hash < o2.hash ? -1 : o1.hash > o2.hash ? 1 :
+                o1.node.id().compareTo(o2.node.id()); // Equal hashes: order by node ID.
         }
     }
 
@@ -524,7 +545,7 @@
      */
     private static class LazyLinearSortedContainer implements Iterable<ClusterNode> {
         /** Initial node-hash array. */
-        private final IgniteBiTuple<Long, ClusterNode>[] arr;
+        private final NodeWithHash[] arr;
 
         /** Count of the sorted elements */
         private int sorted;
@@ -533,7 +554,7 @@
          * @param arr Node / partition hash list.
          * @param needFirstSortedCnt Estimate count of elements to return by iterator.
          */
-        LazyLinearSortedContainer(IgniteBiTuple<Long, ClusterNode>[] arr, int needFirstSortedCnt) {
+        LazyLinearSortedContainer(NodeWithHash[] arr, int needFirstSortedCnt) {
             this.arr = arr;
 
             if (needFirstSortedCnt > (int)Math.log(arr.length)) {
@@ -566,9 +587,9 @@
                     throw new NoSuchElementException();
 
                 if (cur < sorted)
-                    return arr[cur++].get2();
+                    return arr[cur++].node;
 
-                IgniteBiTuple<Long, ClusterNode> min = arr[cur];
+                NodeWithHash min = arr[cur];
 
                 int minIdx = cur;
 
@@ -588,7 +609,7 @@
 
                 sorted = cur++;
 
-                return min.get2();
+                return min.node;
             }
 
             /** {@inheritDoc} */
@@ -597,4 +618,15 @@
             }
         }
     }
+
+    /**
+     * A specific tuple which holds a primitive long to avoid unnecessary boxing.
+     */
+    public static class NodeWithHash {
+        /** Cluster node. */
+        private ClusterNode node;
+
+        /** Hash value paired with the node. */
+        private long hash;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAttachmentHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAttachmentHolder.java
new file mode 100644
index 0000000..0467bc7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAttachmentHolder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+/**
+ * Mutable holder of a single attachment object; the attachment is {@code null} until one is set.
+ */
+public class AffinityAttachmentHolder {
+    /** Currently held attachment. */
+    private Object attachment;
+
+    /**
+     * @return User attachment, or {@code null} if none is held.
+     */
+    public Object attachment() {
+        return attachment;
+    }
+
+    /**
+     * @param attachment User attachment; replaces the currently held one.
+     */
+    public void attachment(Object attachment) {
+        this.attachment = attachment;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 5070462..3fc59fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -257,8 +257,12 @@
      * @return Affinity assignments.
      */
     @SuppressWarnings("IfMayBeConditional")
-    public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt,
-        DiscoCache discoCache) {
+    public List<List<ClusterNode>> calculate(
+        AffinityTopologyVersion topVer,
+        DiscoveryEvent discoEvt,
+        DiscoCache discoCache,
+        AffinityAttachmentHolder holder
+    ) {
         if (log.isDebugEnabled())
             log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
                 ", discoEvt=" + discoEvt + ']');
@@ -284,12 +288,22 @@
             if (!affNode)
                 assignment = prevAssignment;
             else
-                assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment,
-                    discoEvt, topVer, backups));
+                assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(
+                    sorted,
+                    prevAssignment,
+                    discoEvt,
+                    topVer,
+                    backups,
+                    holder));
         }
         else
-            assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt,
-                topVer, backups));
+            assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(
+                sorted,
+                prevAssignment,
+                discoEvt,
+                topVer,
+                backups,
+                holder));
 
         assert assignment != null;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
index e2bb99d..05ee846 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
@@ -43,17 +43,27 @@
     /** Number of backups to assign. */
     private final int backups;
 
+    /** */
+    private AffinityAttachmentHolder holder;
+
     /**
      * @param topSnapshot Topology snapshot.
      * @param topVer Topology version.
      */
-    public GridAffinityFunctionContextImpl(List<ClusterNode> topSnapshot, List<List<ClusterNode>> prevAssignment,
-        DiscoveryEvent discoEvt, @NotNull AffinityTopologyVersion topVer, int backups) {
+    public GridAffinityFunctionContextImpl(
+        List<ClusterNode> topSnapshot,
+        List<List<ClusterNode>> prevAssignment,
+        DiscoveryEvent discoEvt,
+        @NotNull AffinityTopologyVersion topVer,
+        int backups,
+        AffinityAttachmentHolder holder
+    ) {
         this.topSnapshot = topSnapshot;
         this.prevAssignment = prevAssignment;
         this.discoEvt = discoEvt;
         this.topVer = topVer;
         this.backups = backups;
+        this.holder = holder;
     }
 
     /** {@inheritDoc} */
@@ -81,6 +91,16 @@
         return backups;
     }
 
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T attachment() {
+        return (T)holder.attachment(); // Unchecked: T is chosen by the caller, the holder stores a plain Object.
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> void attachment(T attachment) {
+        holder.attachment(attachment);
+    }
+
     /**
      * Gets the previous assignment.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 5fdd1bc..8051768 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -343,9 +343,11 @@
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if client-only exchange is needed.
      */
-    public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
+    public boolean onCacheChangeRequest(
+        final GridDhtPartitionsExchangeFuture fut,
         boolean crd,
-        Collection<DynamicCacheChangeRequest> reqs)
+        Collection<DynamicCacheChangeRequest> reqs
+    )
         throws IgniteCheckedException {
         assert !F.isEmpty(reqs) : fut;
 
@@ -426,8 +428,11 @@
 
                             assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
 
-                            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
-                                fut.discoveryEvent(), fut.discoCache());
+                            List<List<ClusterNode>> assignment = aff.calculate(
+                                fut.topologyVersion(),
+                                fut.discoveryEvent(),
+                                fut.discoCache(),
+                                fut.attachmentHolder());
 
                             aff.initialize(fut.topologyVersion(), assignment);
 
@@ -795,7 +800,8 @@
 
             assert old == null : old;
 
-            List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+            List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(),
+                fut.discoCache(), fut.attachmentHolder());
 
             cache.affinity().initialize(fut.topologyVersion(), newAff);
         }
@@ -833,7 +839,8 @@
 
                     if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
                         List<List<ClusterNode>> assignment =
-                            cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                            cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache(),
+                                fut.attachmentHolder());
 
                         cache.affinity().initialize(fut.topologyVersion(), assignment);
                     }
@@ -861,7 +868,8 @@
         if (!fetch && canCalculateAffinity(aff, fut)) {
             exchLog.info("initAffinity start [topVer=" + fut.topologyVersion() + ", cache=" + aff.cacheName() + ']');
 
-            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(),
+                fut.discoCache(), fut.attachmentHolder());
 
             aff.initialize(fut.topologyVersion(), assignment);
 
@@ -933,7 +941,8 @@
 
                             exchLog.info("onServerJoin calc aff start [topVer=" + fut.topologyVersion() + ", cache=" + cache.name() + ']');
 
-                            List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+                            List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(),
+                                fut.discoCache(), fut.attachmentHolder());
 
                             cache.affinity().initialize(topVer, newAff);
 
@@ -1006,7 +1015,8 @@
 
             if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) {
                 List<List<ClusterNode>> assignment =
-                    cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                    cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(),
+                        fut.discoCache(), fut.attachmentHolder());
 
                 cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment);
             }
@@ -1050,7 +1060,8 @@
         GridDhtAffinityAssignmentResponse res = fetchFut.get();
 
         if (res == null) {
-            List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+            List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache(),
+                fut.attachmentHolder());
 
             affCache.initialize(topVer, aff);
         }
@@ -1062,7 +1073,7 @@
             else {
                 assert !affCache.centralizedAffinityFunction() || !lateAffAssign;
 
-                affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+                affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache(), fut.attachmentHolder());
             }
 
             List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery());
@@ -1096,7 +1107,8 @@
 
                 exchLog.info("onServerLeft calc aff start [topVer=" + fut.topologyVersion() + ", cache=" + cacheCtx.name() + ']');
 
-                cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(),
+                    fut.discoCache(), fut.attachmentHolder());
 
                 exchLog.info("onServerLeft calc aff end [topVer=" + fut.topologyVersion() + ", cache=" + cacheCtx.name() + ']');
             }
@@ -1150,7 +1162,8 @@
 
                 if (cache != null) {
                     if (cache.client())
-                        cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                        cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache(),
+                            fut.attachmentHolder());
 
                     return;
                 }
@@ -1202,7 +1215,8 @@
                             throws IgniteCheckedException {
                             fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut);
 
-                            aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                            aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache(),
+                                fut.attachmentHolder());
 
                             affFut.onDone(fut.topologyVersion());
                         }
@@ -1328,14 +1342,13 @@
      * @param rebalanceInfo Rebalance information.
      * @param latePrimary If {@code true} delays primary assignment if it is not owner.
      * @param affCache Already calculated assignments (to reduce data stored in history).
-     * @throws IgniteCheckedException If failed.
      */
     private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut,
         GridAffinityAssignmentCache aff,
         WaitRebalanceInfo rebalanceInfo,
         boolean latePrimary,
-        Map<Object, List<List<ClusterNode>>> affCache)
-        throws IgniteCheckedException {
+        Map<Object, List<List<ClusterNode>>> affCache
+    ) {
         exchLog.info("initAffinityOnNodeJoin start [topVer=" + fut.topologyVersion() +
             ", cache=" + aff.cacheName() + ']');
 
@@ -1352,7 +1365,8 @@
 
         assert aff.idealAssignment() != null : "Previous assignment is not available.";
 
-        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache(),
+            fut.attachmentHolder());
         List<List<ClusterNode>> newAssignment = null;
 
         if (latePrimary) {
@@ -1360,8 +1374,8 @@
                 List<ClusterNode> newNodes = idealAssignment.get(p);
                 List<ClusterNode> curNodes = curAff.get(p);
 
-                ClusterNode curPrimary = curNodes.size() > 0 ? curNodes.get(0) : null;
-                ClusterNode newPrimary = newNodes.size() > 0 ? newNodes.get(0) : null;
+                ClusterNode curPrimary = !curNodes.isEmpty() ? curNodes.get(0) : null;
+                ClusterNode newPrimary = !newNodes.isEmpty() ? newNodes.get(0) : null;
 
                 if (curPrimary != null && newPrimary != null && !curPrimary.equals(newPrimary)) {
                     assert cctx.discovery().node(topVer, curPrimary.id()) != null : curPrimary;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 24321b2..0c2499b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -26,6 +26,7 @@
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -76,7 +77,7 @@
     @Override protected void onKernalStart0() throws IgniteCheckedException {
         if (cctx.isLocal())
             // No discovery event needed for local affinity.
-            aff.calculate(LOC_CACHE_TOP_VER, null, null);
+            aff.calculate(LOC_CACHE_TOP_VER, null, null, new AffinityAttachmentHolder());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 0f6a656..fc29a6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -349,6 +349,13 @@
     }
 
     /**
+     * @return {@code true} if the calling thread is the thread running the exchange worker.
+     */
+    public boolean inExchangeWorkerThread() {
+        return Thread.currentThread() == exchWorker.runner();
+    }
+
+    /**
      * @return Initial exchange ID.
      */
     private GridDhtPartitionExchangeId initialExchangeId() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 110716b..f86c914 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -30,7 +30,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
@@ -50,9 +49,9 @@
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
 import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperation;
 import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -86,7 +85,6 @@
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -243,6 +241,9 @@
     @GridToStringExclude
     private volatile IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap();
 
+    /** */
+    private AffinityAttachmentHolder affAttachmentHolder;
+
     /**
      * Dummy future created to trigger reassignments if partition
      * topology changed while preloading.
@@ -407,6 +408,13 @@
     }
 
     /**
+     * @return Holder of this future when called from the exchange worker thread, otherwise a new empty holder.
+     */
+    public AffinityAttachmentHolder attachmentHolder() {
+        return cctx.exchange().inExchangeWorkerThread() ? affAttachmentHolder : new AffinityAttachmentHolder();
+    }
+
+    /**
      * @param cacheId Cache ID to check.
      * @param topVer Topology version.
      * @return {@code True} if cache was added during this exchange.
@@ -547,6 +555,8 @@
 
             skipPreload = cctx.kernalContext().clientNode();
 
+            affAttachmentHolder = new AffinityAttachmentHolder();
+
             exchLog.info("Start exchange init [topVer=" + topVer +
                 ", crd=" + crdNode +
                 ", evt=" + discoEvt.type() +
@@ -1294,6 +1304,10 @@
 
             initFut.onDone(err == null);
 
+            // The field has no initializer and is assigned in the exchange init path, so it may still be null here.
+            if (affAttachmentHolder != null)
+                affAttachmentHolder.attachment(null);
+
             if (exchId.isLeft()) {
                 for (GridCacheContext cacheCtx : cctx.cacheContexts())
                     cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
index 8f8d78a..6490b5b 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
@@ -28,6 +28,7 @@
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
 import org.apache.ignite.testframework.GridTestNode;
@@ -104,7 +105,6 @@
     }
 
     /**
-     * @param backups Number of backups.
      * @throws Exception If failed.
      */
     public void testNullKeyForPartitionCalculation() throws Exception {
@@ -155,7 +155,8 @@
             DiscoveryEvent discoEvt = new DiscoveryEvent(node, "", EventType.EVT_NODE_JOINED, node);
 
             GridAffinityFunctionContextImpl ctx =
-                new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i), backups);
+                new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i), backups,
+                    new AffinityAttachmentHolder());
 
             List<List<ClusterNode>> assignment = aff.assignPartitions(ctx);
 
@@ -181,7 +182,7 @@
 
             List<List<ClusterNode>> assignment = aff.assignPartitions(
                 new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i),
-                    backups));
+                    backups, new AffinityAttachmentHolder()));
 
             info("Assigned.");
 
@@ -254,7 +255,7 @@
 
             List<List<ClusterNode>> assignment = aff.assignPartitions(
                 new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i),
-                    backups));
+                    backups, new AffinityAttachmentHolder()));
 
             verifyAssignment(assignment, backups, aff.partitions(), nodes.size());
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java
index 25a0be3..c0d2399 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java
@@ -32,6 +32,7 @@
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
 import org.apache.ignite.internal.util.typedef.F;
@@ -108,7 +109,7 @@
     public void testPrimaryPartitionsOneNode() throws Exception {
         AffinityFunctionContext ctx =
             new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null,
-                new AffinityTopologyVersion(1), 1);
+                new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder());
 
         List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx);
 
@@ -151,7 +152,7 @@
 
         AffinityFunctionContext ctx =
             new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null,
-                new AffinityTopologyVersion(1), 1);
+                new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder());
 
         List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx);
 
@@ -182,7 +183,7 @@
 
         AffinityFunctionContext ctx =
             new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null,
-                new AffinityTopologyVersion(1), 1);
+                new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder());
 
         List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx);
 
@@ -216,7 +217,7 @@
 
         AffinityFunctionContext ctx =
             new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null,
-                new AffinityTopologyVersion(1), 1);
+                new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder());
 
         List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx);
 
@@ -239,7 +240,7 @@
 
         AffinityFunctionContext ctx =
             new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null,
-                new AffinityTopologyVersion(1), 1);
+                new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder());
 
         AffinityFunction aff = affinity();
 
@@ -258,7 +259,7 @@
 
         AffinityFunctionContext ctx =
             new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null,
-                new AffinityTopologyVersion(1), 1);
+                new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder());
 
         AffinityFunction aff = affinity();
 
@@ -278,7 +279,7 @@
 
         AffinityFunctionContext ctx =
             new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null,
-                new AffinityTopologyVersion(1), 1);
+                new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder());
 
         AffinityFunction aff = affinity();
 
@@ -303,7 +304,7 @@
 
         AffinityFunctionContext ctx =
             new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null,
-                new AffinityTopologyVersion(1), 1);
+                new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder());
 
         AffinityFunction aff = affinity();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
index 2784295..1af480a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
@@ -20,10 +20,14 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -43,7 +47,7 @@
 
         for (int i = 5; i < NODES_CNT; i = i * 3 / 2) {
             for (int replicas = 128; replicas <= 4096; replicas*=2) {
-                Collection<ClusterNode> nodes = createNodes(i, replicas);
+                List<ClusterNode> nodes = createNodes(i, replicas);
 
                 RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 10000);
 
@@ -59,8 +63,8 @@
      * @param replicas Value of
      * @return Collection of test nodes.
      */
-    private Collection<ClusterNode> createNodes(int nodesCnt, int replicas) {
-        Collection<ClusterNode> nodes = new ArrayList<>(nodesCnt);
+    private List<ClusterNode> createNodes(int nodesCnt, int replicas) {
+        List<ClusterNode> nodes = new ArrayList<>(nodesCnt);
 
         for (int i = 0; i < nodesCnt; i++)
             nodes.add(new TestRichNode(replicas));
@@ -72,14 +76,22 @@
      * @param aff Affinity to check.
      * @param nodes Collection of nodes to test on.
      */
-    private void checkDistribution(RendezvousAffinityFunction aff, Collection<ClusterNode> nodes) {
+    private void checkDistribution(RendezvousAffinityFunction aff, List<ClusterNode> nodes) {
         Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size());
 
+        GridAffinityFunctionContextImpl ctx = new GridAffinityFunctionContextImpl(
+            nodes,
+            null,
+            null,
+            new AffinityTopologyVersion(1, 0),
+            0,
+            new AffinityAttachmentHolder()
+        );
+
+        List<List<ClusterNode>> affDist = aff.assignPartitions(ctx);
+
         for (int part = 0; part < aff.getPartitions(); part++) {
-            Collection<ClusterNode> affNodes = aff.assignPartition(part,
-                new ArrayList<ClusterNode>(nodes),
-                0,
-                new HashMap<UUID, Collection<ClusterNode>>());
+            Collection<ClusterNode> affNodes = affDist.get(part);
 
             assertEquals(1, affNodes.size());
 
@@ -143,6 +155,8 @@
          * @param nodeId Node id.
          */
         private TestRichNode(UUID nodeId, int replicas) {
+            super(nodeId);
+
             this.nodeId = nodeId;
             this.replicas = replicas;
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
index db11291..1040a86 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
@@ -34,6 +34,7 @@
 import org.apache.ignite.configuration.CollectionConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteCollectionAbstractTest;
@@ -215,7 +216,7 @@
     private Collection<ClusterNode> nodes(AffinityFunction aff, int part, Collection<ClusterNode> nodes) {
         List<List<ClusterNode>> assignment = aff.assignPartitions(
             new GridAffinityFunctionContextImpl(new ArrayList<>(nodes), null, null, new AffinityTopologyVersion(1),
-                BACKUP_CNT));
+                BACKUP_CNT, new AffinityAttachmentHolder()));
 
         return assignment.get(part);
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 54328c6..441101a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -57,6 +57,7 @@
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -245,7 +246,8 @@
             null,
             null,
             topVer(1, 0),
-            cctx.config().getBackups());
+            cctx.config().getBackups(),
+            new AffinityAttachmentHolder());
 
         List<List<ClusterNode>> calcAff1_0 = func.assignPartitions(ctx);
 
@@ -256,7 +258,8 @@
             calcAff1_0,
             null,
             topVer(1, 0),
-            cctx.config().getBackups());
+            cctx.config().getBackups(),
+            new AffinityAttachmentHolder());
 
         List<List<ClusterNode>> calcAff2_0 = func.assignPartitions(ctx);
 
@@ -2457,7 +2460,8 @@
                 previousAssignment(topVer, cacheDesc.cacheId()),
                 evt,
                 topVer0,
-                cacheDesc.cacheConfiguration().getBackups());
+                cacheDesc.cacheConfiguration().getBackups(),
+                new AffinityAttachmentHolder());
 
             List<List<ClusterNode>> assignment = func.assignPartitions(affCtx);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 9c39ad7..d9ad163 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -56,6 +56,7 @@
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
@@ -721,7 +722,8 @@
             null,
             discoEvt,
             new AffinityTopologyVersion(topVer + 1),
-            1);
+            1,
+            new AffinityAttachmentHolder());
 
         AffinityFunction affFunc = ignite.cache(null).getConfiguration(CacheConfiguration.class).getAffinity();