IGNITE-5232 GridDhtPartitionDemander.requestPartitions invokes sendMessages sequentially, which leads to a significant increase in node start time on large clusters with SSL

(cherry picked from commit 4c460b78f7b0febc37940c8d65f91cb449fa4d54)

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
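
The gist of the fix: each initial demand message is now submitted to the
system pool via closure().runLocalSafe(..) instead of being sent inline, so
the per-message blocking cost (notably the SSL handshake when a connection
is first established) no longer serializes rebalancing start-up. A minimal,
self-contained sketch of that pattern follows; ExecutorService stands in for
Ignite's system pool, and sendDemand()/DEMAND_COUNT are hypothetical
stand-ins for cctx.io().sendOrderedMessage(..) and
getRebalanceThreadPoolSize(), not actual Ignite API usage.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ParallelDemandSketch {
        /** Hypothetical stand-in for getRebalanceThreadPoolSize(). */
        private static final int DEMAND_COUNT = 4;

        public static void main(String[] args) {
            // Stand-in for the system pool used by runLocalSafe(..).
            ExecutorService pool = Executors.newFixedThreadPool(DEMAND_COUNT);

            for (int cnt = 0; cnt < DEMAND_COUNT; cnt++) {
                final int idx = cnt;

                pool.submit(new Runnable() {
                    @Override public void run() {
                        try {
                            // A blocking send here no longer delays the other sends.
                            sendDemand(idx);
                        }
                        catch (Exception e) {
                            System.err.println("Failed to send initial demand request: " + e.getMessage());
                        }
                    }
                });
            }

            pool.shutdown();
        }

        /** Placeholder for the ordered demand message send. */
        private static void sendDemand(int idx) {
            System.out.println("Demand request " + idx + " sent.");
        }
    }
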
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index a6808c7..daae1e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -331,41 +331,21 @@
 
             return new Runnable() {
                 @Override public void run() {
-                    try {
-                        if (next != null)
-                            fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                                @Override public void apply(IgniteInternalFuture<Boolean> f) {
-                                    try {
-                                        if (f.get()) // Not cancelled.
-                                            next.run(); // Starts next cache rebalancing (according to the order).
-                                    }
-                                    catch (IgniteCheckedException ignored) {
-                                        if (log.isDebugEnabled())
-                                            log.debug(ignored.getMessage());
-                                    }
+                    if (next != null)
+                        fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                            @Override public void apply(IgniteInternalFuture<Boolean> f) {
+                                try {
+                                    if (f.get()) // Not cancelled.
+                                        next.run(); // Starts next cache rebalancing (according to the order).
                                 }
-                            });
+                                catch (IgniteCheckedException ignored) {
+                                    if (log.isDebugEnabled())
+                                        log.debug(ignored.getMessage());
+                                }
+                            }
+                        });
 
-                        requestPartitions(fut, assigns);
-                    }
-                    catch (IgniteCheckedException e) {
-                        ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class);
-
-                        if (cause != null)
-                            log.warning("Failed to send initial demand request to node. " + e.getMessage());
-                        else
-                            log.error("Failed to send initial demand request to node.", e);
-
-                        fut.cancel();
-                    }
-                    catch (Throwable th) {
-                        log.error("Runtime error caught during initial demand request sending.", th);
-
-                        fut.cancel();
-
-                        if (th instanceof Error)
-                            throw th;
-                    }
+                    requestPartitions(fut, assigns);
                 }
             };
         }
@@ -404,9 +384,8 @@
      * @return Partitions were requested.
      */
     private void requestPartitions(
-        RebalanceFuture fut,
-        GridDhtPreloaderAssignments assigns
-    ) throws IgniteCheckedException {
+        final RebalanceFuture fut,
+        GridDhtPreloaderAssignments assigns) {
         if (topologyChanged(fut)) {
             fut.cancel();
 
@@ -438,7 +417,7 @@
 
                 int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
 
-                List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+                final List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
 
                 for (int cnt = 0; cnt < lsnrCnt; cnt++)
                     sParts.add(new HashSet<Integer>());
@@ -453,42 +432,74 @@
                 for (cnt = 0; cnt < lsnrCnt; cnt++) {
                     if (!sParts.get(cnt).isEmpty()) {
                         // Create copy.
-                        GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+                        final GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
                         initD.topic(rebalanceTopics.get(cnt));
                         initD.updateSequence(fut.updateSeq);
                         initD.timeout(cctx.config().getRebalanceTimeout());
 
-                        synchronized (fut) {
-                            if (!fut.isDone()) {
-                                // Future can be already cancelled at this moment and all failovers happened.
-                                // New requests will not be covered by failovers.
-                                cctx.io().sendOrderedMessage(node,
-                                    rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
-                            }
-                        }
+                        final int finalCnt = cnt;
 
-                        if (log.isDebugEnabled())
-                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
-                                cnt + ", partitions count=" + sParts.get(cnt).size() +
-                                " (" + partitionsList(sParts.get(cnt)) + ")]");
+                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                try {
+                                    if (!fut.isDone()) {
+                                        cctx.io().sendOrderedMessage(node,
+                                            rebalanceTopics.get(finalCnt), initD, cctx.ioPolicy(), initD.timeout());
+
+                                        // Cleanup is required in case partitions were demanded in parallel with cancellation.
+                                        synchronized (fut) {
+                                            if (fut.isDone())
+                                                fut.cleanupRemoteContexts(node.id());
+                                        }
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
+                                                finalCnt + ", partitions count=" + sParts.get(finalCnt).size() +
+                                                " (" + partitionsList(sParts.get(finalCnt)) + ")]");
+                                    }
+                                }
+                                catch (IgniteCheckedException e) {
+                                    ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class);
+
+                                    if (cause != null)
+                                        log.warning("Failed to send initial demand request to node. " + e.getMessage());
+                                    else
+                                        log.error("Failed to send initial demand request to node.", e);
+
+                                    fut.cancel();
+                                }
+                                catch (Throwable th) {
+                                    log.error("Runtime error caught during initial demand request sending.", th);
+
+                                    fut.cancel();
+                                }
+                            }
+                        }, /*system pool*/true);
                     }
                 }
             }
             else {
-                U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() +
-                    ", mode=" + cfg.getRebalanceMode() +
-                    ", fromNode=" + node.id() +
-                    ", partitionsCount=" + parts.size() +
-                    ", topology=" + fut.topologyVersion() +
-                    ", updateSeq=" + fut.updateSeq + "]");
+                try {
+                    U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() +
+                        ", mode=" + cfg.getRebalanceMode() +
+                        ", fromNode=" + node.id() +
+                        ", partitionsCount=" + parts.size() +
+                        ", topology=" + fut.topologyVersion() +
+                        ", updateSeq=" + fut.updateSeq + "]");
 
-                d.timeout(cctx.config().getRebalanceTimeout());
-                d.workerId(0);//old api support.
+                    d.timeout(cctx.config().getRebalanceTimeout());
+                    d.workerId(0); // Old API support.
 
-                worker = new DemandWorker(dmIdx.incrementAndGet(), fut);
+                    worker = new DemandWorker(dmIdx.incrementAndGet(), fut);
 
-                worker.run(node, d);
+                    worker.run(node, d);
+                }
+                catch (Throwable th) {
+                    log.error("Runtime error caught during initial demand request sending.", th);
+
+                    fut.cancel();
+                }
             }
         }
     }
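
Because the sends are now asynchronous, a rebalance future can be cancelled
while a demand message is already in flight, and the failover logic that runs
on cancellation will not cover such late requests. The patch therefore
re-checks fut.isDone() under the future's monitor after a successful send and
calls cleanupRemoteContexts(..) if cancellation won the race. A sketch of
that ordering, with send(), cleanupRemote() and the lock object as
hypothetical stand-ins for the Ignite internals:

    public class CancelRaceSketch {
        private final Object lock = new Object();

        private volatile boolean done;

        void requestAsync() {
            new Thread(new Runnable() {
                @Override public void run() {
                    if (!done) {
                        send();

                        // The future may have been cancelled while send() was in
                        // flight; this late check ensures the remote node is not
                        // left with a dangling demand context.
                        synchronized (lock) {
                            if (done)
                                cleanupRemote();
                        }
                    }
                }
            }).start();
        }

        void cancel() {
            synchronized (lock) {
                done = true;
            }
        }

        /** Placeholder for cctx.io().sendOrderedMessage(..). */
        private void send() {
            // No-op in this sketch.
        }

        /** Placeholder for RebalanceFuture.cleanupRemoteContexts(..). */
        private void cleanupRemote() {
            // No-op in this sketch.
        }
    }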