IGNITE-17502 Tasks to send the snapshot files are not ordered (#10189)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 804bad2..2d1981a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -35,7 +35,6 @@
 import java.nio.file.Paths;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -2628,7 +2627,7 @@
         private final Queue<RemoteSnapshotFilesRecevier> queue = new ConcurrentLinkedDeque<>();
 
         /** {@code true} if the node is stopping. */
-        private volatile boolean stopping;
+        private boolean stopping;
 
         /**
          * @param next New task for scheduling.
@@ -2639,14 +2638,6 @@
             if (stopping) {
                 next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
 
-                if (active != null)
-                    active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
-
-                RemoteSnapshotFilesRecevier r;
-
-                while ((r = queue.poll()) != null)
-                    r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
-
                 return;
             }
 
@@ -2674,9 +2665,17 @@
         }
 
         /** Stopping handler. */
-        public void stop() {
+        public synchronized void stop() {
             stopping = true;
 
+            if (active != null)
+                active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
+            RemoteSnapshotFilesRecevier r;
+
+            while ((r = queue.poll()) != null)
+                r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
             Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
             GridCompoundFuture<Void, Void> stopFut = new GridCompoundFuture<>();
 
@@ -2709,7 +2708,6 @@
          * @return The set of currently scheduled tasks, some of them may be already completed.
          */
         private Set<RemoteSnapshotFilesRecevier> activeTasks() {
-
             Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>(queue);
 
             RemoteSnapshotFilesRecevier active0 = active;
@@ -2913,12 +2911,10 @@
                             ", grpId=" + grpId + ", partId=" + partId + ']');
                     }
 
-                    busyLock.enterBusy();
+                    if (!busyLock.enterBusy())
+                        throw new IgniteException(SNP_NODE_STOPPING_ERR_MSG);
 
                     try {
-                        if (stopping)
-                            throw new IgniteException(SNP_NODE_STOPPING_ERR_MSG);
-
                         task0.acceptFile(file);
                     }
                     finally {
@@ -2930,70 +2926,6 @@
     }
 
     /**
-     * Such an executor can executes tasks not in a single thread, but executes them
-     * on different threads sequentially. It's important for some {@link SnapshotSender}'s
-     * to process sub-task sequentially due to all these sub-tasks may share a single socket
-     * channel to send data to.
-     */
-    private static class SequentialExecutorWrapper implements Executor {
-        /** Ignite logger. */
-        private final IgniteLogger log;
-
-        /** Queue of task to execute. */
-        private final Queue<Runnable> tasks = new ArrayDeque<>();
-
-        /** Delegate executor. */
-        private final Executor executor;
-
-        /** Currently running task. */
-        private volatile Runnable active;
-
-        /** If wrapped executor is shutting down. */
-        private volatile boolean stopping;
-
-        /**
-         * @param executor Executor to run tasks on.
-         */
-        public SequentialExecutorWrapper(IgniteLogger log, Executor executor) {
-            this.log = log.getLogger(SequentialExecutorWrapper.class);
-            this.executor = executor;
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized void execute(final Runnable r) {
-            assert !stopping : "Task must be cancelled prior to the wrapped executor is shutting down.";
-
-            tasks.offer(() -> {
-                try {
-                    r.run();
-                }
-                finally {
-                    scheduleNext();
-                }
-            });
-
-            if (active == null)
-                scheduleNext();
-        }
-
-        /** */
-        private synchronized void scheduleNext() {
-            if ((active = tasks.poll()) != null) {
-                try {
-                    executor.execute(active);
-                }
-                catch (RejectedExecutionException e) {
-                    tasks.clear();
-
-                    stopping = true;
-
-                    log.warning("Task is outdated. Wrapped executor is shutting down.", e);
-                }
-            }
-        }
-    }
-
-    /**
      *
      */
     private static class RemoteSnapshotSender extends SnapshotSender {
@@ -3017,7 +2949,7 @@
             GridIoManager.TransmissionSender sndr,
             String rqId
         ) {
-            super(log, new SequentialExecutorWrapper(log, exec));
+            super(log, exec);
 
             this.sndr = sndr;
             this.rqId = rqId;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
index 81c9870..a06ec86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
@@ -26,6 +26,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -80,7 +81,7 @@
         this.snpPath = snpPath;
         this.parts = new HashMap<>();
 
-        for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+        for (Map.Entry<Integer, Set<Integer>> e : F.view(parts.entrySet(), e -> !F.isEmpty(e.getValue())))
             this.parts.put(e.getKey(), U.toIntArray(e.getValue()));
     }
 
@@ -90,11 +91,8 @@
     public Map<Integer, Set<Integer>> parts() {
         Map<Integer, Set<Integer>> res = new HashMap<>();
 
-        for (Map.Entry<Integer, int[]> e : parts.entrySet()) {
-            res.put(e.getKey(), e.getValue().length == 0 ? null : Arrays.stream(e.getValue())
-                .boxed()
-                .collect(Collectors.toSet()));
-        }
+        for (Map.Entry<Integer, int[]> e : parts.entrySet())
+            res.put(e.getKey(), Arrays.stream(e.getValue()).boxed().collect(Collectors.toSet()));
 
         return res;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
index 6ccf200..d18a6e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
@@ -18,13 +18,14 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.File;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
@@ -32,7 +33,6 @@
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
-import static java.util.Optional.ofNullable;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectory;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
@@ -50,6 +50,7 @@
      * @param tmpWorkDir Working directory for intermediate snapshot results.
      * @param ioFactory Factory to working with snapshot files.
      * @param snpSndr Factory which produces snapshot receiver instance.
+     * @param parts Partitions to be processed.
      */
     public SnapshotResponseRemoteFutureTask(
         GridCacheSharedContext<?, ?> cctx,
@@ -72,71 +73,67 @@
             return false;
 
         try {
-            List<GroupPartitionId> handled = new ArrayList<>();
+            List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
 
-            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
-                ofNullable(e.getValue()).orElse(Collections.emptySet())
-                    .forEach(p -> handled.add(new GroupPartitionId(e.getKey(), p)));
+            Function<GroupPartitionId, SnapshotMetadata> findMeta = pair -> {
+                for (SnapshotMetadata meta : metas) {
+                    Map<Integer, Set<Integer>> parts0 = meta.partitions();
+
+                    if (F.isEmpty(parts0))
+                        continue;
+
+                    Set<Integer> locParts = parts0.get(pair.getGroupId());
+
+                    if (locParts != null && locParts.contains(pair.getPartitionId()))
+                        return meta;
+                }
+
+                return null;
+            };
+
+            Map<GroupPartitionId, SnapshotMetadata> partsToSend = new HashMap<>();
+            // Store null metas too (computeIfAbsent() drops them) so that the missed partitions check below works.
+            parts.forEach((grpId, parts) -> parts.stream().map(part -> new GroupPartitionId(grpId, part))
+                .forEach(gp -> partsToSend.put(gp, findMeta.apply(gp))));
+
+            if (partsToSend.containsValue(null)) {
+                Collection<GroupPartitionId> missed = F.viewReadOnly(partsToSend.entrySet(), Map.Entry::getKey,
+                    e -> e.getValue() == null);
+
+                throw new IgniteException("Snapshot partitions missed on local node " +
+                    "[snpName=" + snpName + ", missed=" + missed + ']');
             }
 
-            snpSndr.init(handled.size());
+            snpSndr.init(partsToSend.size());
 
             File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName, snpPath);
 
-            List<CompletableFuture<Void>> futs = new ArrayList<>();
-            List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
+            CompletableFuture.runAsync(() -> partsToSend.forEach((gp, meta) -> {
+                if (err.get() != null)
+                    return;
 
-            for (SnapshotMetadata meta : metas) {
-                Map<Integer, Set<Integer>> parts0 = meta.partitions();
+                File cacheDir = cacheDirectory(new File(snpDir, databaseRelativePath(meta.folderName())),
+                    gp.getGroupId());
 
-                if (F.isEmpty(parts0))
-                    continue;
+                if (cacheDir == null) {
+                    throw new IgniteException("Cache directory not found [snpName=" + snpName + ", meta=" + meta +
+                        ", pair=" + gp + ']');
+                }
 
-                handled.removeIf(gp -> {
-                    if (ofNullable(parts0.get(gp.getGroupId()))
-                        .orElse(Collections.emptySet())
-                        .contains(gp.getPartitionId())
-                    ) {
-                        futs.add(CompletableFuture.runAsync(() -> {
-                            if (err.get() != null)
-                                return;
+                File snpPart = getPartitionFile(cacheDir.getParentFile(), cacheDir.getName(), gp.getPartitionId());
 
-                            File cacheDir = cacheDirectory(new File(snpDir, databaseRelativePath(meta.folderName())),
-                                gp.getGroupId());
+                if (!snpPart.exists()) {
+                    throw new IgniteException("Snapshot partition file not found [cacheDir=" + cacheDir +
+                        ", pair=" + gp + ']');
+                }
 
-                            if (cacheDir == null) {
-                                throw new IgniteException("Cache directory not found [snpName=" + snpName + ", meta=" + meta +
-                                    ", pair=" + gp + ']');
-                            }
-
-                            File snpPart = getPartitionFile(cacheDir.getParentFile(), cacheDir.getName(), gp.getPartitionId());
-
-                            if (!snpPart.exists()) {
-                                throw new IgniteException("Snapshot partition file not found [cacheDir=" + cacheDir +
-                                    ", pair=" + gp + ']');
-                            }
-
-                            snpSndr.sendPart(snpPart, cacheDir.getName(), gp, snpPart.length());
-                        }, snpSndr.executor())
-                            .whenComplete((r, t) -> err.compareAndSet(null, t)));
-
-                        return true;
-                    }
-
-                    return false;
-                });
-            }
-
-            if (!handled.isEmpty()) {
-                err.compareAndSet(null, new IgniteException("Snapshot partitions missed on local node [snpName=" + snpName +
-                    ", missed=" + handled + ']'));
-            }
-
-            int size = futs.size();
-
-            CompletableFuture.allOf(futs.toArray(new CompletableFuture[size]))
+                snpSndr.sendPart(snpPart, cacheDir.getName(), gp, snpPart.length());
+            }), snpSndr.executor())
                 .whenComplete((r, t) -> {
-                    Throwable th = ofNullable(err.get()).orElse(t);
+                    if (t != null)
+                        err.compareAndSet(null, t);
+
+                    Throwable th = err.get();
 
                     if (th == null && log.isInfoEnabled()) {
                         log.info("Snapshot partitions have been sent to the remote node [snpName=" + snpName +
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
index d61630b..59525a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
@@ -18,12 +18,16 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.File;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,6 +36,7 @@
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -49,6 +54,7 @@
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_THREAD_POOL_SIZE;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.groupIdFromTmpDir;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
@@ -56,6 +62,18 @@
 
 /** */
 public class IgniteSnapshotRemoteRequestTest extends IgniteClusterSnapshotRestoreBaseTest {
+    /** Snapshot thread pool size set on each configuration built by {@link #getConfiguration(String)}. */
+    private int snapshotThreadPoolSize = DFLT_SNAPSHOT_THREAD_POOL_SIZE;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+        // The pool size comes from a test field, so a test can change it before starting its nodes.
+        cfg.setSnapshotThreadPoolSize(snapshotThreadPoolSize);
+
+        return cfg;
+    }
+
     /** @throws Exception If fails. */
     @Test
     public void testSnapshotRemoteRequestFromSingleNode() throws Exception {
@@ -300,6 +318,53 @@
             "Future cancelled prior to the all requested partitions processed");
     }
 
+    /** @throws Exception If fails. */
+    @Test
+    public void testSendPartitonsSequentially() throws Exception {
+        snapshotThreadPoolSize = 1;
+        // Set before the grids are started: getConfiguration() applies this value to the node configuration.
+        IgniteEx sndr = startGridsWithCache(3, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        UUID sndNode = sndr.localNode().id();
+
+        sndr.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        IgniteSnapshotManager mgr0 = snp(grid(0));
+        // Collects the factory's nodeId on every sendPart0() call, in call order.
+        List<UUID> nodes = new CopyOnWriteArrayList<>();
+
+        mgr0.remoteSnapshotSenderFactory((rqId, nodeId) -> new DelegateSnapshotSender(log,
+            snp(sndr).snapshotExecutorService(), mgr0.remoteSnapshotSenderFactory(rqId, nodeId)) {
+            @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                nodes.add(nodeId);
+
+                // Single thread must send partitions sequentially node by node.
+                checkDuplicates(nodes);
+
+                super.sendPart0(part, cacheDirName, pair, length);
+            }
+        });
+        // Receivers are all started grids except the sender.
+        Collection<IgniteEx> rcvrs = F.viewReadOnly(G.allGrids(), srv -> (IgniteEx)srv,
+            srv -> !F.eq(sndr.localNode(), srv.cluster().localNode()));
+
+        GridCompoundFuture<Void, Void> futs = new GridCompoundFuture<>();
+
+        for (IgniteEx rcv : rcvrs) {
+            Map<Integer, Set<Integer>> expParts = owningParts(rcv, CU.cacheId(DEFAULT_CACHE_NAME), sndNode);
+
+            IgniteInternalFuture<Void> fut = snp(rcv)
+                .requestRemoteSnapshotFiles(sndNode, SNAPSHOT_NAME, null, expParts, () -> false,
+                    defaultPartitionConsumer(expParts, null));
+            // NOTE(review): an assertion failing inside a future listener may not fail the test - confirm.
+            fut.listen(f -> expParts.values().forEach(integers -> assertTrue(integers.isEmpty())));
+
+            futs.add(fut);
+        }
+
+        futs.markInitialized().get(getTestTimeout());
+    }
+
     /**
      * @param parts Expected partitions.
      * @param latch Latch to await partitions processed.
@@ -315,7 +380,8 @@
             assertTrue("Received partition has not been requested",
                 parts.get(grpId).remove(partId(part.getName())));
 
-            latch.countDown();
+            if (latch != null)
+                latch.countDown();
         };
     }
 
@@ -337,4 +403,25 @@
             .map(Map.Entry::getKey)
             .collect(Collectors.toSet()));
     }
+
+    /**
+     * Fails the test unless every distinct value of the list forms a single contiguous run:
+     * {@code 1, 1, 1, 3, 2, 2} passes, {@code 1, 1, 3, 1, 2, 2} fails.
+     *
+     * @param list List to check.
+     * @param <T> Type of the list items.
+     */
+    private <T> void checkDuplicates(List<T> list) {
+        LinkedList<T> grouped = new LinkedList<>();
+        // Collapse each run of adjacent items that are equal (per F.eq) into a single element.
+        for (T item : list) {
+            if (!F.eq(grouped.peekLast(), item))
+                grouped.add(item);
+        }
+        // The collapsed list matches the distinct items only if no value starts a second run.
+        if (list.stream().distinct().collect(Collectors.toList()).equals(grouped))
+            return;
+
+        fail("List contains non neighboring duplicates: " + list);
+    }
 }