IGNITE-17502 Tasks to sent the snapshot files are not ordered (#10189)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 804bad2..2d1981a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -35,7 +35,6 @@
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -2628,7 +2627,7 @@
private final Queue<RemoteSnapshotFilesRecevier> queue = new ConcurrentLinkedDeque<>();
/** {@code true} if the node is stopping. */
- private volatile boolean stopping;
+ private boolean stopping;
/**
* @param next New task for scheduling.
@@ -2639,14 +2638,6 @@
if (stopping) {
next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
- if (active != null)
- active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
-
- RemoteSnapshotFilesRecevier r;
-
- while ((r = queue.poll()) != null)
- r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
-
return;
}
@@ -2674,9 +2665,17 @@
}
/** Stopping handler. */
- public void stop() {
+ public synchronized void stop() {
stopping = true;
+ if (active != null)
+ active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
+ RemoteSnapshotFilesRecevier r;
+
+ while ((r = queue.poll()) != null)
+ r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
GridCompoundFuture<Void, Void> stopFut = new GridCompoundFuture<>();
@@ -2709,7 +2708,6 @@
* @return The set of currently scheduled tasks, some of them may be already completed.
*/
private Set<RemoteSnapshotFilesRecevier> activeTasks() {
-
Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>(queue);
RemoteSnapshotFilesRecevier active0 = active;
@@ -2913,12 +2911,10 @@
", grpId=" + grpId + ", partId=" + partId + ']');
}
- busyLock.enterBusy();
+ if (!busyLock.enterBusy())
+ throw new IgniteException(SNP_NODE_STOPPING_ERR_MSG);
try {
- if (stopping)
- throw new IgniteException(SNP_NODE_STOPPING_ERR_MSG);
-
task0.acceptFile(file);
}
finally {
@@ -2930,70 +2926,6 @@
}
/**
- * Such an executor can executes tasks not in a single thread, but executes them
- * on different threads sequentially. It's important for some {@link SnapshotSender}'s
- * to process sub-task sequentially due to all these sub-tasks may share a single socket
- * channel to send data to.
- */
- private static class SequentialExecutorWrapper implements Executor {
- /** Ignite logger. */
- private final IgniteLogger log;
-
- /** Queue of task to execute. */
- private final Queue<Runnable> tasks = new ArrayDeque<>();
-
- /** Delegate executor. */
- private final Executor executor;
-
- /** Currently running task. */
- private volatile Runnable active;
-
- /** If wrapped executor is shutting down. */
- private volatile boolean stopping;
-
- /**
- * @param executor Executor to run tasks on.
- */
- public SequentialExecutorWrapper(IgniteLogger log, Executor executor) {
- this.log = log.getLogger(SequentialExecutorWrapper.class);
- this.executor = executor;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void execute(final Runnable r) {
- assert !stopping : "Task must be cancelled prior to the wrapped executor is shutting down.";
-
- tasks.offer(() -> {
- try {
- r.run();
- }
- finally {
- scheduleNext();
- }
- });
-
- if (active == null)
- scheduleNext();
- }
-
- /** */
- private synchronized void scheduleNext() {
- if ((active = tasks.poll()) != null) {
- try {
- executor.execute(active);
- }
- catch (RejectedExecutionException e) {
- tasks.clear();
-
- stopping = true;
-
- log.warning("Task is outdated. Wrapped executor is shutting down.", e);
- }
- }
- }
- }
-
- /**
*
*/
private static class RemoteSnapshotSender extends SnapshotSender {
@@ -3017,7 +2949,7 @@
GridIoManager.TransmissionSender sndr,
String rqId
) {
- super(log, new SequentialExecutorWrapper(log, exec));
+ super(log, exec);
this.sndr = sndr;
this.rqId = rqId;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
index 81c9870..a06ec86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
@@ -26,6 +26,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -80,7 +81,7 @@
this.snpPath = snpPath;
this.parts = new HashMap<>();
- for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+ for (Map.Entry<Integer, Set<Integer>> e : F.view(parts.entrySet(), e -> !F.isEmpty(e.getValue())))
this.parts.put(e.getKey(), U.toIntArray(e.getValue()));
}
@@ -90,11 +91,8 @@
public Map<Integer, Set<Integer>> parts() {
Map<Integer, Set<Integer>> res = new HashMap<>();
- for (Map.Entry<Integer, int[]> e : parts.entrySet()) {
- res.put(e.getKey(), e.getValue().length == 0 ? null : Arrays.stream(e.getValue())
- .boxed()
- .collect(Collectors.toSet()));
- }
+ for (Map.Entry<Integer, int[]> e : parts.entrySet())
+ res.put(e.getKey(), Arrays.stream(e.getValue()).boxed().collect(Collectors.toSet()));
return res;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
index 6ccf200..d18a6e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
@@ -18,13 +18,14 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
@@ -32,7 +33,6 @@
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
-import static java.util.Optional.ofNullable;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectory;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
@@ -50,6 +50,7 @@
* @param tmpWorkDir Working directory for intermediate snapshot results.
* @param ioFactory Factory to working with snapshot files.
* @param snpSndr Factory which produces snapshot receiver instance.
+ * @param parts Partition to be processed.
*/
public SnapshotResponseRemoteFutureTask(
GridCacheSharedContext<?, ?> cctx,
@@ -72,71 +73,67 @@
return false;
try {
- List<GroupPartitionId> handled = new ArrayList<>();
+ List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
- for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
- ofNullable(e.getValue()).orElse(Collections.emptySet())
- .forEach(p -> handled.add(new GroupPartitionId(e.getKey(), p)));
+ Function<GroupPartitionId, SnapshotMetadata> findMeta = pair -> {
+ for (SnapshotMetadata meta : metas) {
+ Map<Integer, Set<Integer>> parts0 = meta.partitions();
+
+ if (F.isEmpty(parts0))
+ continue;
+
+ Set<Integer> locParts = parts0.get(pair.getGroupId());
+
+ if (locParts != null && locParts.contains(pair.getPartitionId()))
+ return meta;
+ }
+
+ return null;
+ };
+
+ Map<GroupPartitionId, SnapshotMetadata> partsToSend = new HashMap<>();
+
+ parts.forEach((grpId, parts) -> parts.forEach(
+ part -> partsToSend.computeIfAbsent(new GroupPartitionId(grpId, part), findMeta)));
+
+ if (partsToSend.containsValue(null)) {
+ Collection<GroupPartitionId> missed = F.viewReadOnly(partsToSend.entrySet(), Map.Entry::getKey,
+ e -> e.getValue() == null);
+
+ throw new IgniteException("Snapshot partitions missed on local node " +
+ "[snpName=" + snpName + ", missed=" + missed + ']');
}
- snpSndr.init(handled.size());
+ snpSndr.init(partsToSend.size());
File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName, snpPath);
- List<CompletableFuture<Void>> futs = new ArrayList<>();
- List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
+ CompletableFuture.runAsync(() -> partsToSend.forEach((gp, meta) -> {
+ if (err.get() != null)
+ return;
- for (SnapshotMetadata meta : metas) {
- Map<Integer, Set<Integer>> parts0 = meta.partitions();
+ File cacheDir = cacheDirectory(new File(snpDir, databaseRelativePath(meta.folderName())),
+ gp.getGroupId());
- if (F.isEmpty(parts0))
- continue;
+ if (cacheDir == null) {
+ throw new IgniteException("Cache directory not found [snpName=" + snpName + ", meta=" + meta +
+ ", pair=" + gp + ']');
+ }
- handled.removeIf(gp -> {
- if (ofNullable(parts0.get(gp.getGroupId()))
- .orElse(Collections.emptySet())
- .contains(gp.getPartitionId())
- ) {
- futs.add(CompletableFuture.runAsync(() -> {
- if (err.get() != null)
- return;
+ File snpPart = getPartitionFile(cacheDir.getParentFile(), cacheDir.getName(), gp.getPartitionId());
- File cacheDir = cacheDirectory(new File(snpDir, databaseRelativePath(meta.folderName())),
- gp.getGroupId());
+ if (!snpPart.exists()) {
+ throw new IgniteException("Snapshot partition file not found [cacheDir=" + cacheDir +
+ ", pair=" + gp + ']');
+ }
- if (cacheDir == null) {
- throw new IgniteException("Cache directory not found [snpName=" + snpName + ", meta=" + meta +
- ", pair=" + gp + ']');
- }
-
- File snpPart = getPartitionFile(cacheDir.getParentFile(), cacheDir.getName(), gp.getPartitionId());
-
- if (!snpPart.exists()) {
- throw new IgniteException("Snapshot partition file not found [cacheDir=" + cacheDir +
- ", pair=" + gp + ']');
- }
-
- snpSndr.sendPart(snpPart, cacheDir.getName(), gp, snpPart.length());
- }, snpSndr.executor())
- .whenComplete((r, t) -> err.compareAndSet(null, t)));
-
- return true;
- }
-
- return false;
- });
- }
-
- if (!handled.isEmpty()) {
- err.compareAndSet(null, new IgniteException("Snapshot partitions missed on local node [snpName=" + snpName +
- ", missed=" + handled + ']'));
- }
-
- int size = futs.size();
-
- CompletableFuture.allOf(futs.toArray(new CompletableFuture[size]))
+ snpSndr.sendPart(snpPart, cacheDir.getName(), gp, snpPart.length());
+ }), snpSndr.executor())
.whenComplete((r, t) -> {
- Throwable th = ofNullable(err.get()).orElse(t);
+ if (t != null)
+ err.compareAndSet(null, t);
+
+ Throwable th = err.get();
if (th == null && log.isInfoEnabled()) {
log.info("Snapshot partitions have been sent to the remote node [snpName=" + snpName +
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
index d61630b..59525a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
@@ -18,12 +18,16 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,6 +36,7 @@
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -49,6 +54,7 @@
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_THREAD_POOL_SIZE;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.groupIdFromTmpDir;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
@@ -56,6 +62,18 @@
/** */
public class IgniteSnapshotRemoteRequestTest extends IgniteClusterSnapshotRestoreBaseTest {
+ /** */
+ private int snapshotThreadPoolSize = DFLT_SNAPSHOT_THREAD_POOL_SIZE;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setSnapshotThreadPoolSize(snapshotThreadPoolSize);
+
+ return cfg;
+ }
+
/** @throws Exception If fails. */
@Test
public void testSnapshotRemoteRequestFromSingleNode() throws Exception {
@@ -300,6 +318,53 @@
"Future cancelled prior to the all requested partitions processed");
}
+ /** @throws Exception If fails. */
+ @Test
+ public void testSendPartitonsSequentially() throws Exception {
+ snapshotThreadPoolSize = 1;
+
+ IgniteEx sndr = startGridsWithCache(3, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+ UUID sndNode = sndr.localNode().id();
+
+ sndr.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ IgniteSnapshotManager mgr0 = snp(grid(0));
+
+ List<UUID> nodes = new CopyOnWriteArrayList<>();
+
+ mgr0.remoteSnapshotSenderFactory((rqId, nodeId) -> new DelegateSnapshotSender(log,
+ snp(sndr).snapshotExecutorService(), mgr0.remoteSnapshotSenderFactory(rqId, nodeId)) {
+ @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+ nodes.add(nodeId);
+
+ // Single thread must send partitions sequentially node by node.
+ checkDuplicates(nodes);
+
+ super.sendPart0(part, cacheDirName, pair, length);
+ }
+ });
+
+ Collection<IgniteEx> rcvrs = F.viewReadOnly(G.allGrids(), srv -> (IgniteEx)srv,
+ srv -> !F.eq(sndr.localNode(), srv.cluster().localNode()));
+
+ GridCompoundFuture<Void, Void> futs = new GridCompoundFuture<>();
+
+ for (IgniteEx rcv : rcvrs) {
+ Map<Integer, Set<Integer>> expParts = owningParts(rcv, CU.cacheId(DEFAULT_CACHE_NAME), sndNode);
+
+ IgniteInternalFuture<Void> fut = snp(rcv)
+ .requestRemoteSnapshotFiles(sndNode, SNAPSHOT_NAME, null, expParts, () -> false,
+ defaultPartitionConsumer(expParts, null));
+
+ fut.listen(f -> expParts.values().forEach(integers -> assertTrue(integers.isEmpty())));
+
+ futs.add(fut);
+ }
+
+ futs.markInitialized().get(getTestTimeout());
+ }
+
/**
* @param parts Expected partitions.
* @param latch Latch to await partitions processed.
@@ -315,7 +380,8 @@
assertTrue("Received partition has not been requested",
parts.get(grpId).remove(partId(part.getName())));
- latch.countDown();
+ if (latch != null)
+ latch.countDown();
};
}
@@ -337,4 +403,25 @@
.map(Map.Entry::getKey)
.collect(Collectors.toSet()));
}
+
+ /**
+ * Checks that the list can contain only neighboring duplicates:
+ * 1, 1, 1, 3, 2, 2 -> ok
+ * 1, 1, 3, 1, 2, 2 -> fail
+ *
+ * @param list List to check.
+ */
+ private <T> void checkDuplicates(List<T> list) {
+ LinkedList<T> grouped = new LinkedList<>();
+
+ for (T item : list) {
+ if (!F.eq(grouped.peekLast(), item))
+ grouped.add(item);
+ }
+
+ if (list.stream().distinct().collect(Collectors.toList()).equals(grouped))
+ return;
+
+ fail("List contains non neighboring duplicates: " + list);
+ }
}