IGNITE-14572 Include metastorage to snapshot (#9047)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
index f97ada7..04cdcdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
@@ -51,12 +51,6 @@
     /** Special partition reserved for index space. */
     public static final int INDEX_PARTITION = 0xFFFF;
 
-    /** Old special partition reserved for metastore space. */
-    public static final int OLD_METASTORE_PARTITION = 0x0;
-
-    /** Special partition reserved for metastore space. */
-    public static final int METASTORE_PARTITION = 0x1;
-
     /** Cache group meta page id. */
     public static final long META_PAGE_ID = pageId(INDEX_PARTITION, FLAG_IDX, 0);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 24954a4..05730f3 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -144,9 +144,6 @@
     /** */
     public static final String DFLT_STORE_DIR = "db";
 
-    /** */
-    public static final String META_STORAGE_NAME = "metastorage";
-
     /** Matcher for searching of *.tmp files. */
     public static final PathMatcher TMP_FILE_MATCHER =
         FileSystems.getDefault().getPathMatcher("glob:**" + TMP_SUFFIX);
@@ -298,7 +295,7 @@
                 storeWorkDir.toPath(), entry -> {
                     String name = entry.toFile().getName();
 
-                    return !name.equals(META_STORAGE_NAME) &&
+                    return !name.equals(MetaStorage.METASTORAGE_DIR_NAME) &&
                         (name.startsWith(CACHE_DIR_PREFIX) || name.startsWith(CACHE_GRP_DIR_PREFIX));
                 }
             )) {
@@ -498,9 +495,9 @@
             DataRegion dataRegion = cctx.database().dataRegion(GridCacheDatabaseSharedManager.METASTORE_DATA_REGION_NAME);
 
             CacheStoreHolder holder = initDir(
-                new File(storeWorkDir, META_STORAGE_NAME),
+                new File(storeWorkDir, MetaStorage.METASTORAGE_DIR_NAME),
                 grpId,
-                PageIdAllocator.METASTORE_PARTITION + 1,
+                MetaStorage.METASTORAGE_PARTITIONS.size(),
                 dataRegion.memoryMetrics().totalAllocatedPages()::add,
                 false);
 
@@ -1009,6 +1006,7 @@
 
     /**
      * @param dir Directory to check.
+     * @param names Cache group names to filter.
      * @return Files that match cache or cache group pattern.
      */
     public static List<File> cacheDirectories(File dir, Predicate<String> names) {
@@ -1020,7 +1018,8 @@
         return Arrays.stream(dir.listFiles())
             .sorted()
             .filter(File::isDirectory)
-            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX) ||
+                f.getName().equals(MetaStorage.METASTORAGE_DIR_NAME))
             .filter(f -> names.test(cacheGroupName(f)))
             .collect(Collectors.toList());
     }
@@ -1066,6 +1065,8 @@
             return name.substring(CACHE_GRP_DIR_PREFIX.length());
         else if (name.startsWith(CACHE_DIR_PREFIX))
             return name.substring(CACHE_DIR_PREFIX.length());
+        else if (name.equals(MetaStorage.METASTORAGE_DIR_NAME))
+            return MetaStorage.METASTORAGE_CACHE_NAME;
         else
             throw new IgniteException("Directory doesn't match the cache or cache group prefix: " + dir);
     }
@@ -1185,6 +1186,23 @@
     }
 
     /**
+     * @param grpId Group id.
+     * @return Name of cache group directory.
+     * @throws IgniteCheckedException If cache group doesn't exist.
+     */
+    public String cacheDirName(int grpId) throws IgniteCheckedException {
+        if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
+            return MetaStorage.METASTORAGE_DIR_NAME;
+
+        CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+
+        if (gctx == null)
+            throw new IgniteCheckedException("Cache group context has not found due to the cache group is stopped.");
+
+        return cacheDirName(gctx.config());
+    }
+
+    /**
      * @param cleanFiles {@code True} if the stores should delete it's files upon close.
      */
     private IgniteCheckedException shutdown(Collection<CacheStoreHolder> holders, boolean cleanFiles) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index 2c20a02..3a4eae0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -26,9 +26,12 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.Executor;
@@ -73,7 +76,6 @@
 import org.jetbrains.annotations.NotNull;
 
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_AUX;
-import static org.apache.ignite.internal.pagemem.PageIdAllocator.OLD_METASTORE_PARTITION;
 
 /**
  * General purpose key-value local-only storage.
@@ -85,6 +87,19 @@
     /** */
     public static final int METASTORAGE_CACHE_ID = CU.cacheId(METASTORAGE_CACHE_NAME);
 
+    /** Metastorage cache directory to store data. */
+    public static final String METASTORAGE_DIR_NAME = "metastorage";
+
+    /** Old special partition reserved for metastore space. */
+    public static final int OLD_METASTORE_PARTITION = 0x0;
+
+    /** Special partition reserved for metastore space. */
+    public static final int METASTORE_PARTITION = 0x1;
+
+    /** The set of all metastorage partitions. */
+    public static final Set<Integer> METASTORAGE_PARTITIONS =
+        Collections.unmodifiableSet(new HashSet<>(Arrays.asList(OLD_METASTORE_PARTITION, METASTORE_PARTITION)));
+
     /** This flag is used ONLY FOR TESTING the migration of a metastorage from Part 0 to Part 1. */
     public static boolean PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false;
 
@@ -245,9 +260,9 @@
      */
     private void initInternal(IgniteCacheDatabaseSharedManager db) throws IgniteCheckedException {
         if (PRESERVE_LEGACY_METASTORAGE_PARTITION_ID)
-            getOrAllocateMetas(partId = PageIdAllocator.OLD_METASTORE_PARTITION);
-        else if (!readOnly || getOrAllocateMetas(partId = PageIdAllocator.OLD_METASTORE_PARTITION))
-            getOrAllocateMetas(partId = PageIdAllocator.METASTORE_PARTITION);
+            getOrAllocateMetas(partId = OLD_METASTORE_PARTITION);
+        else if (!readOnly || getOrAllocateMetas(partId = OLD_METASTORE_PARTITION))
+            getOrAllocateMetas(partId = METASTORE_PARTITION);
 
         if (!empty) {
             CacheDiagnosticManager diagnosticMgr = cctx.diagnostic();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageRowStoreEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageRowStoreEntry.java
index 6de449e8..6da0335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageRowStoreEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageRowStoreEntry.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.metastorage;
 
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
@@ -27,7 +26,7 @@
     /** */
     public MetastorageRowStoreEntry(byte[] val) {
         super(0L, MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID ?
-            PageIdAllocator.OLD_METASTORE_PARTITION : PageIdAllocator.METASTORE_PARTITION, val);
+            MetaStorage.OLD_METASTORE_PARTITION : MetaStorage.METASTORE_PARTITION, val);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 90304bb..01a39fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -55,6 +55,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -106,6 +107,7 @@
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
 import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.GridBusyLock;
@@ -166,6 +168,8 @@
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectories;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
 import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.DB_DEFAULT_FOLDER;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
 import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
 import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.T_DATA;
 import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getPageIO;
@@ -477,7 +481,8 @@
         if (!snpDir.exists())
             return;
 
-        assert snpDir.isDirectory() : snpDir;
+        if (!snpDir.isDirectory())
+            return;
 
         try {
             File binDir = binaryWorkDir(snpDir.getAbsolutePath(), folderName);
@@ -579,6 +584,7 @@
 
         Set<Integer> leftGrps = new HashSet<>(grpIds);
         leftGrps.removeAll(cctx.cache().cacheGroupDescriptors().keySet());
+        boolean withMetaStorage = leftGrps.remove(METASTORAGE_CACHE_ID);
 
         if (!leftGrps.isEmpty()) {
             return new GridFinishedFuture<>(new IgniteCheckedException("Some of requested cache groups doesn't exist " +
@@ -598,14 +604,20 @@
 
         IgniteInternalFuture<Set<GroupPartitionId>> task0;
 
-        if (parts.isEmpty())
+        if (parts.isEmpty() && !withMetaStorage)
             task0 = new GridFinishedFuture<>(Collections.emptySet());
         else {
             task0 = registerSnapshotTask(req.snapshotName(),
                 req.operationalNodeId(),
                 parts,
+                withMetaStorage,
                 locSndrFactory.apply(req.snapshotName()));
 
+            if (withMetaStorage) {
+                ((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage())
+                    .suspend(((SnapshotFutureTask)task0).started());
+            }
+
             clusterSnpReq = req;
         }
 
@@ -945,12 +957,22 @@
      * @return The list of cache or cache group names in given snapshot on local node.
      */
     public List<File> snapshotCacheDirectories(String snpName, String folderName) {
+        return snapshotCacheDirectories(snpName, folderName, name -> true);
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param folderName Directory name for cache group.
+     * @param names Cache group names to filter.
+     * @return The list of cache or cache group names in given snapshot on local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String folderName, Predicate<String> names) {
         File snpDir = snapshotLocalDir(snpName);
 
         if (!snpDir.exists())
             return Collections.emptyList();
 
-        return cacheDirectories(new File(snpDir, databaseRelativePath(folderName)), name -> true);
+        return cacheDirectories(new File(snpDir, databaseRelativePath(folderName)), names);
     }
 
     /**
@@ -1097,6 +1119,8 @@
                 .map(CacheGroupDescriptor::cacheOrGroupName)
                 .collect(Collectors.toList());
 
+            grps.add(METASTORAGE_CACHE_NAME);
+
             List<ClusterNode> srvNodes = cctx.discovery().serverNodes(AffinityTopologyVersion.NONE);
 
             snpFut0.listen(f -> {
@@ -1112,8 +1136,7 @@
                 grps,
                 new HashSet<>(F.viewReadOnly(srvNodes,
                     F.node2id(),
-                    (node) -> CU.baselineNode(node, clusterState)))
-            ));
+                    (node) -> CU.baselineNode(node, clusterState)))));
 
             String msg = "Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps + ']';
 
@@ -1206,7 +1229,7 @@
 
             // Schedule task on a checkpoint and wait when it starts.
             try {
-                task.awaitStarted();
+                task.started().get();
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Fail to wait while cluster-wide snapshot operation started", e);
@@ -1297,6 +1320,7 @@
      * @param snpName Unique snapshot name.
      * @param srcNodeId Node id which cause snapshot operation.
      * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
+     * @param withMetaStorage {@code true} if all metastorage data must be also included into snapshot.
      * @param snpSndr Factory which produces snapshot receiver instance.
      * @return Snapshot operation task which should be registered on checkpoint to run.
      */
@@ -1304,6 +1328,7 @@
         String snpName,
         UUID srcNodeId,
         Map<Integer, Set<Integer>> parts,
+        boolean withMetaStorage,
         SnapshotSender snpSndr
     ) {
         if (!busyLock.enterBusy())
@@ -1323,6 +1348,7 @@
                     ioFactory,
                     snpSndr,
                     parts,
+                    withMetaStorage,
                     locBuff));
 
             if (prev != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 978d146..96915bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -51,6 +51,7 @@
 import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.store.PageStore;
 import org.apache.ignite.internal.pagemem.store.PageWriteListener;
@@ -63,10 +64,12 @@
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
 import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -77,7 +80,6 @@
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheWorkDir;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.copy;
@@ -147,6 +149,9 @@
      */
     private final Map<Integer, Set<Integer>> parts;
 
+    /** {@code true} if all metastorage data must be also included into snapshot. */
+    private final boolean withMetaStorage;
+
     /** Cache group and corresponding partitions collected under the checkpoint write lock. */
     private final Map<Integer, Set<Integer>> processed = new HashMap<>();
 
@@ -186,6 +191,7 @@
         startedFut.onDone(e);
         onDone(e);
         parts = null;
+        withMetaStorage = false;
         ioFactory = null;
         locBuff = null;
     }
@@ -204,14 +210,17 @@
         FileIOFactory ioFactory,
         SnapshotSender snpSndr,
         Map<Integer, Set<Integer>> parts,
+        boolean withMetaStorage,
         ThreadLocal<ByteBuffer> locBuff
     ) {
         assert snpName != null : "Snapshot name cannot be empty or null.";
         assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null.";
         assert snpSndr.executor() != null : "Executor service must be not null.";
         assert cctx.pageStore() instanceof FilePageStoreManager : "Snapshot task can work only with physical files.";
+        assert !parts.containsKey(MetaStorage.METASTORAGE_CACHE_ID) : "The withMetaStorage must be used instead.";
 
         this.parts = parts;
+        this.withMetaStorage = withMetaStorage;
         this.cctx = cctx;
         this.pageStore = (FilePageStoreManager)cctx.pageStore();
         this.log = cctx.logger(SnapshotFutureTask.class);
@@ -296,10 +305,10 @@
     }
 
     /**
-     * @throws IgniteCheckedException If fails.
+     * @return Started future.
      */
-    public void awaitStarted() throws IgniteCheckedException {
-        startedFut.get();
+    public IgniteInternalFuture<?> started() {
+        return startedFut;
     }
 
     /**
@@ -339,11 +348,17 @@
                     throw new IgniteCheckedException("Encrypted cache groups are not allowed to be snapshot: " + grpId);
 
                 // Create cache group snapshot directory on start in a single thread.
-                U.ensureDirectory(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())),
+                U.ensureDirectory(cacheWorkDir(tmpConsIdDir, FilePageStoreManager.cacheDirName(gctx.config())),
                     "directory for snapshotting cache group",
                     log);
             }
 
+            if (withMetaStorage) {
+                U.ensureDirectory(cacheWorkDir(tmpConsIdDir, MetaStorage.METASTORAGE_DIR_NAME),
+                    "directory for snapshotting metastorage",
+                    log);
+            }
+
             startedFut.listen(f ->
                 ((GridCacheDatabaseSharedManager)cctx.database()).removeCheckpointListener(this)
             );
@@ -366,7 +381,7 @@
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeCheckpointBegin(Context ctx) {
+    @Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException {
         if (stopping())
             return;
 
@@ -376,6 +391,15 @@
             else
                 cpEndFut.completeExceptionally(f.error());
         });
+
+        if (withMetaStorage) {
+            try {
+                U.get(((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage()).flush());
+            }
+            catch (IgniteCheckedException ignore) {
+                // Flushing may be cancelled or interrupted due to the local node stopping.
+            }
+        }
     }
 
     /** {@inheritDoc} */
@@ -453,28 +477,23 @@
 
                 CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
 
-                if (gctx == null) {
-                    throw new IgniteCheckedException("Cache group context has not found " +
-                        "due to the cache group is stopped: " + grpId);
-                }
-
-                for (int partId : e.getValue()) {
-                    GroupPartitionId pair = new GroupPartitionId(grpId, partId);
-
-                    PageStore store = pageStore.getStore(grpId, partId);
-
-                    partDeltaWriters.put(pair,
-                        new PageStoreSerialWriter(store,
-                            partDeltaFile(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())), partId)));
-
-                    partFileLengths.put(pair, store.size());
-                }
+                if (gctx == null)
+                    throw new IgniteCheckedException("Cache group is stopped : " + grpId);
 
                 ccfgs.add(gctx.config());
+                addPartitionWriters(grpId, e.getValue(), FilePageStoreManager.cacheDirName(gctx.config()));
+            }
+
+            if (withMetaStorage) {
+                processed.put(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.METASTORAGE_PARTITIONS);
+
+                addPartitionWriters(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.METASTORAGE_PARTITIONS,
+                    MetaStorage.METASTORAGE_DIR_NAME);
             }
 
             pageStore.readConfigurationFiles(ccfgs,
-                (ccfg, ccfgFile) -> ccfgSndrs.add(new CacheConfigurationSender(ccfg.getName(), cacheDirName(ccfg), ccfgFile)));
+                (ccfg, ccfgFile) -> ccfgSndrs.add(new CacheConfigurationSender(ccfg.getName(),
+                    FilePageStoreManager.cacheDirName(ccfg), ccfgFile)));
         }
         catch (IgniteCheckedException e) {
             acceptException(e);
@@ -525,76 +544,88 @@
         for (CacheConfigurationSender ccfgSndr : ccfgSndrs)
             futs.add(CompletableFuture.runAsync(wrapExceptionIfStarted(ccfgSndr::sendCacheConfig), snpSndr.executor()));
 
-        for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
-            int grpId = e.getKey();
+        try {
+            for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+                int grpId = e.getKey();
+                String cacheDirName = pageStore.cacheDirName(grpId);
 
-            CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+                // Process partitions for a particular cache group.
+                for (int partId : e.getValue()) {
+                    GroupPartitionId pair = new GroupPartitionId(grpId, partId);
 
-            if (gctx == null) {
-                acceptException(new IgniteCheckedException("Cache group context has not found " +
-                    "due to the cache group is stopped: " + grpId));
+                    Long partLen = partFileLengths.get(pair);
 
-                break;
-            }
-
-            // Process partitions for a particular cache group.
-            for (int partId : e.getValue()) {
-                GroupPartitionId pair = new GroupPartitionId(grpId, partId);
-
-                CacheConfiguration<?, ?> ccfg = gctx.config();
-
-                assert ccfg != null : "Cache configuration cannot be empty on snapshot creation: " + pair;
-
-                String cacheDirName = cacheDirName(ccfg);
-                Long partLen = partFileLengths.get(pair);
-
-                CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
-                    wrapExceptionIfStarted(() -> {
-                        snpSndr.sendPart(
-                            getPartitionFile(pageStore.workDir(), cacheDirName, partId),
-                            cacheDirName,
-                            pair,
-                            partLen);
-
-                        // Stop partition writer.
-                        partDeltaWriters.get(pair).markPartitionProcessed();
-                    }),
-                    snpSndr.executor())
-                    // Wait for the completion of both futures - checkpoint end, copy partition.
-                    .runAfterBothAsync(cpEndFut,
+                    CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
                         wrapExceptionIfStarted(() -> {
-                            File delta = partDeltaWriters.get(pair).deltaFile;
+                            snpSndr.sendPart(
+                                getPartitionFile(pageStore.workDir(), cacheDirName, partId),
+                                cacheDirName,
+                                pair,
+                                partLen);
 
-                            try {
-                                // Atomically creates a new, empty delta file if and only if
-                                // a file with this name does not yet exist.
-                                delta.createNewFile();
-                            }
-                            catch (IOException ex) {
-                                throw new IgniteCheckedException(ex);
-                            }
-
-                            snpSndr.sendDelta(delta, cacheDirName, pair);
-
-                            boolean deleted = delta.delete();
-
-                            assert deleted;
+                            // Stop partition writer.
+                            partDeltaWriters.get(pair).markPartitionProcessed();
                         }),
-                        snpSndr.executor());
+                        snpSndr.executor())
+                        // Wait for the completion of both futures - checkpoint end, copy partition.
+                        .runAfterBothAsync(cpEndFut,
+                            wrapExceptionIfStarted(() -> {
+                                File delta = partDeltaWriters.get(pair).deltaFile;
 
-                futs.add(fut0);
+                                try {
+                                    // Atomically creates a new, empty delta file if and only if
+                                    // a file with this name does not yet exist.
+                                    delta.createNewFile();
+                                }
+                                catch (IOException ex) {
+                                    throw new IgniteCheckedException(ex);
+                                }
+
+                                snpSndr.sendDelta(delta, cacheDirName, pair);
+
+                                boolean deleted = delta.delete();
+
+                                assert deleted;
+                            }),
+                            snpSndr.executor());
+
+                    futs.add(fut0);
+                }
             }
+
+            int futsSize = futs.size();
+
+            CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]))
+                .whenComplete((res, t) -> {
+                    assert t == null : "Exception must never be thrown since a wrapper is used " +
+                        "for each snapshot task: " + t;
+
+                    closeAsync();
+                });
         }
+        catch (IgniteCheckedException e) {
+            acceptException(e);
+        }
+    }
 
-        int futsSize = futs.size();
+    /**
+     * @param grpId Cache group id.
+     * @param parts Set of partitions to be processed.
+     * @param dirName Directory name to init.
+     * @throws IgniteCheckedException If fails.
+     */
+    private void addPartitionWriters(int grpId, Set<Integer> parts, String dirName) throws IgniteCheckedException {
+        for (int partId : parts) {
+            GroupPartitionId pair = new GroupPartitionId(grpId, partId);
 
-        CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]))
-            .whenComplete((res, t) -> {
-                assert t == null : "Exception must never be thrown since a wrapper is used " +
-                    "for each snapshot task: " + t;
+            PageStore store = pageStore.getStore(grpId, partId);
 
-                closeAsync();
-            });
+            partDeltaWriters.put(pair,
+                new PageStoreSerialWriter(store,
+                    partDeltaFile(cacheWorkDir(tmpConsIdDir, dirName), partId)));
+
+            partFileLengths.put(pair, store.size());
+        }
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
index 42ef810..1c993a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
@@ -41,6 +41,7 @@
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
 import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
@@ -57,6 +58,7 @@
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
@@ -242,6 +244,12 @@
                                     return null;
                                 }
 
+                                if (grpId == MetaStorage.METASTORAGE_CACHE_ID) {
+                                    checkPartitionsPageCrcSum(() -> pageStore, partId, FLAG_DATA);
+
+                                    return null;
+                                }
+
                                 ByteBuffer pageBuff = buff.get();
                                 pageBuff.clear();
                                 pageStore.read(0, pageBuff, true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 038561b..03a1cdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -70,6 +70,7 @@
 import static org.apache.ignite.internal.IgniteFeatures.SNAPSHOT_RESTORE_CACHE_GROUP;
 import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK;
@@ -625,7 +626,10 @@
         FilePageStoreManager pageStore = (FilePageStoreManager)cctx.pageStore();
 
         // Collect the cache configurations and prepare a temporary directory for copying files.
-        for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName())) {
+        // Metastorage can be restored only manually by directly copying files.
+        for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName(),
+            name -> !METASTORAGE_CACHE_NAME.equals(name)))
+        {
             String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir);
 
             if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName))
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
index 90f320c..81f6048 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiConsumer;
@@ -74,7 +75,6 @@
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.apache.ignite.spi.systemview.view.MetastorageView;
-import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -176,7 +176,7 @@
     );
 
     /**
-     * Map with futures used to wait for async write/remove operations completion.
+     * Map with user futures used to wait for async write/remove operations completion.
      */
     private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> updateFuts = new ConcurrentHashMap<>();
 
@@ -441,11 +441,16 @@
     private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
         assert isPersistenceEnabled;
 
-        worker.setMetaStorage(metastorage);
+        lock.writeLock().lock();
 
-        IgniteThread workerThread = new IgniteThread(ctx.igniteInstanceName(), "dms-writer-thread", worker);
+        try {
+            worker.setMetaStorage(metastorage);
 
-        workerThread.start();
+            worker.start();
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
     }
 
     /** {@inheritDoc} */
@@ -1166,6 +1171,8 @@
             return;
         }
 
+        lock.writeLock().lock();
+
         try {
             if (msg instanceof DistributedMetaStorageCasMessage)
                 completeCas((DistributedMetaStorageCasMessage)msg);
@@ -1178,6 +1185,9 @@
         catch (IgniteCheckedException | Error e) {
             throw criticalError(e);
         }
+        finally {
+            lock.writeLock().unlock();
+        }
     }
 
     /**
@@ -1211,6 +1221,32 @@
     }
 
     /**
+     * @return Future which will be completed when all the updates prior to the pause processed.
+     */
+    public Future<?> flush() {
+        assert isPersistenceEnabled;
+
+        return worker.flush();
+    }
+
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void suspend(IgniteInternalFuture<?> compFut) {
+        assert isPersistenceEnabled;
+
+        lock.readLock().lock();
+
+        try {
+            // Read lock taken, so no other distributed updated will be added to the queue.
+            worker.suspend(compFut);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
      * Invoke failure handler and rethrow passed exception, possibly wrapped into the unchecked one.
      */
     private RuntimeException criticalError(Throwable e) {
@@ -1231,39 +1267,40 @@
     private void completeWrite(
         DistributedMetaStorageHistoryItem histItem
     ) throws IgniteCheckedException {
-        lock.writeLock().lock();
+        assert lock.writeLock().isHeldByCurrentThread();
 
-        try {
-            histItem = optimizeHistoryItem(histItem);
+        histItem = optimizeHistoryItem(histItem);
 
-            if (histItem == null)
-                return;
+        if (histItem == null)
+            return;
 
-            ver = ver.nextVersion(histItem);
+        ver = ver.nextVersion(histItem);
 
-            for (int i = 0, len = histItem.keys().length; i < len; i++) {
-                String key = histItem.keys()[i];
-                byte[] valBytes = histItem.valuesBytesArray()[i];
+        for (int i = 0, len = histItem.keys().length; i < len; i++) {
+            String key = histItem.keys()[i];
+            byte[] valBytes = histItem.valuesBytesArray()[i];
 
-                notifyListeners(
-                    histItem.keys()[i],
-                    () -> bridge.read(key),
-                    () -> unmarshal(marshaller, valBytes));
-            }
-
-            for (int i = 0, len = histItem.keys().length; i < len; i++)
-                bridge.write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
-
-            addToHistoryCache(ver.id(), histItem);
+            notifyListeners(
+                histItem.keys()[i],
+                () -> bridge.read(key),
+                () -> unmarshal(marshaller, valBytes));
         }
-        finally {
-            lock.writeLock().unlock();
-        }
+
+        for (int i = 0, len = histItem.keys().length; i < len; i++)
+            bridge.write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+
+        addToHistoryCache(ver.id(), histItem);
 
         if (isPersistenceEnabled)
             worker.update(histItem);
 
-        shrinkHistory();
+        // Shrink history so that its estimating size doesn't exceed {@link #histMaxBytes}.
+        while (histCache.sizeInBytes() > histMaxBytes && histCache.size() > 1) {
+            histCache.removeOldest();
+
+            if (isPersistenceEnabled)
+                worker.removeHistItem(ver.id() - histCache.size());
+        }
     }
 
     /**
@@ -1365,25 +1402,6 @@
     }
 
     /**
-     * Shrikn history so that its estimating size doesn't exceed {@link #histMaxBytes}.
-     */
-    private void shrinkHistory() {
-        lock.writeLock().lock();
-
-        try {
-            while (histCache.sizeInBytes() > histMaxBytes && histCache.size() > 1) {
-                histCache.removeOldest();
-
-                if (isPersistenceEnabled)
-                    worker.removeHistItem(ver.id() - histCache.size());
-            }
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /**
      * Notify listeners on node start. Even if there was no data restoring.
      *
      * @param newData Data about which listeners should be notified.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
index 2065784e..ebc5c73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
@@ -20,32 +20,42 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
 import java.util.function.Consumer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
 
 import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.COMMON_KEY_PREFIX;
 import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.cleanupGuardKey;
 import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemKey;
 import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.localKey;
 import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.versionKey;
-import static org.apache.ignite.internal.processors.metastorage.persistence.DmsWorkerStatus.CANCEL;
-import static org.apache.ignite.internal.processors.metastorage.persistence.DmsWorkerStatus.CONTINUE;
-import static org.apache.ignite.internal.processors.metastorage.persistence.DmsWorkerStatus.HALT;
 
 /** */
-class DmsDataWriterWorker extends GridWorker {
+public class DmsDataWriterWorker extends GridWorker {
     /** */
     public static final byte[] DUMMY_VALUE = {};
 
     /** */
-    private final LinkedBlockingQueue<Object> updateQueue = new LinkedBlockingQueue<>();
+    private static final Object STOP = new Object();
+
+    /** */
+    private static final Object AWAIT = new Object();
+
+    /** */
+    private final LinkedBlockingQueue<RunnableFuture<?>> updateQueue = new LinkedBlockingQueue<>();
 
     /** */
     private final DmsLocalMetaStorageLock lock;
@@ -54,22 +64,19 @@
     private final Consumer<Throwable> errorHnd;
 
     /** */
-    @TestOnly
-    public DmsWorkerStatus status() {
-        return status;
-    }
-
-    /** */
-    private volatile DmsWorkerStatus status = CONTINUE;
-
-    /** */
     private DistributedMetaStorageVersion workerDmsVer;
 
     /** */
     private volatile ReadWriteMetastorage metastorage;
 
     /** */
-    private volatile boolean firstStart = true;
+    private volatile CountDownLatch latch = new CountDownLatch(0);
+
+    /**
+     * This task is used to pause processing of the {@code updateQueue}. If this task completed it means that all the updates
+     * prior to it already flushed to the local metastorage.
+     */
+    private volatile Future<?> suspendFut = CompletableFuture.completedFuture(AWAIT);
 
     /** */
     public DmsDataWriterWorker(
@@ -81,6 +88,9 @@
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
@@ -88,9 +98,47 @@
         this.metastorage = metastorage;
     }
 
+    /** Start new distributed metastorage worker thread. */
+    public void start() {
+        isCancelled = false;
+
+        new IgniteThread(igniteInstanceName(), "dms-writer-thread", this).start();
+    }
+
+    /**
+     * @return Future which will be completed when all tasks prior to the pause task are finished.
+     */
+    public Future<?> flush() {
+        return suspendFut;
+    }
+
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void suspend(IgniteInternalFuture<?> compFut) {
+        if (isCancelled())
+            suspendFut = CompletableFuture.completedFuture(AWAIT);
+        else {
+            latch = new CountDownLatch(1);
+
+            updateQueue.offer((RunnableFuture<?>)(suspendFut = new FutureTask<>(() -> AWAIT)));
+
+            compFut.listen(f -> latch.countDown());
+        }
+    }
+
     /** */
     public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
+
+            workerDmsVer = workerDmsVer.nextVersion(histItem);
+
+            metastorage.write(versionKey(), workerDmsVer);
+
+            for (int i = 0, len = histItem.keys().length; i < len; i++)
+                write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+        }));
     }
 
     /** */
@@ -98,22 +146,48 @@
         assert fullNodeData.fullData != null;
         assert fullNodeData.hist != null;
 
-        updateQueue.clear();
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
 
-        updateQueue.offer(fullNodeData);
+            doCleanup();
+
+            for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
+                metastorage.writeRaw(localKey(item.key), item.valBytes);
+
+            for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+                DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+
+                long histItemVer = fullNodeData.ver.id() + i - (len - 1);
+
+                metastorage.write(historyItemKey(histItemVer), histItem);
+            }
+
+            metastorage.write(versionKey(), fullNodeData.ver);
+
+            workerDmsVer = fullNodeData.ver;
+
+            metastorage.remove(cleanupGuardKey());
+        }));
     }
 
     /** */
     public void removeHistItem(long ver) {
-        updateQueue.offer(ver);
+        updateQueue.offer(newDmsTask(() -> metastorage.remove(historyItemKey(ver))));
     }
 
     /** */
     public void cancel(boolean halt) throws InterruptedException {
-        if (halt)
+        if (halt) {
             updateQueue.clear();
 
-        updateQueue.offer(status = halt ? HALT : CANCEL);
+            if (suspendFut instanceof RunnableFuture)
+                ((Runnable)suspendFut).run();
+        }
+
+        updateQueue.offer(new FutureTask<>(() -> STOP));
+        latch.countDown();
+
+        isCancelled = true;
 
         Thread runner = runner();
 
@@ -122,93 +196,30 @@
     }
 
     /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-        status = CONTINUE;
+    @Override protected void body() {
+        while (true) {
+            try {
+                RunnableFuture<?> curTask = updateQueue.take();
 
-        try {
-            if (firstStart) {
-                firstStart = false;
+                curTask.run();
 
-                lock.lock();
+                // Result will be null for any runnable executed tasks over metastorage and non-null for system DMS tasks.
+                Object res = U.get(curTask);
 
-                try {
-                    restore();
-                }
-                finally {
-                    lock.unlock();
-                }
+                if (res == STOP)
+                    break;
+
+                if (res == AWAIT)
+                    latch.await();
             }
+            catch (InterruptedException ignore) {
+            }
+            catch (Throwable t) {
+                errorHnd.accept(t);
 
-            while (true) {
-                Object update = updateQueue.peek();
-
-                try {
-                    update = updateQueue.take();
-                }
-                catch (InterruptedException ignore) {
-                }
-
-                lock.lock();
-
-                try {
-                    // process update
-                    if (update instanceof DistributedMetaStorageHistoryItem)
-                        applyUpdate((DistributedMetaStorageHistoryItem)update);
-                    else if (update instanceof DistributedMetaStorageClusterNodeData) {
-                        DistributedMetaStorageClusterNodeData fullNodeData = (DistributedMetaStorageClusterNodeData)update;
-
-                        metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
-
-                        doCleanup();
-
-                        for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
-                            metastorage.writeRaw(localKey(item.key), item.valBytes);
-
-                        for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
-                            DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
-
-                            long histItemVer = fullNodeData.ver.id() + i - (len - 1);
-
-                            metastorage.write(historyItemKey(histItemVer), histItem);
-                        }
-
-                        metastorage.write(versionKey(), fullNodeData.ver);
-
-                        workerDmsVer = fullNodeData.ver;
-
-                        metastorage.remove(cleanupGuardKey());
-                    }
-                    else if (update instanceof Long) {
-                        long ver = (Long)update;
-
-                        metastorage.remove(historyItemKey(ver));
-                    }
-                    else {
-                        assert update instanceof DmsWorkerStatus : update;
-
-                        break;
-                    }
-                }
-                finally {
-                    lock.unlock();
-                }
+                break;
             }
         }
-        catch (Throwable t) {
-            errorHnd.accept(t);
-        }
-    }
-
-    /** */
-    private void applyUpdate(DistributedMetaStorageHistoryItem histItem) throws IgniteCheckedException {
-        metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
-
-        workerDmsVer = workerDmsVer.nextVersion(histItem);
-
-        metastorage.write(versionKey(), workerDmsVer);
-
-        for (int i = 0, len = histItem.keys().length; i < len; i++)
-            write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
     }
 
     /** */
@@ -285,4 +296,24 @@
         else
             metastorage.writeRaw(localKey(key), valBytes);
     }
+
+    /**
+     * @param task Task to execute on local metastorage.
+     * @return Future will be completed when task has been finished.
+     */
+    private RunnableFuture<Void> newDmsTask(IgniteThrowableRunner task) {
+        return new FutureTask<>(() -> {
+            lock.lock();
+
+            try {
+                task.run();
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+            finally {
+                lock.unlock();
+            }
+        }, null);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsWorkerStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsWorkerStatus.java
deleted file mode 100644
index bb3be51..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsWorkerStatus.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.metastorage.persistence;
-
-/** */
-enum DmsWorkerStatus {
-    /** */
-    CONTINUE,
-
-    /** */
-    CANCEL,
-
-    /** */
-    HALT;
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
index 691c35f3..e675f63 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
@@ -44,7 +44,6 @@
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
@@ -246,7 +245,7 @@
         MetaStorage metaStorage = ignite.context().cache().context().database().metaStorage();
 
         corruptTreeRoot(ignite, (PageMemoryEx)metaStorage.pageMemory(), METASTORAGE_CACHE_ID,
-            PageIdAllocator.METASTORE_PARTITION);
+            MetaStorage.METASTORE_PARTITION);
 
         stopGrid(0);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
index e70651a..5f65cf6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
@@ -51,10 +51,10 @@
 import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
 import static org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog.TX_LOG_CACHE_NAME;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.META_STORAGE_NAME;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
 import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
 import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_DIR_NAME;
 
 /**
  *
@@ -336,7 +336,7 @@
         boolean metaStore = METASTORAGE_CACHE_NAME.equals(cacheName);
         boolean txLog = TX_LOG_CACHE_NAME.equals(cacheName);
 
-        File cacheWorkDir = metaStore ? new File(pageStoreMgr.workDir(), META_STORAGE_NAME) :
+        File cacheWorkDir = metaStore ? new File(pageStoreMgr.workDir(), METASTORAGE_DIR_NAME) :
             txLog ? new File(pageStoreMgr.workDir(), TX_LOG_CACHE_NAME) :
             pageStoreMgr.cacheWorkDir(node.cachex(cacheName).configuration());
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
index 829461b..fe9f8f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
@@ -53,8 +53,8 @@
 import static java.nio.file.Files.walkFileTree;
 import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage.CP_FILE_NAME_PATTERN;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.META_STORAGE_NAME;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_DIR_NAME;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_TEMP_NAME_PATTERN;
 import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
@@ -224,7 +224,7 @@
 
                     String parentDirName = path.toFile().getParentFile().getName();
 
-                    if (parentDirName.equals(META_STORAGE_NAME) || parentDirName.equals(TxLog.TX_LOG_CACHE_NAME))
+                    if (parentDirName.equals(METASTORAGE_DIR_NAME) || parentDirName.equals(TxLog.TX_LOG_CACHE_NAME))
                         return CONTINUE;
 
                     if (WAL_NAME_PATTERN.matcher(name).matches() || WAL_TEMP_NAME_PATTERN.matcher(name).matches())
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index b873f7d..c3827b0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -33,6 +33,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
@@ -332,9 +333,25 @@
         String snpName,
         boolean activate
     ) throws Exception {
+        return startGridsFromSnapshot(IntStream.range(0, cnt).boxed().collect(Collectors.toSet()), path, snpName, activate);
+    }
+
+    /**
+     * @param ids Set of ignite instances ids to start.
+     * @param path Snapshot path resolver.
+     * @param snpName Snapshot to start grids from.
+     * @param activate {@code true} to activate after cluster start.
+     * @return Coordinator ignite instance.
+     * @throws Exception If fails.
+     */
+    protected IgniteEx startGridsFromSnapshot(Set<Integer> ids,
+        Function<IgniteConfiguration, String> path,
+        String snpName,
+        boolean activate
+    ) throws Exception {
         IgniteEx crd = null;
 
-        for (int i = 0; i < cnt; i++) {
+        for (Integer i : ids) {
             IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(i)));
 
             cfg.setWorkDirectory(Paths.get(path.apply(cfg), snpName).toString());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
index 1fa2149..ac5566e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
@@ -58,6 +58,7 @@
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
@@ -888,13 +889,13 @@
 
         awaitPartitionMapExchange();
 
-        for (Ignite grid : Arrays.asList(grid(1), grid(2))) {
-            ((IgniteEx)grid).context().cache().context().exchange()
+        for (IgniteEx grid : Arrays.asList(grid(1), grid(2))) {
+            grid.context().cache().context().exchange()
                 .registerExchangeAwareComponent(new PartitionsExchangeAware() {
                     /** {@inheritDoc} */
                     @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
                         try {
-                            block.await();
+                            block.await(SNAPSHOT_AWAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                         }
                         catch (InterruptedException e) {
                             fail("Must not catch exception here: " + e.getMessage());
@@ -907,7 +908,7 @@
             TestRecordingCommunicationSpi.spi(grid)
                 .blockMessages((node, msg) -> {
                     if (msg instanceof GridDhtPartitionsSingleMessage)
-                        return ((GridDhtPartitionsSingleMessage)msg).exchangeId() != null;
+                        return ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null;
 
                     return false;
                 });
@@ -981,6 +982,9 @@
                 .registerExchangeAwareComponent(new PartitionsExchangeAware() {
                     /** {@inheritDoc} */
                     @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                        if (!(fut.firstEvent() instanceof DiscoveryCustomEvent))
+                            return;
+
                         try {
                             exchFuts.add(new T2<>(fut.exchangeId(), fut.rebalanced()));
                             latch.countDown();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
index d0a943d..2260ada 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
@@ -196,6 +196,7 @@
         SnapshotFutureTask snpFutTask = mgr.registerSnapshotTask(SNAPSHOT_NAME,
             cctx.localNodeId(),
             F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            false,
             new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
                 @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
                     try {
@@ -243,7 +244,7 @@
             // right after current will be completed.
             cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, SNAPSHOT_NAME));
 
-            snpFutTask.awaitStarted();
+            snpFutTask.started().get();
 
             db.forceCheckpoint("snapshot is ready to be created")
                 .futureFor(CheckpointState.MARKER_STORED_TO_DISK)
@@ -555,7 +556,7 @@
         Map<Integer, Set<Integer>> parts,
         SnapshotSender snpSndr
     ) throws IgniteCheckedException {
-        SnapshotFutureTask snpFutTask = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), parts, snpSndr);
+        SnapshotFutureTask snpFutTask = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), parts, false, snpSndr);
 
         snpFutTask.start();
 
@@ -564,7 +565,7 @@
         // right after current will be completed.
         cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, snpName));
 
-        snpFutTask.awaitStarted();
+        snpFutTask.started().get();
 
         return snpFutTask;
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
new file mode 100644
index 0000000..a376148
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        Function<IgniteConfiguration, String> pathProv = cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath();
+        Set<String> keySet0 = new TreeSet<>();
+        Set<String> keySet1 = new TreeSet<>();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0), pathProv, SNAPSHOT_NAME, false);
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> keySet0.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1), pathProv, SNAPSHOT_NAME, false);
+        snp1.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> keySet1.add(key));
+
+        assertEquals("Keys must be the same on all nodes", keySet0, keySet1);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateOnSnapshotFail() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        Set<String> writtenKeys = new TreeSet<>();
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    String key = SNAPSHOT_PREFIX + keyCnt.getAndIncrement();
+
+                    ignite.context().distributedMetastorage().write(key, "value");
+
+                    writtenKeys.add(key);
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        ((GridCacheDatabaseSharedManager)ignite.context().cache().context().database())
+            .addCheckpointListener(new CheckpointListener() {
+                @Override public void onMarkCheckpointBegin(Context ctx) {
+                    if (ctx.progress().reason().contains(SNAPSHOT_NAME)) {
+                        Map<String, SnapshotFutureTask> locMap =
+                            GridTestUtils.getFieldValue(snp(ignite), IgniteSnapshotManager.class, "locSnpTasks");
+
+                        // Fail the snapshot task with an exception, all metastorage keys must be successfully continued.
+                        locMap.get(SNAPSHOT_NAME).acceptException(new IgniteCheckedException("Test exception"));
+                    }
+                }
+
+                @Override public void onCheckpointBegin(Context ctx) {
+                }
+
+                @Override public void beforeCheckpointBegin(Context ctx) {
+                }
+            });
+
+        IgniteFuture<?> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        GridTestUtils.assertThrowsAnyCause(log, fut::get, IgniteCheckedException.class, "Test exception");
+
+        stop.set(true);
+
+        // Load future must complete without exceptions, all metastorage keys must be written.
+        updFut.get();
+
+        Set<String> readedKeys = new TreeSet<>();
+
+        ignite.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> readedKeys.add(key));
+
+        assertEquals("Not all metastorage keys have been written", writtenKeys, readedKeys);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
index 400ef14..cabb7f7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
@@ -483,7 +483,7 @@
     /**
      * @throws Exception If failed.
      */
-    @Test @SuppressWarnings("ThrowableNotThrown")
+    @Test
     public void testConflictingData() throws Exception {
         IgniteEx igniteEx = startGrid(0);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorkerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorkerTest.java
index 8a4cccf..4e73d6e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorkerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorkerTest.java
@@ -18,12 +18,14 @@
 package org.apache.ignite.internal.processors.metastorage.persistence;
 
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.thread.IgniteThread;
 import org.junit.After;
 import org.junit.Assert;
@@ -37,15 +39,9 @@
 import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.versionKey;
 import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageVersion.INITIAL_VERSION;
 import static org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker.DUMMY_VALUE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 
 /** */
-public class DmsDataWriterWorkerTest {
-    /** */
-    private static IgniteLogger log = new GridTestLog4jLogger(true).getLogger(DmsDataWriterWorkerTest.class);
-
+public class DmsDataWriterWorkerTest extends GridCommonAbstractTest {
     /** */
     private Thread testThread;
 
@@ -59,6 +55,9 @@
     private DmsDataWriterWorker worker;
 
     /** */
+    private final AtomicReference<Throwable> errHnd = new AtomicReference<>();
+
+    /** */
     @Before
     public void before() {
         testThread = Thread.currentThread();
@@ -71,7 +70,7 @@
             DmsDataWriterWorkerTest.class.getSimpleName(),
             log,
             lock,
-            throwable -> {}
+            errHnd::set
         );
 
         worker.setMetaStorage(metastorage);
@@ -103,7 +102,7 @@
 
         startWorker();
 
-        worker.cancel(true);
+        worker.cancel(false);
 
         assertEquals(1, metastorage.cache.size());
         assertEquals(INITIAL_VERSION, metastorage.read(versionKey()));
@@ -290,24 +289,22 @@
     @Test
     public void testHalt() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
-        AtomicBoolean await = new AtomicBoolean(true);
+
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
 
         metastorage = new MyReadWriteMetaStorageMock() {
             @Override public void writeRaw(String key, byte[] data) {
                 try {
-                    if (await.get())
-                        latch.countDown();
+                    assertTrue(GridTestUtils.waitForCondition(() -> queue.size() == 3, getTestTimeout()));
 
-                    while (worker.status() != DmsWorkerStatus.HALT) {
-                        //noinspection BusyWait
-                        Thread.sleep(0);
-                    }
+                    latch.countDown();
+
+                    assertTrue(GridTestUtils.waitForCondition(() -> queue.size() == 1, getTestTimeout()));
                 }
                 catch (Exception ignore) {
                 }
 
-                await.set(false);
-
                 super.writeRaw(key, data);
             }
         };
@@ -318,11 +315,15 @@
 
         worker.update(histItem("key1", "val1"));
         worker.update(histItem("key2", "val2"));
+        worker.update(histItem("key3", "val3"));
+        worker.update(histItem("key4", "val4"));
 
         latch.await();
 
         worker.cancel(true);
 
+        assertNull(errHnd.get());
+
         // ver, val, hist.
         assertEquals(3, metastorage.cache.size());
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
index fbbe62e..925f3f1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
@@ -45,6 +45,7 @@
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotWithMetastorageTest;
 import org.apache.ignite.internal.processors.performancestatistics.CacheStartTest;
 import org.apache.ignite.internal.processors.performancestatistics.CheckpointTest;
 import org.apache.ignite.internal.processors.performancestatistics.ForwardReadTest;
@@ -99,6 +100,7 @@
     IgniteSnapshotManagerSelfTest.class,
     IgniteClusterSnapshotSelfTest.class,
     IgniteClusterSnapshotCheckTest.class,
+    IgniteSnapshotWithMetastorageTest.class,
     IgniteSnapshotMXBeanTest.class,
     IgniteClusterSnapshotRestoreSelfTest.class,