Merge remote-tracking branch 'remotes/origin/master' into ignite-8783
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index 7e579cb..8a57b90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -16,11 +16,11 @@
  */
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,8 +45,6 @@
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
@@ -80,15 +78,15 @@
     private volatile ClusterNode crd;
 
     /** Pending acks collection. */
-    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, Set<UUID>> pendingAcks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<CompletableLatchUid, Set<UUID>> pendingAcks = new ConcurrentHashMap<>();
 
     /** Server latches collection. */
     @GridToStringInclude
-    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ServerLatch> serverLatches = new ConcurrentHashMap<>();
+    private final ConcurrentMap<CompletableLatchUid, ServerLatch> serverLatches = new ConcurrentHashMap<>();
 
     /** Client latches collection. */
     @GridToStringInclude
-    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ClientLatch> clientLatches = new ConcurrentHashMap<>();
+    private final ConcurrentMap<CompletableLatchUid, ClientLatch> clientLatches = new ConcurrentHashMap<>();
 
     /** Lock. */
     private final ReentrantLock lock = new ReentrantLock();
@@ -130,37 +128,30 @@
-     * Creates server latch with given {@code id} and {@code topVer}.
+     * Creates a server latch for the given {@code latchUid}.
      * Adds corresponding pending acks to it.
      *
-     * @param id Latch id.
-     * @param topVer Latch topology version.
+     * @param latchUid Latch uid.
      * @param participants Participant nodes.
      * @return Server latch instance.
      */
-    private Latch createServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
-        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
+    private Latch createServerLatch(CompletableLatchUid latchUid, Collection<ClusterNode> participants) {
+        assert !serverLatches.containsKey(latchUid);
 
-        if (serverLatches.containsKey(latchId))
-            return serverLatches.get(latchId);
+        ServerLatch latch = new ServerLatch(latchUid, participants);
 
-        ServerLatch latch = new ServerLatch(id, topVer, participants);
-
-        serverLatches.put(latchId, latch);
+        serverLatches.put(latchUid, latch);
 
         if (log.isDebugEnabled())
-            log.debug("Server latch is created [latch=" + latchId + ", participantsSize=" + participants.size() + "]");
+            log.debug("Server latch is created [latch=" + latchUid + ", participantsSize=" + participants.size() + "]");
 
-        if (pendingAcks.containsKey(latchId)) {
-            Set<UUID> acks = pendingAcks.get(latchId);
+        if (pendingAcks.containsKey(latchUid)) {
+            Set<UUID> acks = pendingAcks.get(latchUid);
 
             for (UUID node : acks)
                 if (latch.hasParticipant(node) && !latch.hasAck(node))
                     latch.ack(node);
 
-            pendingAcks.remove(latchId);
+            pendingAcks.remove(latchUid);
         }
 
-        if (latch.isCompleted())
-            serverLatches.remove(latchId);
-
         return latch;
     }
 
@@ -168,32 +159,23 @@
      * Creates client latch.
-     * If there is final ack corresponds to given {@code id} and {@code topVer}, latch will be completed immediately.
+     * The created latch is completed when the final ack from the coordinator is processed.
      *
-     * @param id Latch id.
-     * @param topVer Latch topology version.
+     * @param latchUid Latch uid.
      * @param coordinator Coordinator node.
      * @param participants Participant nodes.
      * @return Client latch instance.
      */
-    private Latch createClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
-        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
+    private Latch createClientLatch(CompletableLatchUid latchUid, ClusterNode coordinator, Collection<ClusterNode> participants) {
+        assert !serverLatches.containsKey(latchUid);
+        assert !clientLatches.containsKey(latchUid);
 
-        if (clientLatches.containsKey(latchId))
-            return clientLatches.get(latchId);
-
-        ClientLatch latch = new ClientLatch(id, topVer, coordinator, participants);
+        ClientLatch latch = new ClientLatch(latchUid, coordinator, participants);
 
         if (log.isDebugEnabled())
-            log.debug("Client latch is created [latch=" + latchId
-                    + ", crd=" + coordinator
-                    + ", participantsSize=" + participants.size() + "]");
+            log.debug("Client latch is created [latch=" + latchUid
+                + ", crd=" + coordinator
+                + ", participantsSize=" + participants.size() + "]");
 
-        // There is final ack for created latch.
-        if (pendingAcks.containsKey(latchId)) {
-            latch.complete();
-            pendingAcks.remove(latchId);
-        }
-        else
-            clientLatches.put(latchId, latch);
+        clientLatches.put(latchUid, latch);
 
         return latch;
     }
@@ -212,20 +194,24 @@
         lock.lock();
 
         try {
+            final CompletableLatchUid latchUid = new CompletableLatchUid(id, topVer);
+
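+            // Reuse the existing latch if one was already created for the given id and topology version.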
+            CompletableLatch latch = clientLatches.containsKey(latchUid) ?
+                clientLatches.get(latchUid) : serverLatches.get(latchUid);
+
+            if (latch != null)
+                return latch;
+
             ClusterNode coordinator = getLatchCoordinator(topVer);
 
-            if (coordinator == null) {
-                ClientLatch latch = new ClientLatch(id, AffinityTopologyVersion.NONE, null, Collections.emptyList());
-                latch.complete();
-
-                return latch;
-            }
+            if (coordinator == null)
+                return null;
 
             Collection<ClusterNode> participants = getLatchParticipants(topVer);
 
             return coordinator.isLocal()
-                ? createServerLatch(id, topVer, participants)
-                : createClientLatch(id, topVer, coordinator, participants);
+                ? createServerLatch(latchUid, participants)
+                : createClientLatch(latchUid, coordinator, participants);
         }
         finally {
             lock.unlock();
@@ -275,10 +261,11 @@
         Collection<ClusterNode> aliveNodes = aliveNodesForTopologyVer(topVer);
 
         return aliveNodes
-                .stream()
-                .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
-                .findFirst()
-                .orElse(null);
+            .stream()
+            .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
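+            // Latch coordinator is the alive node with the minimum order that supports exchange latches.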
+            .sorted(Comparator.comparing(ClusterNode::order))
+            .findFirst()
+            .orElse(null);
     }
 
     /**
@@ -300,39 +287,36 @@
             if (coordinator == null)
                 return;
 
-            T2<String, AffinityTopologyVersion> latchId = new T2<>(message.latchId(), message.topVer());
+            CompletableLatchUid latchUid = new CompletableLatchUid(message.latchId(), message.topVer());
 
             if (message.isFinal()) {
                 if (log.isDebugEnabled())
-                    log.debug("Process final ack [latch=" + latchId + ", from=" + from + "]");
+                    log.debug("Process final ack [latch=" + latchUid + ", from=" + from + "]");
 
-                if (clientLatches.containsKey(latchId)) {
-                    ClientLatch latch = clientLatches.remove(latchId);
+                assert serverLatches.containsKey(latchUid) || clientLatches.containsKey(latchUid);
+
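+                // Final ack means the latch has been completed on the coordinator: complete the local client latch and remove the server latch, if any.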
+                if (clientLatches.containsKey(latchUid)) {
+                    ClientLatch latch = clientLatches.remove(latchUid);
+
                     latch.complete();
                 }
-                else if (!coordinator.isLocal()) {
-                    pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>());
-                    pendingAcks.get(latchId).add(from);
-                }
-                else if (coordinator.isLocal())
-                    serverLatches.remove(latchId);
-            } else {
+
+                serverLatches.remove(latchUid);
+            }
+            else {
                 if (log.isDebugEnabled())
-                    log.debug("Process ack [latch=" + latchId + ", from=" + from + "]");
+                    log.debug("Process ack [latch=" + latchUid + ", from=" + from + "]");
 
-                if (serverLatches.containsKey(latchId)) {
-                    ServerLatch latch = serverLatches.get(latchId);
+                if (serverLatches.containsKey(latchUid)) {
+                    ServerLatch latch = serverLatches.get(latchUid);
 
-                    if (latch.hasParticipant(from) && !latch.hasAck(from)) {
+                    if (latch.hasParticipant(from) && !latch.hasAck(from))
                         latch.ack(from);
-
-                        if (latch.isCompleted())
-                            serverLatches.remove(latchId);
-                    }
                 }
                 else {
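+                    // Server latch is not created on this node yet: remember the ack and apply it when the latch is created.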
-                    pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>());
-                    pendingAcks.get(latchId).add(from);
+                    pendingAcks.computeIfAbsent(latchUid, (id) -> new GridConcurrentHashSet<>());
+
+                    pendingAcks.get(latchUid).add(from);
                 }
             }
         }
@@ -349,17 +333,18 @@
         if (log.isInfoEnabled())
             log.info("Become new coordinator " + crd.id());
 
-        List<T2<String, AffinityTopologyVersion>> latchesToRestore = new ArrayList<>();
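+        // Collect latch ids from pending acks and client latches; a set avoids restoring the same latch twice.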
+        Set<CompletableLatchUid> latchesToRestore = new HashSet<>();
+
         latchesToRestore.addAll(pendingAcks.keySet());
         latchesToRestore.addAll(clientLatches.keySet());
 
-        for (T2<String, AffinityTopologyVersion> latchId : latchesToRestore) {
-            String id = latchId.get1();
-            AffinityTopologyVersion topVer = latchId.get2();
+        for (CompletableLatchUid latchUid : latchesToRestore) {
+            AffinityTopologyVersion topVer = latchUid.topVer;
             Collection<ClusterNode> participants = getLatchParticipants(topVer);
 
             if (!participants.isEmpty())
-                createServerLatch(id, topVer, participants);
+                createServerLatch(latchUid, participants);
         }
     }
 
@@ -389,12 +374,12 @@
                 return;
 
             // Clear pending acks.
-            for (Map.Entry<T2<String, AffinityTopologyVersion>, Set<UUID>> ackEntry : pendingAcks.entrySet())
+            for (Map.Entry<CompletableLatchUid, Set<UUID>> ackEntry : pendingAcks.entrySet())
                 if (ackEntry.getValue().contains(left.id()))
                     pendingAcks.get(ackEntry.getKey()).remove(left.id());
 
             // Change coordinator for client latches.
-            for (Map.Entry<T2<String, AffinityTopologyVersion>, ClientLatch> latchEntry : clientLatches.entrySet()) {
+            for (Map.Entry<CompletableLatchUid, ClientLatch> latchEntry : clientLatches.entrySet()) {
                 ClientLatch latch = latchEntry.getValue();
                 if (latch.hasCoordinator(left.id())) {
                     // Change coordinator for latch and re-send ack if necessary.
@@ -404,7 +389,7 @@
                         /* If new coordinator is not able to take control on the latch,
                            it means that all other latch participants are left from topology
                            and there is no reason to track such latch. */
-                        AffinityTopologyVersion topVer = latchEntry.getKey().get2();
+                        AffinityTopologyVersion topVer = latchEntry.getKey().topVer;
 
                         assert getLatchParticipants(topVer).isEmpty();
 
@@ -415,7 +400,7 @@
             }
 
             // Add acknowledgements from left node.
-            for (Map.Entry<T2<String, AffinityTopologyVersion>, ServerLatch> latchEntry : serverLatches.entrySet()) {
+            for (Map.Entry<CompletableLatchUid, ServerLatch> latchEntry : serverLatches.entrySet()) {
                 ServerLatch latch = latchEntry.getValue();
 
                 if (latch.hasParticipant(left.id()) && !latch.hasAck(left.id())) {
@@ -423,9 +408,6 @@
                         log.debug("Process node left [latch=" + latchEntry.getKey() + ", left=" + left.id() + "]");
 
                     latch.ack(left.id());
-
-                    if (latch.isCompleted())
-                        serverLatches.remove(latchEntry.getKey());
                 }
             }
 
@@ -458,12 +440,11 @@
         /**
          * Constructor.
          *
-         * @param id Latch id.
-         * @param topVer Latch topology version.
+         * @param latchUid Latch uid.
          * @param participants Participant nodes.
          */
-        ServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
-            super(id, topVer, participants);
+        ServerLatch(CompletableLatchUid latchUid, Collection<ClusterNode> participants) {
+            super(latchUid, participants);
             this.permits = new AtomicInteger(participants.size());
 
             // Send final acks when latch is completed.
@@ -558,13 +539,12 @@
         /**
          * Constructor.
          *
-         * @param id Latch id.
-         * @param topVer Latch topology version.
+         * @param latchUid Latch uid.
          * @param coordinator Coordinator node.
          * @param participants Participant nodes.
          */
-        ClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
-            super(id, topVer, participants);
+        ClientLatch(CompletableLatchUid latchUid, ClusterNode coordinator, Collection<ClusterNode> participants) {
+            super(latchUid, participants);
 
             this.coordinator = coordinator;
         }
@@ -658,13 +638,12 @@
         /**
          * Constructor.
          *
-         * @param id Latch id.
-         * @param topVer Latch topology version.
+         * @param latchUid Latch uid.
          * @param participants Participant nodes.
          */
-        CompletableLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
-            this.id = id;
-            this.topVer = topVer;
+        CompletableLatch(CompletableLatchUid latchUid, Collection<ClusterNode> participants) {
+            this.id = latchUid.id;
+            this.topVer = latchUid.topVer;
             this.participants = participants.stream().map(ClusterNode::id).collect(Collectors.toSet());
         }
 
@@ -724,6 +703,47 @@
         }
     }
 
+    /**
+     * Unique latch identifier: latch id plus the topology version on which the latch is created.
+     */
+    private static class CompletableLatchUid {
+        /** Id. */
+        private final String id;
+
+        /** Topology version. */
+        private final AffinityTopologyVersion topVer;
+
+        /**
+         * @param id Id.
+         * @param topVer Topology version.
+         */
+        private CompletableLatchUid(String id, AffinityTopologyVersion topVer) {
+            this.id = id;
+            this.topVer = topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            CompletableLatchUid uid = (CompletableLatchUid)o;
+            return Objects.equals(id, uid.id) &&
+                Objects.equals(topVer, uid.topVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(id, topVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "CompletableLatchUid{" + "id='" + id + '\'' + ", topVer=" + topVer + '}';
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ExchangeLatchManager.class, this);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
index 52cd033..3bff341 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
@@ -16,16 +16,18 @@
  */
 package org.apache.ignite.internal.processors.cache.datastructures;
 
+import com.google.common.collect.Lists;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.collect.Lists;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -240,5 +242,18 @@
         finishAllLatches.get(5000);
 
         Assert.assertFalse("All nodes should complete latches without errors", hasErrors.get());
+
+        awaitPartitionMapExchange();
+
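+        // After exchange is finished, the latch manager must not retain any completed latches on any node.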
+        for (int node = 1; node < 5; node++) {
+            IgniteEx grid = grid(node);
+            ExchangeLatchManager latchMgr = grid.context().cache().context().exchange().latch();
+
+            Map srvLatches = U.field(latchMgr, "serverLatches");
+            Map cliLatches = U.field(latchMgr, "clientLatches");
+
+            assertTrue(srvLatches.keySet().toString(), srvLatches.isEmpty());
+            assertTrue(cliLatches.keySet().toString(), cliLatches.isEmpty());
+        }
     }
 }