IGNITE-13805 Add cache groups snapshot restore on the same topology (#8648)

diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
index 3623c1f..0d46cbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite;
 
+import java.util.Collection;
 import org.apache.ignite.lang.IgniteFuture;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * This interface provides functionality for creating cluster-wide cache data snapshots.
@@ -48,4 +50,16 @@
      * @return Future which will be completed when cancel operation finished.
      */
     public IgniteFuture<Void> cancelSnapshot(String name);
+
+    /**
+     * Restore cache group(s) from the snapshot.
+     * <p>
+     * <b>NOTE:</b> The cache groups being restored must not be present in the cluster. If they are present,
+     * the user must destroy them (e.g. with {@link IgniteCache#destroy()}) before starting this operation.
+     *
+     * @param name Snapshot name.
+     * @param cacheGroupNames Cache groups to be restored or {@code null} to restore all cache groups from the snapshot.
+     * @return Future which will be completed when the restore operation is finished.
+     */
+    public IgniteFuture<Void> restoreSnapshot(String name, @Nullable Collection<String> cacheGroupNames);
 }
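
For illustration, here is how a caller might use the new public API; the snapshot and cache group names are made up, and the target group is destroyed first as required by the Javadoc above:

```java
import java.util.Collections;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;

public class RestoreExample {
    public static void main(String[] args) {
        // Assumes an Ignite node has already been started in this JVM.
        Ignite ignite = Ignition.ignite();

        // A cache group being restored must not be present in the cluster,
        // so the caller destroys it first.
        ignite.destroyCache("accounts");

        // Restore the "accounts" group from the snapshot and wait for completion.
        // Passing null instead of the collection would restore all groups.
        ignite.snapshot().restoreSnapshot("snapshot_25012021", Collections.singleton("accounts")).get();
    }
}
```
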
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index fa487a0..4f8fdba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -139,7 +139,10 @@
     CACHE_GROUP_KEY_CHANGE(47),
 
     /** Collecting performance statistics. */
-    PERFORMANCE_STATISTICS(48);
+    PERFORMANCE_STATISTICS(48),
+
+    /** Restore cache group from the snapshot. */
+    SNAPSHOT_RESTORE_CACHE_GROUP(49);
 
     /**
      * Unique feature identifier.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index ffe51c8..236051a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -48,6 +48,7 @@
 import org.apache.ignite.internal.IgniteFeatures;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
@@ -55,6 +56,7 @@
 import org.apache.ignite.internal.managers.systemview.walker.CacheViewWalker;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
@@ -582,6 +584,28 @@
         DiscoveryDataClusterState state = ctx.state().clusterState();
 
         if (state.active() && !state.transition()) {
+            Set<IgniteUuid> restartIds = new HashSet<>(F.viewReadOnly(
+                batch.requests(), DynamicCacheChangeRequest::restartId, req -> req.start() && req.restartId() != null));
+
+            assert restartIds.size() <= 1 : batch.requests();
+
+            Collection<UUID> nodes = ctx.cache().context().snapshotMgr().cacheStartRequiredAliveNodes(F.first(restartIds));
+
+            for (UUID nodeId : nodes) {
+                ClusterNode node = ctx.discovery().node(nodeId);
+
+                if (node != null && CU.baselineNode(node, state) && ctx.discovery().alive(node))
+                    continue;
+
+                ClusterTopologyCheckedException err =
+                    new ClusterTopologyCheckedException("Required node has left the cluster [nodeId=" + nodeId + ']');
+
+                for (DynamicCacheChangeRequest req : batch.requests())
+                    ctx.cache().completeCacheStartFuture(req, false, err);
+
+                return false;
+            }
+
             ExchangeActions exchangeActions = new ExchangeActions();
 
             CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions,
@@ -593,6 +617,9 @@
                 assert !exchangeActions.empty() : exchangeActions;
 
                 batch.exchangeActions(exchangeActions);
+
+                if (!nodes.isEmpty())
+                    exchangeActions.cacheStartRequiredAliveNodes(nodes);
             }
 
             return res.needExchange;
@@ -1007,6 +1034,16 @@
             }
         }
 
+        if (err == null && req.restartId() == null) {
+            IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr();
+
+            if (snapshotMgr.isRestoring(cacheName, ccfg.getGroupName())) {
+                err = new IgniteCheckedException("Cache start failed. A cache or group with the same name is " +
+                    "currently being restored from a snapshot [cache=" + cacheName +
+                    (ccfg.getGroupName() == null ? "" : ", group=" + ccfg.getGroupName()) + ']');
+            }
+        }
+
         if (err != null) {
             if (persistedCfgs)
                 res.errs.add(err);
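
A minimal caller-side sketch of the second check above, assuming a restore of a hypothetical "accounts" group is in progress: a dynamic cache start with a conflicting name is rejected.

```java
import javax.cache.CacheException;
import org.apache.ignite.Ignite;

class ConflictingStartExample {
    /** Sketch only: "accounts" is assumed to be a group currently being restored. */
    static void tryStartConflictingCache(Ignite ignite) {
        try {
            ignite.createCache("accounts");
        }
        catch (CacheException e) {
            // Rejected with: "Cache start failed. A cache or group with the same
            // name is currently being restored from a snapshot ...".
        }
    }
}
```
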
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index cbe7df4..8736a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -43,6 +44,12 @@
     /** */
     private Map<String, CacheActionData> cachesToStart;
 
+    /**
+     * Server nodes on which a successful start of the cache(s) is required. If any of these nodes fails while
+     * starting the cache(s), the whole procedure is rolled back.
+     */
+    private Collection<UUID> cacheStartRequiredAliveNodes;
+
     /** */
     private Map<String, CacheActionData> cachesToStop;
 
@@ -320,6 +327,23 @@
     }
 
     /**
+     * @return Server nodes on which a successful start of the cache(s) is required. If any of these nodes fails
+     *      while starting the cache(s), the whole procedure is rolled back.
+     */
+    public Collection<UUID> cacheStartRequiredAliveNodes() {
+        return cacheStartRequiredAliveNodes == null ? Collections.emptyList() : cacheStartRequiredAliveNodes;
+    }
+
+    /**
+     * @param cacheStartRequiredAliveNodes Server nodes on which a successful start of the cache(s) is required. If any
+     *                                     of these nodes fails while starting the cache(s), the whole procedure is
+     *                                     rolled back.
+     */
+    public void cacheStartRequiredAliveNodes(Collection<UUID> cacheStartRequiredAliveNodes) {
+        this.cacheStartRequiredAliveNodes = new ArrayList<>(cacheStartRequiredAliveNodes);
+    }
+
+    /**
      * @param grpDesc Group descriptor.
      * @param destroy Destroy flag.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e8ee3f1..142c4db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -4267,6 +4267,9 @@
         if (res == null)
             res = validateRestartingCaches(node);
 
+        if (res == null)
+            res = validateRestoringCaches(node);
+
         return res;
     }
 
@@ -4294,6 +4297,20 @@
     }
 
     /**
+     * @param node Joining node to validate.
+     * @return Node validation result if there was an issue with the joining node, {@code null} otherwise.
+     */
+    private IgniteNodeValidationResult validateRestoringCaches(ClusterNode node) {
+        if (ctx.cache().context().snapshotMgr().isRestoring()) {
+            String msg = "Joining node during caches restore is not allowed [joiningNodeId=" + node.id() + ']';
+
+            return new IgniteNodeValidationResult(node.id(), msg);
+        }
+
+        return null;
+    }
+
+    /**
      * @return Keep static cache configuration flag. If {@code true}, static cache configuration will override
      * configuration persisted on disk.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index f3495ac..cb513f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -293,6 +293,8 @@
 
         stateAwareMgrs.add(snpMgr);
 
+        stateAwareMgrs.add(snapshotMgr);
+
         for (PluginProvider prv : kernalCtx.plugins().allProviders())
             if (prv instanceof IgniteChangeGlobalStateSupport)
                 stateAwareMgrs.add(((IgniteChangeGlobalStateSupport)prv));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 4d613d9..15619d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -32,6 +32,7 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.BooleanSupplier;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCheckedException;
@@ -55,6 +56,7 @@
 import org.apache.ignite.internal.IgniteFeatures;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.UnregisteredBinaryTypeException;
@@ -993,6 +995,42 @@
     }
 
     /** {@inheritDoc} */
+    @Override public void updateMetadata(File metadataDir, BooleanSupplier stopChecker) throws IgniteCheckedException {
+        if (!metadataDir.exists())
+            return;
+
+        try {
+            ConcurrentMap<Integer, BinaryMetadataHolder> metaCache = new ConcurrentHashMap<>();
+
+            new BinaryMetadataFileStore(metaCache, ctx, log, metadataDir)
+                .restoreMetadata();
+
+            Collection<BinaryMetadata> metadata = F.viewReadOnly(metaCache.values(), BinaryMetadataHolder::metadata);
+
+            // Check the compatibility of the binary metadata.
+            for (BinaryMetadata newMeta : metadata) {
+                BinaryMetadata oldMeta = binaryMetadata(newMeta.typeId());
+
+                if (oldMeta != null)
+                    BinaryUtils.mergeMetadata(oldMeta, newMeta, null);
+            }
+
+            // Update cluster metadata.
+            for (BinaryMetadata newMeta : metadata) {
+                if (stopChecker.getAsBoolean())
+                    return;
+
+                if (Thread.interrupted())
+                    throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+
+                addMeta(newMeta.typeId(), newMeta.wrap(binaryContext()), false);
+            }
+        } catch (BinaryObjectException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public BinaryObject buildEnum(String typeName, int ord) throws BinaryObjectException {
         A.notNullOrEmpty(typeName, "enum type name");
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index b3642c1..6826d9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -299,7 +299,7 @@
     private boolean forceAffReassignment;
 
     /** Exception that was thrown during init phase on local node. */
-    private Exception exchangeLocE;
+    private volatile Exception exchangeLocE;
 
     /** Exchange exceptions from all participating nodes. */
     private final Map<UUID, Exception> exchangeGlobalExceptions = new ConcurrentHashMap<>();
@@ -5124,6 +5124,12 @@
 
                             if (crd0 == null)
                                 finishState = new FinishState(null, initialVersion(), null);
+
+                            if (dynamicCacheStartExchange() && exchangeLocE == null &&
+                                exchActions.cacheStartRequiredAliveNodes().contains(node.id())) {
+                                exchangeGlobalExceptions.put(cctx.localNodeId(), exchangeLocE = new ClusterTopologyCheckedException(
+                                    "Required node has left the cluster [nodeId=" + node.id() + ']'));
+                            }
                         }
 
                         if (crd0 == null) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index a7c682d..24954a4 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -974,32 +974,40 @@
         Arrays.sort(files);
 
         for (File file : files) {
-            if (file.isDirectory()) {
-                if (file.getName().startsWith(CACHE_DIR_PREFIX)) {
-                    File conf = new File(file, CACHE_DATA_FILENAME);
-
-                    if (conf.exists() && conf.length() > 0) {
-                        StoredCacheData cacheData = readCacheData(conf);
-
-                        String cacheName = cacheData.config().getName();
-
-                        if (!ccfgs.containsKey(cacheName))
-                            ccfgs.put(cacheName, cacheData);
-                        else {
-                            U.warn(log, "Cache with name=" + cacheName + " is already registered, skipping config file "
-                                    + file.getName());
-                        }
-                    }
-                }
-                else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX))
-                    readCacheGroupCaches(file, ccfgs);
-            }
+            if (file.isDirectory())
+                readCacheConfigurations(file, ccfgs);
         }
 
         return ccfgs;
     }
 
     /**
+     * @param dir Cache (group) directory.
+     * @param ccfgs Cache configurations.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void readCacheConfigurations(File dir, Map<String, StoredCacheData> ccfgs) throws IgniteCheckedException {
+        if (dir.getName().startsWith(CACHE_DIR_PREFIX)) {
+            File conf = new File(dir, CACHE_DATA_FILENAME);
+
+            if (conf.exists() && conf.length() > 0) {
+                StoredCacheData cacheData = readCacheData(conf);
+
+                String cacheName = cacheData.config().getName();
+
+                if (!ccfgs.containsKey(cacheName))
+                    ccfgs.put(cacheName, cacheData);
+                else {
+                    U.warn(log, "Cache with name=" + cacheName + " is already registered, skipping config file "
+                        + dir.getName());
+                }
+            }
+        }
+        else if (dir.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+            readCacheGroupCaches(dir, ccfgs);
+    }
+
+    /**
      * @param dir Directory to check.
      * @return Files that match cache or cache group pattern.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index d23b584..90304bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -51,7 +51,6 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.BiFunction;
@@ -105,6 +104,7 @@
 import org.apache.ignite.internal.processors.cache.tree.DataRow;
 import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
 import org.apache.ignite.internal.processors.marshaller.MappedName;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.task.GridInternal;
@@ -121,7 +121,6 @@
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -129,6 +128,7 @@
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.resources.IgniteInstanceResource;
@@ -188,7 +188,7 @@
  * </ul>
  */
 public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
-    implements IgniteSnapshot, PartitionsExchangeAware, MetastorageLifecycleListener {
+    implements IgniteSnapshot, PartitionsExchangeAware, MetastorageLifecycleListener, IgniteChangeGlobalStateSupport {
     /** File with delta pages suffix. */
     public static final String DELTA_SUFFIX = ".delta";
 
@@ -258,6 +258,9 @@
     /** Marshaller. */
     private final Marshaller marsh;
 
+    /** Distributed process to restore cache group from the snapshot. */
+    private final SnapshotRestoreProcess restoreCacheGrpProc;
+
     /** Resolved persistent data storage settings. */
     private volatile PdsFolderSettings pdsSettings;
 
@@ -315,6 +318,8 @@
             this::processLocalSnapshotEndStageResult);
 
         marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
+
+        restoreCacheGrpProc = new SnapshotRestoreProcess(ctx);
     }
 
     /**
@@ -404,12 +409,14 @@
                     for (SnapshotFutureTask sctx : locSnpTasks.values()) {
                         if (sctx.sourceNodeId().equals(leftNodeId) ||
                             (snpReq != null &&
-                                snpReq.snpName.equals(sctx.snapshotName()) &&
-                                snpReq.bltNodes.contains(leftNodeId))) {
+                                snpReq.snapshotName().equals(sctx.snapshotName()) &&
+                                snpReq.nodes().contains(leftNodeId))) {
                             sctx.acceptException(new ClusterTopologyCheckedException("Snapshot operation interrupted. " +
                                 "One of baseline nodes left the cluster: " + leftNodeId));
                         }
                     }
+
+                    restoreCacheGrpProc.onNodeLeft(leftNodeId);
                 }
             }
             finally {
@@ -423,6 +430,8 @@
         busyLock.block();
 
         try {
+            restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is stopping."));
+
             // Try stop all snapshot processing if not yet.
             for (SnapshotFutureTask sctx : locSnpTasks.values())
                 sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
@@ -450,6 +459,16 @@
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void onActivate(GridKernalContext kctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDeActivate(GridKernalContext kctx) {
+        restoreCacheGrpProc.interrupt(new IgniteCheckedException("The cluster has been deactivated."));
+    }
+
     /**
      * @param snpDir Snapshot dir.
      * @param folderName Local node folder name (see {@link U#maskForFileName} with consistent id).
@@ -547,7 +566,7 @@
                 "Another snapshot operation in progress [req=" + req + ", curr=" + clusterSnpReq + ']'));
         }
 
-        Set<UUID> leftNodes = new HashSet<>(req.bltNodes);
+        Set<UUID> leftNodes = new HashSet<>(req.nodes());
         leftNodes.removeAll(F.viewReadOnly(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
             F.node2id()));
 
@@ -556,7 +575,9 @@
                 "prior to snapshot operation start: " + leftNodes));
         }
 
-        Set<Integer> leftGrps = new HashSet<>(req.grpIds);
+        List<Integer> grpIds = new ArrayList<>(F.viewReadOnly(req.groups(), CU::cacheId));
+
+        Set<Integer> leftGrps = new HashSet<>(grpIds);
         leftGrps.removeAll(cctx.cache().cacheGroupDescriptors().keySet());
 
         if (!leftGrps.isEmpty()) {
@@ -568,7 +589,7 @@
 
         // Prepare collection of pairs group and appropriate cache partition to be snapshot.
         // Cache group context may be 'null' on some nodes e.g. a node filter is set.
-        for (Integer grpId : req.grpIds) {
+        for (Integer grpId : grpIds) {
             if (cctx.cache().cacheGroup(grpId) == null)
                 continue;
 
@@ -580,10 +601,10 @@
         if (parts.isEmpty())
             task0 = new GridFinishedFuture<>(Collections.emptySet());
         else {
-            task0 = registerSnapshotTask(req.snpName,
-                req.srcNodeId,
+            task0 = registerSnapshotTask(req.snapshotName(),
+                req.operationalNodeId(),
                 parts,
-                locSndrFactory.apply(req.snpName));
+                locSndrFactory.apply(req.snapshotName()));
 
             clusterSnpReq = req;
         }
@@ -593,11 +614,11 @@
                 throw F.wrap(fut.error());
 
             try {
-                Set<String> blts = req.bltNodes.stream()
+                Set<String> blts = req.nodes().stream()
                     .map(n -> cctx.discovery().node(n).consistentId().toString())
                     .collect(Collectors.toSet());
 
-                File smf = new File(snapshotLocalDir(req.snpName), snapshotMetaFileName(cctx.localNode().consistentId().toString()));
+                File smf = new File(snapshotLocalDir(req.snapshotName()), snapshotMetaFileName(cctx.localNode().consistentId().toString()));
 
                 if (smf.exists())
                     throw new GridClosureException(new IgniteException("Snapshot metafile must not exist: " + smf.getAbsolutePath()));
@@ -606,12 +627,12 @@
 
                 try (OutputStream out = new BufferedOutputStream(new FileOutputStream(smf))) {
                     U.marshal(marsh,
-                        new SnapshotMetadata(req.rqId,
-                            req.snpName,
+                        new SnapshotMetadata(req.requestId(),
+                            req.snapshotName(),
                             cctx.localNode().consistentId().toString(),
                             pdsSettings.folderName(),
                             cctx.gridConfig().getDataStorageConfiguration().getPageSize(),
-                            req.grpIds,
+                            grpIds,
                             blts,
                             fut.result()),
                         out);
@@ -640,7 +661,7 @@
 
         boolean cancelled = err.values().stream().anyMatch(e -> e instanceof IgniteFutureCancelledCheckedException);
 
-        if (snpReq == null || !snpReq.rqId.equals(id)) {
+        if (snpReq == null || !snpReq.requestId().equals(id)) {
             synchronized (snpOpMux) {
                 if (clusterSnpFut != null && clusterSnpFut.rqId.equals(id)) {
                     if (cancelled) {
@@ -659,18 +680,18 @@
         }
 
         if (isLocalNodeCoordinator(cctx.discovery())) {
-            Set<UUID> missed = new HashSet<>(snpReq.bltNodes);
+            Set<UUID> missed = new HashSet<>(snpReq.nodes());
             missed.removeAll(res.keySet());
             missed.removeAll(err.keySet());
 
             if (cancelled) {
-                snpReq.err = new IgniteFutureCancelledCheckedException("Execution of snapshot tasks " +
-                    "has been cancelled by external process [err=" + err + ", missed=" + missed + ']');
+                snpReq.error(new IgniteFutureCancelledCheckedException("Execution of snapshot tasks " +
+                    "has been cancelled by external process [err=" + err + ", missed=" + missed + ']'));
             }
             else if (!F.isEmpty(err) || !missed.isEmpty()) {
-                snpReq.err = new IgniteCheckedException("Execution of local snapshot tasks fails or them haven't been executed " +
+                snpReq.error(new IgniteCheckedException("Execution of local snapshot tasks fails or them haven't been executed " +
                     "due to some of nodes left the cluster. Uncompleted snapshot will be deleted " +
-                    "[err=" + err + ", missed=" + missed + ']');
+                    "[err=" + err + ", missed=" + missed + ']'));
             }
 
             endSnpProc.start(UUID.randomUUID(), snpReq);
@@ -686,8 +707,8 @@
             return new GridFinishedFuture<>(new SnapshotOperationResponse());
 
         try {
-            if (req.err != null)
-                deleteSnapshot(snapshotLocalDir(req.snpName), pdsSettings.folderName());
+            if (req.error() != null)
+                deleteSnapshot(snapshotLocalDir(req.snapshotName()), pdsSettings.folderName());
 
             removeLastMetaStorageKey();
         }
@@ -709,26 +730,26 @@
         if (snpReq == null)
             return;
 
-        Set<UUID> endFail = new HashSet<>(snpReq.bltNodes);
+        Set<UUID> endFail = new HashSet<>(snpReq.nodes());
         endFail.removeAll(res.keySet());
 
         clusterSnpReq = null;
 
         synchronized (snpOpMux) {
             if (clusterSnpFut != null) {
-                if (endFail.isEmpty() && snpReq.err == null) {
+                if (endFail.isEmpty() && snpReq.error() == null) {
                     clusterSnpFut.onDone();
 
                     if (log.isInfoEnabled())
                         log.info(SNAPSHOT_FINISHED_MSG + snpReq);
                 }
-                else if (snpReq.err == null) {
+                else if (snpReq.error() == null) {
                     clusterSnpFut.onDone(new IgniteCheckedException("Snapshot creation has been finished with an error. " +
                         "Local snapshot tasks may not finished completely or finalizing results fails " +
                         "[fail=" + endFail + ", err=" + err + ']'));
                 }
                 else
-                    clusterSnpFut.onDone(snpReq.err);
+                    clusterSnpFut.onDone(snpReq.error());
 
                 clusterSnpFut = null;
             }
@@ -748,6 +769,38 @@
     }
 
     /**
+     * Check if snapshot restore process is currently running.
+     *
+     * @return {@code True} if the snapshot restore operation is in progress.
+     */
+    public boolean isRestoring() {
+        return restoreCacheGrpProc.isRestoring();
+    }
+
+    /**
+     * @param restoreId Restore process ID.
+     * @return Server nodes on which a successful start of the cache(s) is required. If any of these nodes fails
+     *         while starting the cache(s), the whole procedure is rolled back.
+     */
+    public Set<UUID> cacheStartRequiredAliveNodes(@Nullable IgniteUuid restoreId) {
+        if (restoreId == null)
+            return Collections.emptySet();
+
+        return restoreCacheGrpProc.cacheStartRequiredAliveNodes(restoreId);
+    }
+
+    /**
+     * Check if the cache or group with the specified name is currently being restored from the snapshot.
+     *
+     * @param cacheName Cache name.
+     * @param grpName Cache group name.
+     * @return {@code True} if the cache or group with the specified name is being restored.
+     */
+    public boolean isRestoring(String cacheName, @Nullable String grpName) {
+        return restoreCacheGrpProc.isRestoring(cacheName, grpName);
+    }
+
+    /**
      * @return List of all known snapshots on the local node.
      */
     public List<String> localSnapshotNames() {
@@ -829,34 +882,20 @@
         A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
         A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
 
-        GridKernalContext kctx0 = cctx.kernalContext();
         GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
 
-        kctx0.security().authorize(ADMIN_SNAPSHOT);
-
-        Collection<ClusterNode> bltNodes = F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
-            (node) -> CU.baselineNode(node, kctx0.state().clusterState()));
-
-        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
-        kctx0.task().setThreadContext(TC_SUBGRID, bltNodes);
-
-        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
-            .listen(f0 -> {
+        collectSnapshotMetadata(name).listen(f0 -> {
                 if (f0.error() == null) {
                     Map<ClusterNode, List<SnapshotMetadata>> metas = f0.result();
 
-                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
-                    kctx0.task().setThreadContext(TC_SUBGRID, new ArrayList<>(metas.keySet()));
-
-                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, metas)
-                        .listen(f1 -> {
-                            if (f1.error() == null)
-                                res.onDone(f1.result());
-                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
-                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
-                            else
-                                res.onDone(f1.error());
-                        });
+                    runSnapshotVerification(metas).listen(f1 -> {
+                        if (f1.error() == null)
+                            res.onDone(f1.result());
+                        else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                            res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                        else
+                            res.onDone(f1.error());
+                    });
                 }
                 else {
                     if (f0.error() instanceof IgniteSnapshotVerifyException)
@@ -870,6 +909,37 @@
     }
 
     /**
+     * @param name Snapshot name.
+     * @return Future with snapshot metadata obtained from nodes.
+     */
+    IgniteInternalFuture<Map<ClusterNode, List<SnapshotMetadata>>> collectSnapshotMetadata(String name) {
+        GridKernalContext kctx0 = cctx.kernalContext();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        Collection<ClusterNode> bltNodes = F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
+            (node) -> CU.baselineNode(node, kctx0.state().clusterState()));
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().setThreadContext(TC_SUBGRID, bltNodes);
+
+        return kctx0.task().execute(SnapshotMetadataCollectorTask.class, name);
+    }
+
+    /**
+     * @param metas Nodes snapshot metadata.
+     * @return Future with the verification results.
+     */
+    IgniteInternalFuture<IdleVerifyResultV2> runSnapshotVerification(Map<ClusterNode, List<SnapshotMetadata>> metas) {
+        GridKernalContext kctx0 = cctx.kernalContext();
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().setThreadContext(TC_SUBGRID, new ArrayList<>(metas.keySet()));
+
+        return kctx0.task().execute(SnapshotPartitionsVerifyTask.class, metas);
+    }
+
+    /**
      * @param snpName Snapshot name.
      * @param folderName Directory name for cache group.
      * @return The list of cache or cache group names in given snapshot on local node.
@@ -1012,16 +1082,19 @@
                 if (localSnapshotNames().contains(name))
                     throw new IgniteException("Create snapshot request has been rejected. Snapshot with given name already exists on local node.");
 
+                if (isRestoring())
+                    throw new IgniteException("Snapshot operation has been rejected. Cache group restore operation is currently in progress.");
+
                 snpFut0 = new ClusterSnapshotFuture(UUID.randomUUID(), name);
 
                 clusterSnpFut = snpFut0;
                 lastSeenSnpFut = snpFut0;
             }
 
-            List<Integer> grps = cctx.cache().persistentGroups().stream()
+            List<String> grps = cctx.cache().persistentGroups().stream()
                 .filter(g -> cctx.cache().cacheType(g.cacheOrGroupName()) == CacheType.USER)
                 .filter(g -> !g.config().isEncryptionEnabled())
-                .map(CacheGroupDescriptor::groupId)
+                .map(CacheGroupDescriptor::cacheOrGroupName)
                 .collect(Collectors.toList());
 
             List<ClusterNode> srvNodes = cctx.discovery().serverNodes(AffinityTopologyVersion.NONE);
@@ -1039,7 +1112,8 @@
                 grps,
                 new HashSet<>(F.viewReadOnly(srvNodes,
                     F.node2id(),
-                    (node) -> CU.baselineNode(node, clusterState)))));
+                    (node) -> CU.baselineNode(node, clusterState)))
+            ));
 
             String msg = "Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps + ']';
 
@@ -1062,6 +1136,15 @@
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> restoreSnapshot(String name, @Nullable Collection<String> grpNames) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+        A.ensure(grpNames == null || !grpNames.isEmpty(), "List of cache group names cannot be empty.");
+
+        return restoreCacheGrpProc.start(name, grpNames);
+    }
+
+    /** {@inheritDoc} */
     @Override public void onReadyForReadWrite(ReadWriteMetastorage metaStorage) throws IgniteCheckedException {
         synchronized (snpOpMux) {
             this.metaStorage = metaStorage;
@@ -1075,6 +1158,8 @@
 
     /** {@inheritDoc} */
     @Override public void onReadyForRead(ReadOnlyMetastorage metaStorage) throws IgniteCheckedException {
+        restoreCacheGrpProc.cleanup();
+
         // Snapshot which has not been completed due to the local node crashed must be deleted.
         String snpName = (String)metaStorage.read(SNP_RUNNING_KEY);
 
@@ -1111,13 +1196,13 @@
 
         SnapshotOperationRequest snpReq = clusterSnpReq;
 
-        SnapshotFutureTask task = locSnpTasks.get(snpReq.snpName);
+        SnapshotFutureTask task = locSnpTasks.get(snpReq.snapshotName());
 
         if (task == null)
             return;
 
         if (task.start()) {
-            cctx.database().forceCheckpoint(String.format("Start snapshot operation: %s", snpReq.snpName));
+            cctx.database().forceCheckpoint(String.format("Start snapshot operation: %s", snpReq.snapshotName()));
 
             // Schedule task on a checkpoint and wait when it starts.
             try {
@@ -1305,7 +1390,7 @@
     /**
      * @return The executor used to run snapshot tasks.
      */
-    Executor snapshotExecutorService() {
+    ExecutorService snapshotExecutorService() {
         assert snpRunner != null;
 
         return snpRunner;
@@ -1319,6 +1404,13 @@
     }
 
     /**
+     * @return Factory to create IO interface over a page stores.
+     */
+    FileIOFactory ioFactory() {
+        return ioFactory;
+    }
+
+    /**
      * @return Relative configured path of persistence data storage directory for the local node.
      * Example: {@code snapshotWorkDir/db/IgniteNodeName0}
      */
@@ -1775,49 +1867,6 @@
         }
     }
 
-    /** Snapshot start request for {@link DistributedProcess} initiate message. */
-    private static class SnapshotOperationRequest implements Serializable {
-        /** Serial version uid. */
-        private static final long serialVersionUID = 0L;
-
-        /** Unique snapshot request id. */
-        private final UUID rqId;
-
-        /** Source node id which trigger request. */
-        private final UUID srcNodeId;
-
-        /** Snapshot name. */
-        private final String snpName;
-
-        /** The list of cache groups to include into snapshot. */
-        @GridToStringInclude
-        private final List<Integer> grpIds;
-
-        /** The list of affected by snapshot operation baseline nodes. */
-        @GridToStringInclude
-        private final Set<UUID> bltNodes;
-
-        /** Exception occurred during snapshot operation processing. */
-        private volatile IgniteCheckedException err;
-
-        /**
-         * @param snpName Snapshot name.
-         * @param grpIds Cache groups to include into snapshot.
-         */
-        public SnapshotOperationRequest(UUID rqId, UUID srcNodeId, String snpName, List<Integer> grpIds, Set<UUID> bltNodes) {
-            this.rqId = rqId;
-            this.srcNodeId = srcNodeId;
-            this.snpName = snpName;
-            this.grpIds = grpIds;
-            this.bltNodes = bltNodes;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(SnapshotOperationRequest.class, this);
-        }
-    }
-
     /** */
     private static class SnapshotOperationResponse implements Serializable {
         /** Serial version uid. */
@@ -1858,18 +1907,18 @@
     }
 
     /** */
-    private static class ClusterSnapshotFuture extends GridFutureAdapter<Void> {
+    protected static class ClusterSnapshotFuture extends GridFutureAdapter<Void> {
         /** Unique snapshot request id. */
-        private final UUID rqId;
+        final UUID rqId;
 
         /** Snapshot name. */
-        private final String name;
+        final String name;
 
         /** Snapshot start time. */
-        private final long startTime;
+        final long startTime;
 
         /** Snapshot finish time. */
-        private volatile long endTime;
+        volatile long endTime;
 
         /**
          * Default constructor.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
new file mode 100644
index 0000000..177133f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Snapshot operation start request for {@link DistributedProcess} initiate message.
+ */
+public class SnapshotOperationRequest implements Serializable {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Request ID. */
+    private final UUID reqId;
+
+    /** Snapshot name. */
+    private final String snpName;
+
+    /** Baseline node IDs that must be alive to complete the operation. */
+    @GridToStringInclude
+    private final Set<UUID> nodes;
+
+    /** List of cache group names. */
+    @GridToStringInclude
+    private final Collection<String> grps;
+
+    /** Operational node ID. */
+    private final UUID opNodeId;
+
+    /** Exception that occurred during snapshot operation processing. */
+    private volatile Throwable err;
+
+    /**
+     * @param reqId Request ID.
+     * @param opNodeId Operational node ID.
+     * @param snpName Snapshot name.
+     * @param grps List of cache group names.
+     * @param nodes Baseline node IDs that must be alive to complete the operation.
+     */
+    public SnapshotOperationRequest(
+        UUID reqId,
+        UUID opNodeId,
+        String snpName,
+        @Nullable Collection<String> grps,
+        Set<UUID> nodes
+    ) {
+        this.reqId = reqId;
+        this.opNodeId = opNodeId;
+        this.snpName = snpName;
+        this.grps = grps;
+        this.nodes = nodes;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public UUID requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Snapshot name.
+     */
+    public String snapshotName() {
+        return snpName;
+    }
+
+    /**
+     * @return List of cache group names.
+     */
+    public @Nullable Collection<String> groups() {
+        return grps;
+    }
+
+    /**
+     * @return Baseline node IDs that must be alive to complete the operation.
+     */
+    public Set<UUID> nodes() {
+        return nodes;
+    }
+
+    /**
+     * @return Operational node ID.
+     */
+    public UUID operationalNodeId() {
+        return opNodeId;
+    }
+
+    /**
+     * @return Exception that occurred during snapshot operation processing.
+     */
+    public Throwable error() {
+        return err;
+    }
+
+    /**
+     * @param err Exception that occurred during snapshot operation processing.
+     */
+    public void error(Throwable err) {
+        this.err = err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SnapshotOperationRequest.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
new file mode 100644
index 0000000..038561b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -0,0 +1,931 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.IgniteFeatures.SNAPSHOT_RESTORE_CACHE_GROUP;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
+
+/**
+ * Distributed process to restore cache group from the snapshot.
+ */
+public class SnapshotRestoreProcess {
+    /** Temporary cache directory prefix. */
+    public static final String TMP_CACHE_DIR_PREFIX = "_tmp_snp_restore_";
+
+    /** Reject operation message. */
+    private static final String OP_REJECT_MSG = "Cache group restore operation was rejected. ";
+
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Cache group restore prepare phase. */
+    private final DistributedProcess<SnapshotOperationRequest, ArrayList<StoredCacheData>> prepareRestoreProc;
+
+    /** Cache group restore cache start phase. */
+    private final DistributedProcess<UUID, Boolean> cacheStartProc;
+
+    /** Cache group restore rollback phase. */
+    private final DistributedProcess<UUID, Boolean> rollbackRestoreProc;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Future to be completed when the cache restore process is complete (this future will be returned to the user). */
+    private volatile ClusterSnapshotFuture fut;
+
+    /** Snapshot restore operation context. */
+    private volatile SnapshotRestoreContext opCtx;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public SnapshotRestoreProcess(GridKernalContext ctx) {
+        this.ctx = ctx;
+
+        log = ctx.log(getClass());
+
+        prepareRestoreProc = new DistributedProcess<>(
+            ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, this::prepare, this::finishPrepare);
+
+        cacheStartProc = new DistributedProcess<>(
+            ctx, RESTORE_CACHE_GROUP_SNAPSHOT_START, this::cacheStart, this::finishCacheStart);
+
+        rollbackRestoreProc = new DistributedProcess<>(
+            ctx, RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK, this::rollback, this::finishRollback);
+    }
+
+    /**
+     * Cleanup temporary directories if any exists.
+     *
+     * @throws IgniteCheckedException If it was not possible to delete some temporary directory.
+     */
+    protected void cleanup() throws IgniteCheckedException {
+        FilePageStoreManager pageStore = (FilePageStoreManager)ctx.cache().context().pageStore();
+
+        File dbDir = pageStore.workDir();
+
+        for (File dir : dbDir.listFiles(dir -> dir.isDirectory() && dir.getName().startsWith(TMP_CACHE_DIR_PREFIX))) {
+            if (!U.delete(dir)) {
+                throw new IgniteCheckedException("Unable to remove temporary directory, " +
+                    "try deleting it manually [dir=" + dir + ']');
+            }
+        }
+    }
+
+    /**
+     * Start cache group restore operation.
+     *
+     * @param snpName Snapshot name.
+     * @param cacheGrpNames Cache groups to be restored or {@code null} to restore all cache groups from the snapshot.
+     * @return Future that will be completed when the restore operation is complete and the cache groups are started.
+     */
+    public IgniteFuture<Void> start(String snpName, @Nullable Collection<String> cacheGrpNames) {
+        ClusterSnapshotFuture fut0;
+
+        try {
+            if (ctx.clientNode())
+                throw new IgniteException(OP_REJECT_MSG + "Client and daemon nodes can not perform this operation.");
+
+            DiscoveryDataClusterState clusterState = ctx.state().clusterState();
+
+            if (clusterState.state() != ClusterState.ACTIVE || clusterState.transition())
+                throw new IgniteException(OP_REJECT_MSG + "The cluster should be active.");
+
+            if (!clusterState.hasBaselineTopology())
+                throw new IgniteException(OP_REJECT_MSG + "The baseline topology is not configured for cluster.");
+
+            if (!IgniteFeatures.allNodesSupports(ctx.grid().cluster().nodes(), SNAPSHOT_RESTORE_CACHE_GROUP))
+                throw new IgniteException(OP_REJECT_MSG + "Not all nodes in the cluster support restore operation.");
+
+            if (ctx.cache().context().snapshotMgr().isSnapshotCreating())
+                throw new IgniteException(OP_REJECT_MSG + "A cluster snapshot operation is in progress.");
+
+            synchronized (this) {
+                if (isRestoring() || fut != null)
+                    throw new IgniteException(OP_REJECT_MSG + "The previous snapshot restore operation was not completed.");
+
+                fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName);
+
+                fut0 = fut;
+            }
+        }
+        catch (IgniteException e) {
+            return new IgniteFinishedFutureImpl<>(e);
+        }
+
+        ctx.cache().context().snapshotMgr().collectSnapshotMetadata(snpName).listen(
+            f -> {
+                if (f.error() != null) {
+                    finishProcess(fut0.rqId, f.error());
+
+                    return;
+                }
+
+                Set<UUID> dataNodes = new HashSet<>();
+                Set<String> snpBltNodes = null;
+                Map<ClusterNode, List<SnapshotMetadata>> metas = f.result();
+                Map<Integer, String> reqGrpIds = cacheGrpNames == null ? Collections.emptyMap() :
+                    cacheGrpNames.stream().collect(Collectors.toMap(CU::cacheId, v -> v));
+
+                for (Map.Entry<ClusterNode, List<SnapshotMetadata>> entry : metas.entrySet()) {
+                    SnapshotMetadata meta = F.first(entry.getValue());
+
+                    assert meta != null : entry.getKey().id();
+
+                    if (!entry.getKey().consistentId().equals(meta.consistentId()))
+                        continue;
+
+                    if (snpBltNodes == null)
+                        snpBltNodes = new HashSet<>(meta.baselineNodes());
+
+                    dataNodes.add(entry.getKey().id());
+
+                    reqGrpIds.keySet().removeAll(meta.partitions().keySet());
+                }
+
+                if (snpBltNodes == null) {
+                    finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "No snapshot data " +
+                        "has been found [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']'));
+
+                    return;
+                }
+
+                if (!reqGrpIds.isEmpty()) {
+                    finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "Cache group(s) was not " +
+                        "found in the snapshot [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']'));
+
+                    return;
+                }
+
+                Collection<String> bltNodes = F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
+                    node -> node.consistentId().toString(), (node) -> CU.baselineNode(node, ctx.state().clusterState()));
+
+                snpBltNodes.removeAll(bltNodes);
+
+                if (!snpBltNodes.isEmpty()) {
+                    finishProcess(fut0.rqId, new IgniteIllegalStateException(OP_REJECT_MSG + "Some nodes required to " +
+                        "restore a cache group are missing [nodeId(s)=" + snpBltNodes + ", snapshot=" + snpName + ']'));
+
+                    return;
+                }
+
+                ctx.cache().context().snapshotMgr().runSnapshotVerification(metas).listen(
+                    f0 -> {
+                        if (f0.error() != null) {
+                            finishProcess(fut0.rqId, f0.error());
+
+                            return;
+                        }
+
+                        IdleVerifyResultV2 res = f0.result();
+
+                        if (!F.isEmpty(res.exceptions()) || res.hasConflicts()) {
+                            StringBuilder sb = new StringBuilder();
+
+                            res.print(sb::append, true);
+
+                            finishProcess(fut0.rqId, new IgniteException(sb.toString()));
+
+                            return;
+                        }
+
+                        SnapshotOperationRequest req = new SnapshotOperationRequest(
+                            fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, dataNodes);
+
+                        prepareRestoreProc.start(req.requestId(), req);
+                    }
+                );
+            }
+        );
+
+        return new IgniteFutureImpl<>(fut0);
+    }
+
+    /**
+     * Check if snapshot restore process is currently running.
+     *
+     * @return {@code True} if the snapshot restore operation is in progress.
+     */
+    public boolean isRestoring() {
+        return isRestoring(null, null);
+    }
+
+    /**
+     * Check if the cache or group with the specified name is currently being restored from the snapshot.
+     *
+     * @param cacheName Cache name.
+     * @param grpName Cache group name.
+     * @return {@code True} if the cache or group with the specified name is currently being restored.
+     */
+    public boolean isRestoring(@Nullable String cacheName, @Nullable String grpName) {
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (opCtx0 == null)
+            return false;
+
+        if (cacheName == null)
+            return true;
+
+        Map<Integer, StoredCacheData> cacheCfgs = opCtx0.cfgs;
+
+        int cacheId = CU.cacheId(cacheName);
+
+        if (cacheCfgs.containsKey(cacheId))
+            return true;
+
+        for (File grpDir : opCtx0.dirs) {
+            String locGrpName = FilePageStoreManager.cacheGroupName(grpDir);
+
+            if (grpName != null) {
+                if (cacheName.equals(locGrpName))
+                    return true;
+
+                if (CU.cacheId(locGrpName) == CU.cacheId(grpName))
+                    return true;
+            }
+            else if (CU.cacheId(locGrpName) == cacheId)
+                return true;
+        }
+
+        return false;
+    }
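
A hypothetical illustration of how isRestoring(cacheName, grpName) can act as a guard when a cache with a conflicting name is started while a restore is in progress (the tests below expect such a start to fail with "currently being restored from a snapshot"). The guard class and its wiring are assumptions for illustration only; just the isRestoring check itself comes from the method above.

    import org.apache.ignite.IgniteCheckedException;

    /** Illustrative guard only; not the actual cache processor integration. */
    class RestoreAwareCacheStartGuard {
        /** Probe standing in for SnapshotRestoreProcess#isRestoring(String, String). */
        interface RestoreState {
            boolean isRestoring(String cacheName, String grpName);
        }

        private final RestoreState restore;

        RestoreAwareCacheStartGuard(RestoreState restore) {
            this.restore = restore;
        }

        /** Rejects a dynamic cache start while the same cache or group is being restored from a snapshot. */
        void validateCacheStart(String cacheName, String grpName) throws IgniteCheckedException {
            if (restore.isRestoring(cacheName, grpName)) {
                throw new IgniteCheckedException("Cache start failed. A cache or group with the same name is " +
                    "currently being restored from a snapshot [cache=" + cacheName + ", group=" + grpName + ']');
            }
        }
    }
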
+
+    /**
+     * @param reqId Request ID.
+     * @return Server nodes on which a successful start of the cache(s) is required. If any of these nodes fails when
+     *         starting the cache(s), the whole procedure is rolled back.
+     */
+    public Set<UUID> cacheStartRequiredAliveNodes(IgniteUuid reqId) {
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (opCtx0 == null || !reqId.globalId().equals(opCtx0.reqId))
+            return Collections.emptySet();
+
+        return Collections.unmodifiableSet(opCtx0.nodes);
+    }
+
+    /**
+     * Finish local cache group restore process.
+     *
+     * @param reqId Request ID.
+     */
+    private void finishProcess(UUID reqId) {
+        finishProcess(reqId, null);
+    }
+
+    /**
+     * Finish local cache group restore process.
+     *
+     * @param reqId Request ID.
+     * @param err Error, if any.
+     */
+    private void finishProcess(UUID reqId, @Nullable Throwable err) {
+        if (err != null)
+            log.error("Failed to restore snapshot cache group [reqId=" + reqId + ']', err);
+        else if (log.isInfoEnabled())
+            log.info("Successfully restored cache group(s) from the snapshot [reqId=" + reqId + ']');
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (opCtx0 != null && reqId.equals(opCtx0.reqId))
+            opCtx = null;
+
+        synchronized (this) {
+            ClusterSnapshotFuture fut0 = fut;
+
+            if (fut0 != null && reqId.equals(fut0.rqId)) {
+                fut = null;
+
+                ctx.getSystemExecutorService().submit(() -> fut0.onDone(null, err));
+            }
+        }
+    }
+
+    /**
+     * Node left callback.
+     *
+     * @param leftNodeId Left node ID.
+     */
+    public void onNodeLeft(UUID leftNodeId) {
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (opCtx0 != null && opCtx0.nodes.contains(leftNodeId)) {
+            opCtx0.err.compareAndSet(null, new ClusterTopologyCheckedException(OP_REJECT_MSG +
+                "Required node has left the cluster [nodeId=" + leftNodeId + ']'));
+        }
+    }
+
+    /**
+     * Abort the currently running restore procedure (if any).
+     *
+     * @param reason Interruption reason.
+     */
+    public void interrupt(Exception reason) {
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (opCtx0 == null)
+            return;
+
+        opCtx0.err.compareAndSet(null, reason);
+
+        IgniteFuture<?> stopFut;
+
+        synchronized (this) {
+            stopFut = opCtx0.stopFut;
+        }
+
+        if (stopFut != null)
+            stopFut.get();
+    }
+
+    /**
+     * Ensures that a cache with the specified name does not exist locally.
+     *
+     * @param name Cache name.
+     */
+    private void ensureCacheAbsent(String name) {
+        int id = CU.cacheId(name);
+
+        if (ctx.cache().cacheGroupDescriptors().containsKey(id) || ctx.cache().cacheDescriptor(id) != null) {
+            throw new IgniteIllegalStateException("Cache \"" + name +
+                "\" should be destroyed manually before performing the restore operation.");
+        }
+    }
+
+    /**
+     * @param req Request to prepare cache group restore from the snapshot.
+     * @return Result future.
+     */
+    private IgniteInternalFuture<ArrayList<StoredCacheData>> prepare(SnapshotOperationRequest req) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        try {
+            DiscoveryDataClusterState state = ctx.state().clusterState();
+
+            if (state.state() != ClusterState.ACTIVE || state.transition())
+                throw new IgniteCheckedException(OP_REJECT_MSG + "The cluster should be active.");
+
+            if (ctx.cache().context().snapshotMgr().isSnapshotCreating())
+                throw new IgniteCheckedException(OP_REJECT_MSG + "A cluster snapshot operation is in progress.");
+
+            for (UUID nodeId : req.nodes()) {
+                ClusterNode node = ctx.discovery().node(nodeId);
+
+                if (node == null || !CU.baselineNode(node, state) || !ctx.discovery().alive(node)) {
+                    throw new IgniteCheckedException(
+                        OP_REJECT_MSG + "Required node has left the cluster [nodeId=" + nodeId + ']');
+                }
+            }
+
+            opCtx = prepareContext(req);
+
+            SnapshotRestoreContext opCtx0 = opCtx;
+
+            if (opCtx0.dirs.isEmpty())
+                return new GridFinishedFuture<>();
+
+            // Ensure that shared cache groups have no conflicts.
+            for (StoredCacheData cfg : opCtx0.cfgs.values()) {
+                ensureCacheAbsent(cfg.config().getName());
+
+                if (!F.isEmpty(cfg.config().getGroupName()))
+                    ensureCacheAbsent(cfg.config().getGroupName());
+            }
+
+            if (log.isInfoEnabled()) {
+                log.info("Starting local snapshot restore operation" +
+                    " [reqId=" + req.requestId() +
+                    ", snapshot=" + req.snapshotName() +
+                    ", cache(s)=" + F.viewReadOnly(opCtx0.cfgs.values(), data -> data.config().getName()) + ']');
+            }
+
+            Consumer<Throwable> errHnd = (ex) -> opCtx.err.compareAndSet(null, ex);
+            BooleanSupplier stopChecker = () -> opCtx.err.get() != null;
+            GridFutureAdapter<ArrayList<StoredCacheData>> retFut = new GridFutureAdapter<>();
+
+            if (ctx.isStopping())
+                throw new NodeStoppingException("Node is stopping.");
+
+            opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+
+            restoreAsync(opCtx0.snpName, opCtx0.dirs, ctx.localNodeId().equals(req.operationalNodeId()), stopChecker, errHnd)
+                .thenAccept(res -> {
+                    try {
+                        Throwable err = opCtx.err.get();
+
+                        if (err != null)
+                            throw err;
+
+                        for (File src : opCtx0.dirs)
+                            Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE);
+                    }
+                    catch (Throwable t) {
+                        log.error("Unable to restore cache group(s) from the snapshot " +
+                            "[reqId=" + opCtx.reqId + ", snapshot=" + opCtx.snpName + ']', t);
+
+                        retFut.onDone(t);
+
+                        return;
+                    }
+
+                    retFut.onDone(new ArrayList<>(opCtx.cfgs.values()));
+                });
+
+            return retFut;
+        }
+        catch (IgniteIllegalStateException | IgniteCheckedException | RejectedExecutionException e) {
+            log.error("Unable to restore cache group(s) from the snapshot " +
+                "[reqId=" + req.requestId() + ", snapshot=" + req.snapshotName() + ']', e);
+
+            return new GridFinishedFuture<>(e);
+        }
+    }
+
+    /**
+     * @param cacheDir Cache directory.
+     * @return Temporary directory.
+     */
+    private File formatTmpDirName(File cacheDir) {
+        return new File(cacheDir.getParent(), TMP_CACHE_DIR_PREFIX + cacheDir.getName());
+    }
+
+    /**
+     * Copy partition files and update binary metadata.
+     *
+     * @param snpName Snapshot name.
+     * @param dirs Cache directories to restore from the snapshot.
+     * @param updateMeta Update binary metadata flag.
+     * @param stopChecker Process interrupt checker.
+     * @param errHnd Error handler.
+     * @return Future that will be completed when all copy and metadata update tasks are finished.
+     * @throws IgniteCheckedException If failed.
+     */
+    private CompletableFuture<Void> restoreAsync(
+        String snpName,
+        Collection<File> dirs,
+        boolean updateMeta,
+        BooleanSupplier stopChecker,
+        Consumer<Throwable> errHnd
+    ) throws IgniteCheckedException {
+        IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr();
+        String pdsFolderName = ctx.pdsFolderResolver().resolveFolders().folderName();
+
+        List<CompletableFuture<Void>> futs = new ArrayList<>();
+
+        if (updateMeta) {
+            File binDir = binaryWorkDir(snapshotMgr.snapshotLocalDir(snpName).getAbsolutePath(), pdsFolderName);
+
+            futs.add(CompletableFuture.runAsync(() -> {
+                try {
+                    ctx.cacheObjects().updateMetadata(binDir, stopChecker);
+                }
+                catch (Throwable t) {
+                    errHnd.accept(t);
+                }
+            }, snapshotMgr.snapshotExecutorService()));
+        }
+
+        for (File cacheDir : dirs) {
+            File tmpCacheDir = formatTmpDirName(cacheDir);
+            File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(snpName),
+                Paths.get(databaseRelativePath(pdsFolderName), cacheDir.getName()).toString());
+
+            assert snpCacheDir.exists() : "node=" + ctx.localNodeId() + ", dir=" + snpCacheDir;
+
+            for (File snpFile : snpCacheDir.listFiles()) {
+                futs.add(CompletableFuture.runAsync(() -> {
+                    if (stopChecker.getAsBoolean())
+                        return;
+
+                    try {
+                        if (Thread.interrupted())
+                            throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+
+                        File target = new File(tmpCacheDir, snpFile.getName());
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("Copying file from the snapshot " +
+                                "[snapshot=" + snpName +
+                                ", src=" + snpFile +
+                                ", target=" + target + "]");
+                        }
+
+                        IgniteSnapshotManager.copy(snapshotMgr.ioFactory(), snpFile, target, snpFile.length());
+                    }
+                    catch (Throwable t) {
+                        errHnd.accept(t);
+                    }
+                }, ctx.cache().context().snapshotMgr().snapshotExecutorService()));
+            }
+        }
+
+        int futsSize = futs.size();
+
+        return CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]));
+    }
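
restoreAsync stages every snapshot file in a TMP_CACHE_DIR_PREFIX directory, and the prepare phase later publishes that directory with a single atomic rename (the Files.move call with ATOMIC_MOVE above), so a failed or interrupted copy never leaves a half-written cache directory behind. A minimal, self-contained sketch of that stage-then-atomically-publish pattern with plain java.nio.file is shown below; the directory and file names are placeholders, not Ignite's actual on-disk layout.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    /** Stage files in a temporary directory, then publish it with one atomic rename. */
    public class StageAndPublishSketch {
        public static void main(String[] args) throws IOException {
            Path workDir = Files.createTempDirectory("restore-sketch");

            Path cacheDir = workDir.resolve("cacheGroup");     // Final cache directory (placeholder).
            Path tmpDir = workDir.resolve("_tmp_cacheGroup");  // Staging directory (placeholder prefix).

            Files.createDirectory(tmpDir);

            // Copy data into the staging directory; an error here leaves only the temp directory behind.
            Files.write(tmpDir.resolve("part-0.bin"), new byte[] {1, 2, 3});

            // Publish the fully copied directory in a single step, as the prepare phase does above.
            Files.move(tmpDir, cacheDir, StandardCopyOption.ATOMIC_MOVE);

            System.out.println("Published: " + Files.exists(cacheDir.resolve("part-0.bin")));
        }
    }
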
+
+    /**
+     * @param req Request to prepare cache group restore from the snapshot.
+     * @return Snapshot restore operation context.
+     * @throws IgniteCheckedException If failed.
+     */
+    private SnapshotRestoreContext prepareContext(SnapshotOperationRequest req) throws IgniteCheckedException {
+        if (opCtx != null) {
+            throw new IgniteCheckedException(OP_REJECT_MSG +
+                "The previous snapshot restore operation was not completed.");
+        }
+
+        GridCacheSharedContext<?, ?> cctx = ctx.cache().context();
+
+        SnapshotMetadata meta = F.first(cctx.snapshotMgr().readSnapshotMetadatas(req.snapshotName()));
+
+        if (meta == null || !meta.consistentId().equals(cctx.localNode().consistentId().toString()))
+            return new SnapshotRestoreContext(req, Collections.emptyList(), Collections.emptyMap());
+
+        if (meta.pageSize() != cctx.database().pageSize()) {
+            throw new IgniteCheckedException("Incompatible memory page size " +
+                "[snapshotPageSize=" + meta.pageSize() +
+                ", local=" + cctx.database().pageSize() +
+                ", snapshot=" + req.snapshotName() +
+                ", nodeId=" + cctx.localNodeId() + ']');
+        }
+
+        List<File> cacheDirs = new ArrayList<>();
+        Map<String, StoredCacheData> cfgsByName = new HashMap<>();
+        FilePageStoreManager pageStore = (FilePageStoreManager)cctx.pageStore();
+
+        // Collect the cache configurations and prepare a temporary directory for copying files.
+        for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName())) {
+            String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir);
+
+            if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName))
+                continue;
+
+            File cacheDir = pageStore.cacheWorkDir(snpCacheDir.getName().startsWith(CACHE_GRP_DIR_PREFIX), grpName);
+
+            if (cacheDir.exists()) {
+                if (!cacheDir.isDirectory()) {
+                    throw new IgniteCheckedException("Unable to restore cache group, file with required directory " +
+                        "name already exists [group=" + grpName + ", file=" + cacheDir + ']');
+                }
+
+                if (cacheDir.list().length > 0) {
+                    throw new IgniteCheckedException("Unable to restore cache group, directory is not empty " +
+                        "[group=" + grpName + ", dir=" + cacheDir + ']');
+                }
+
+                if (!cacheDir.delete()) {
+                    throw new IgniteCheckedException("Unable to remove empty cache directory " +
+                        "[group=" + grpName + ", dir=" + cacheDir + ']');
+                }
+            }
+
+            File tmpCacheDir = formatTmpDirName(cacheDir);
+
+            if (tmpCacheDir.exists()) {
+                throw new IgniteCheckedException("Unable to restore cache group, temp directory already exists " +
+                    "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
+            }
+
+            if (!tmpCacheDir.mkdir()) {
+                throw new IgniteCheckedException("Unable to restore cache group, cannot create temp directory " +
+                    "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
+            }
+
+            cacheDirs.add(cacheDir);
+
+            pageStore.readCacheConfigurations(snpCacheDir, cfgsByName);
+        }
+
+        Map<Integer, StoredCacheData> cfgsById =
+            cfgsByName.values().stream().collect(Collectors.toMap(v -> CU.cacheId(v.config().getName()), v -> v));
+
+        return new SnapshotRestoreContext(req, cacheDirs, cfgsById);
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param res Results.
+     * @param errs Errors.
+     */
+    private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res, Map<UUID, Exception> errs) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Exception failure = F.first(errs.values());
+
+        assert opCtx0 != null || failure != null : "Context has not been created on the node " + ctx.localNodeId();
+
+        if (opCtx0 == null || !reqId.equals(opCtx0.reqId)) {
+            finishProcess(reqId, failure);
+
+            return;
+        }
+
+        if (failure == null)
+            failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+
+        // Context has been created - should roll back changes cluster-wide.
+        if (failure != null) {
+            opCtx0.err.compareAndSet(null, failure);
+
+            if (U.isLocalNodeCoordinator(ctx.discovery()))
+                rollbackRestoreProc.start(reqId, reqId);
+
+            return;
+        }
+
+        Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
+
+        for (List<StoredCacheData> storedCfgs : res.values()) {
+            if (storedCfgs == null)
+                continue;
+
+            for (StoredCacheData cacheData : storedCfgs)
+                globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
+        }
+
+        opCtx0.cfgs = globalCfgs;
+
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            cacheStartProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @return Result future.
+     */
+    private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Throwable err = opCtx0.err.get();
+
+        if (err != null)
+            return new GridFinishedFuture<>(err);
+
+        if (!U.isLocalNodeCoordinator(ctx.discovery()))
+            return new GridFinishedFuture<>();
+
+        Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+
+        if (log.isInfoEnabled()) {
+            log.info("Starting restored caches " +
+                "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
+                ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName()) + ']');
+        }
+
+        // We set the topology node IDs required to successfully start the cache. If any of the required nodes leaves
+        // the cluster during the cache startup, the whole procedure will be rolled back.
+        return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false, IgniteUuid.fromUuid(reqId));
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param res Results.
+     * @param errs Errors.
+     */
+    private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Exception failure = errs.values().stream().findFirst().
+            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+        if (failure == null) {
+            finishProcess(reqId);
+
+            return;
+        }
+
+        opCtx0.err.compareAndSet(null, failure);
+
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            rollbackRestoreProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param reqNodes Set of required topology nodes.
+     * @param respNodes Set of responding topology nodes.
+     * @return Error, if no response was received from the required topology node.
+     */
+    private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
+        if (!respNodes.containsAll(reqNodes)) {
+            Set<UUID> leftNodes = new HashSet<>(reqNodes);
+
+            leftNodes.removeAll(respNodes);
+
+            return new ClusterTopologyCheckedException(OP_REJECT_MSG +
+                "Required node has left the cluster [nodeId=" + leftNodes + ']');
+        }
+
+        return null;
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @return Result future.
+     */
+    private IgniteInternalFuture<Boolean> rollback(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (F.isEmpty(opCtx0.dirs))
+            return new GridFinishedFuture<>();
+
+        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+        synchronized (this) {
+            opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+
+            try {
+                ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
+                    if (log.isInfoEnabled()) {
+                        log.info("Removing restored cache directories [reqId=" + reqId +
+                            ", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
+                    }
+
+                    IgniteCheckedException ex = null;
+
+                    for (File cacheDir : opCtx0.dirs) {
+                        File tmpCacheDir = formatTmpDirName(cacheDir);
+
+                        if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
+                            log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
+                                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
+
+                            ex = new IgniteCheckedException("Unable to remove temporary cache directory " + cacheDir);
+                        }
+
+                        if (cacheDir.exists() && !U.delete(cacheDir)) {
+                            log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
+                                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
+
+                            ex = new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
+                        }
+                    }
+
+                    if (ex != null)
+                        retFut.onDone(ex);
+                    else
+                        retFut.onDone(true);
+                });
+            }
+            catch (RejectedExecutionException e) {
+                log.error("Unable to perform rollback routine, task has been rejected " +
+                    "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']');
+
+                retFut.onDone(e);
+            }
+        }
+
+        return retFut;
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param res Results.
+     * @param errs Errors.
+     */
+    private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+        if (ctx.clientNode())
+            return;
+
+        if (!errs.isEmpty()) {
+            log.warning("Some nodes were unable to complete the rollback routine, check the local log " +
+                "files for more information [nodeIds=" + errs.keySet() + ']');
+        }
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (!res.keySet().containsAll(opCtx0.nodes)) {
+            Set<UUID> leftNodes = new HashSet<>(opCtx0.nodes);
+
+            leftNodes.removeAll(res.keySet());
+
+            log.warning("Some of the nodes left the cluster and were unable to complete the rollback" +
+                " operation [reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", node(s)=" + leftNodes + ']');
+        }
+
+        finishProcess(reqId, opCtx0.err.get());
+    }
+
+    /**
+     * Cache group restore from snapshot operation context.
+     */
+    private static class SnapshotRestoreContext {
+        /** Request ID. */
+        private final UUID reqId;
+
+        /** Snapshot name. */
+        private final String snpName;
+
+        /** Baseline node IDs that must be alive to complete the operation. */
+        private final Set<UUID> nodes;
+
+        /** List of restored cache group directories. */
+        private final Collection<File> dirs;
+
+        /** The exception that led to the interruption of the process. */
+        private final AtomicReference<Throwable> err = new AtomicReference<>();
+
+        /** Cache ID to configuration mapping. */
+        private volatile Map<Integer, StoredCacheData> cfgs;
+
+        /** Graceful shutdown future. */
+        private volatile IgniteFuture<?> stopFut;
+
+        /**
+         * @param req Request to prepare cache group restore from the snapshot.
+         * @param dirs Cache group directories to restore from the snapshot.
+         * @param cfgs Cache ID to configuration mapping.
+         */
+        protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection<File> dirs,
+            Map<Integer, StoredCacheData> cfgs) {
+            reqId = req.requestId();
+            snpName = req.snapshotName();
+            nodes = new HashSet<>(req.nodes());
+
+            this.dirs = dirs;
+            this.cfgs = cfgs;
+        }
+    }
+}
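
For reference, a minimal usage sketch of the restore flow implemented by this class, mirroring the tests added below: any cache group with the same name must be destroyed first, then IgniteSnapshot#restoreSnapshot is called and its future awaited. The configuration path and the snapshot/cache names are placeholders.

    import java.util.Collections;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.Ignition;

    /** Minimal restore usage sketch; names and paths are placeholders. */
    public class RestoreSnapshotUsageSketch {
        public static void main(String[] args) {
            Ignite ignite = Ignition.start("config/ignite-config.xml"); // Assumed configuration file.

            // A cache group being restored must not exist in the cluster.
            if (ignite.cacheNames().contains("cache1"))
                ignite.cache("cache1").destroy();

            // Restore a single cache group from a previously created snapshot and wait for completion.
            ignite.snapshot().restoreSnapshot("snapshot_01", Collections.singleton("cache1")).get();

            // Passing null as the second argument restores all cache groups contained in the snapshot.
        }
    }
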
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 7ccfee0..170a3c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
+import java.util.function.BooleanSupplier;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -307,6 +308,15 @@
     public void saveMetadata(Collection<BinaryType> types, File dir);
 
     /**
+     * Merge the binary metadata files stored in the specified directory.
+     *
+     * @param metadataDir Directory containing binary metadata files.
+     * @param stopChecker Process interrupt checker.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void updateMetadata(File metadataDir, BooleanSupplier stopChecker) throws IgniteCheckedException;
+
+    /**
      * @param typeName Type name.
      * @param ord ordinal.
      * @return Enum object.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
index 7d9e6ae..730b9e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
@@ -445,6 +445,21 @@
         /**
          * Rotate performance statistics.
          */
-        PERFORMANCE_STATISTICS_ROTATE
+        PERFORMANCE_STATISTICS_ROTATE,
+
+        /**
+         * Cache group restore prepare phase.
+         */
+        RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
+
+        /**
+         * Cache group restore cache start phase.
+         */
+        RESTORE_CACHE_GROUP_SNAPSHOT_START,
+
+        /**
+         * Cache group restore rollback phase.
+         */
+        RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK
     }
 }
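
The three new process types map onto the phases driven by SnapshotRestoreProcess: prepare (validate and copy files), cache start, and rollback on any failure. A self-contained toy model of that sequencing is sketched below; it intentionally does not use Ignite's DistributedProcess API and only illustrates the control flow between the phases.

    import java.util.Optional;
    import java.util.function.Supplier;

    /** Toy model of the restore phase sequencing; illustration only, not Ignite code. */
    public class RestorePhaseFlowSketch {
        enum Outcome { FINISHED, ROLLED_BACK }

        /** Runs prepare, then cache start; any reported error routes the flow to rollback. */
        static Outcome run(Supplier<Optional<Exception>> prepare, Supplier<Optional<Exception>> cacheStart,
            Runnable rollback) {
            if (prepare.get().isPresent() || cacheStart.get().isPresent()) {
                rollback.run(); // Mirrors finishPrepare()/finishCacheStart() starting the rollback process.

                return Outcome.ROLLED_BACK;
            }

            return Outcome.FINISHED; // Mirrors finishProcess() completing the restore future.
        }

        public static void main(String[] args) {
            System.out.println(run(Optional::empty, Optional::empty, () -> {}));                          // FINISHED
            System.out.println(run(Optional::empty, () -> Optional.of(new Exception("fail")), () -> {})); // ROLLED_BACK
        }
    }
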
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 74976b1..b873f7d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -111,7 +111,7 @@
     protected final List<Integer> locEvts = new CopyOnWriteArrayList<>();
 
     /** Configuration for the 'default' cache. */
-    protected volatile CacheConfiguration<Integer, Integer> dfltCacheCfg;
+    protected volatile CacheConfiguration<Integer, Object> dfltCacheCfg;
 
     /** Enable default data region persistence. */
     protected boolean persistence = true;
@@ -124,6 +124,9 @@
 
         discoSpi.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder());
 
+        if (dfltCacheCfg != null)
+            cfg.setCacheConfiguration(dfltCacheCfg);
+
         return cfg.setConsistentId(igniteInstanceName)
             .setCommunicationSpi(new TestRecordingCommunicationSpi())
             .setDataStorageConfiguration(new DataStorageConfiguration()
@@ -132,7 +135,6 @@
                     .setPersistenceEnabled(persistence))
                 .setCheckpointFrequency(3000)
                 .setPageSize(DFLT_PAGE_SIZE))
-            .setCacheConfiguration(dfltCacheCfg)
             .setClusterStateOnStart(INACTIVE)
             .setIncludeEventTypes(EVTS_CLUSTER_SNAPSHOT)
             .setDiscoverySpi(discoSpi);
@@ -185,7 +187,7 @@
      * @param ccfg Default cache configuration.
      * @return Cache configuration.
      */
-    protected static <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
+    protected <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
         return ccfg.setCacheMode(CacheMode.PARTITIONED)
             .setBackups(2)
             .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
@@ -239,7 +241,7 @@
      * @return Ignite instance.
      * @throws Exception If fails.
      */
-    protected IgniteEx startGridWithCache(CacheConfiguration<Integer, Integer> ccfg, int keys) throws Exception {
+    protected IgniteEx startGridWithCache(CacheConfiguration<Integer, Object> ccfg, int keys) throws Exception {
         return startGridsWithCache(1, ccfg, keys);
     }
 
@@ -250,7 +252,7 @@
      * @return Ignite instance.
      * @throws Exception If fails.
      */
-    protected IgniteEx startGridsWithCache(int grids, CacheConfiguration<Integer, Integer> ccfg, int keys) throws Exception {
+    protected IgniteEx startGridsWithCache(int grids, CacheConfiguration<Integer, Object> ccfg, int keys) throws Exception {
         dfltCacheCfg = ccfg;
 
         return startGridsWithCache(grids, keys, Integer::new, ccfg);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
new file mode 100644
index 0000000..62f4619
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.function.Function;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.internal.IgniteEx;
+
+/**
+ * Snapshot restore test base.
+ */
+public abstract class IgniteClusterSnapshotRestoreBaseTest extends AbstractSnapshotSelfTest {
+    /** Timeout. */
+    protected static final long TIMEOUT = 15_000;
+
+    /** Cache value builder. */
+    protected abstract Function<Integer, Object> valueBuilder();
+
+    /**
+     * @param nodesCnt Nodes count.
+     * @param keysCnt Number of keys to create.
+     * @return Ignite coordinator instance.
+     * @throws Exception if failed.
+     */
+    protected IgniteEx startGridsWithSnapshot(int nodesCnt, int keysCnt) throws Exception {
+        return startGridsWithSnapshot(nodesCnt, keysCnt, false);
+    }
+
+    /**
+     * @param nodesCnt Nodes count.
+     * @param keysCnt Number of keys to create.
+     * @param startClient {@code True} to start an additional client node.
+     * @return Ignite instance that created the snapshot (a client node if {@code startClient} is {@code true}).
+     * @throws Exception if failed.
+     */
+    protected IgniteEx startGridsWithSnapshot(int nodesCnt, int keysCnt, boolean startClient) throws Exception {
+        IgniteEx ignite = startGridsWithCache(nodesCnt, keysCnt, valueBuilder(), dfltCacheCfg);
+
+        if (startClient)
+            ignite = startClientGrid("client");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        ignite.cache(dfltCacheCfg.getName()).destroy();
+
+        awaitPartitionMapExchange();
+
+        return ignite;
+    }
+
+    /**
+     * @param cache Cache.
+     * @param keysCnt Expected number of keys.
+     */
+    protected void assertCacheKeys(IgniteCache<Object, Object> cache, int keysCnt) {
+        assertEquals(keysCnt, cache.size());
+
+        for (int i = 0; i < keysCnt; i++)
+            assertEquals(valueBuilder().apply(i), cache.get(i));
+    }
+
+    /** */
+    protected class BinaryValueBuilder implements Function<Integer, Object> {
+        /** Binary type name. */
+        private final String typeName;
+
+        /**
+         * @param typeName Binary type name.
+         */
+        BinaryValueBuilder(String typeName) {
+            this.typeName = typeName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object apply(Integer key) {
+            BinaryObjectBuilder builder = grid(0).binary().builder(typeName);
+
+            builder.setField("id", key);
+            builder.setField("name", String.valueOf(key));
+
+            return builder.build();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
new file mode 100644
index 0000000..f3359f1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.OpenOption;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.IgniteSnapshot;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cache.CacheExistsException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
+import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.TMP_CACHE_DIR_PREFIX;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ * Snapshot restore tests.
+ */
+public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotRestoreBaseTest {
+    /** Type name used for binary and SQL. */
+    private static final String TYPE_NAME = "CustomType";
+
+    /** Cache 1 name. */
+    private static final String CACHE1 = "cache1";
+
+    /** Cache 2 name. */
+    private static final String CACHE2 = "cache2";
+
+    /** Default shared cache group name. */
+    private static final String SHARED_GRP = "shared";
+
+    /** Cache value builder. */
+    private Function<Integer, Object> valBuilder = String::valueOf;
+
+    /** {@inheritDoc} */
+    @Override protected Function<Integer, Object> valueBuilder() {
+        return valBuilder;
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testRestoreAllGroups() throws Exception {
+        CacheConfiguration<Integer, Object> cacheCfg1 =
+            txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
+
+        CacheConfiguration<Integer, Object> cacheCfg2 =
+            txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE2)).setGroupName(SHARED_GRP);
+
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valBuilder,
+            dfltCacheCfg.setBackups(0), cacheCfg1, cacheCfg2);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        ignite.cache(CACHE1).destroy();
+        ignite.cache(CACHE2).destroy();
+        ignite.cache(DEFAULT_CACHE_NAME).destroy();
+
+        awaitPartitionMapExchange();
+
+        // Restore all cache groups.
+        grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
+
+        assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+        assertCacheKeys(ignite.cache(CACHE1), CACHE_KEYS_RANGE);
+        assertCacheKeys(ignite.cache(CACHE2), CACHE_KEYS_RANGE);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testStartClusterSnapshotRestoreMultipleThreadsSameNode() throws Exception {
+        checkStartClusterSnapshotRestoreMultithreaded(() -> 0);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testStartClusterSnapshotRestoreMultipleThreadsDiffNode() throws Exception {
+        AtomicInteger nodeIdx = new AtomicInteger();
+
+        checkStartClusterSnapshotRestoreMultithreaded(nodeIdx::getAndIncrement);
+    }
+
+    /**
+     * @param nodeIdxSupplier Ignite node index supplier.
+     */
+    private void checkStartClusterSnapshotRestoreMultithreaded(IntSupplier nodeIdxSupplier) throws Exception {
+        Ignite ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
+
+        AtomicInteger successCnt = new AtomicInteger();
+        AtomicInteger failCnt = new AtomicInteger();
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                grid(nodeIdxSupplier.getAsInt()).snapshot().restoreSnapshot(
+                    SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
+
+                successCnt.incrementAndGet();
+            }
+            catch (Exception e) {
+                failCnt.incrementAndGet();
+            }
+        }, 2, "runner");
+
+        fut.get(TIMEOUT);
+
+        assertEquals(1, successCnt.get());
+        assertEquals(1, failCnt.get());
+
+        assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testCreateSnapshotDuringRestore() throws Exception {
+        Ignite ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
+
+        BlockingCustomMessageDiscoverySpi discoSpi = discoSpi(grid(0));
+
+        discoSpi.block((msg) -> msg instanceof DynamicCacheChangeBatch);
+
+        IgniteFuture<Void> fut =
+            ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+        discoSpi.waitBlocked(TIMEOUT);
+
+        GridTestUtils.assertThrowsAnyCause(
+            log,
+            () -> grid(1).snapshot().createSnapshot("NEW_SNAPSHOT").get(TIMEOUT),
+            IgniteException.class,
+            "Cache group restore operation is currently in progress."
+        );
+
+        discoSpi.unblock();
+
+        fut.get(TIMEOUT);
+
+        assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+    }
+
+    /**
+     * Ensures that the cache doesn't start if one of the baseline nodes fails.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNodeLeftDuringCacheStartOnExchangeInit() throws Exception {
+        startGridsWithSnapshot(3, CACHE_KEYS_RANGE, true);
+
+        BlockingCustomMessageDiscoverySpi discoSpi = discoSpi(grid(0));
+
+        discoSpi.block((msg) -> msg instanceof DynamicCacheChangeBatch);
+
+        IgniteFuture<Void> fut =
+            grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+        discoSpi.waitBlocked(TIMEOUT);
+
+        stopGrid(2, true);
+
+        discoSpi.unblock();
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class, null);
+
+        ensureCacheAbsent(dfltCacheCfg);
+    }
+
+    /**
+     * Ensures that the cache is not started if non-coordinator node left during the exchange.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNodeLeftDuringCacheStartOnExchangeFinish() throws Exception {
+        checkNodeLeftOnExchangeFinish(
+            false, ClusterTopologyCheckedException.class, "Required node has left the cluster");
+    }
+
+    /**
+     * Ensures that the cache is not started if the coordinator left during the exchange.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCrdLeftDuringCacheStartOnExchangeFinish() throws Exception {
+        checkNodeLeftOnExchangeFinish(
+            true, IgniteCheckedException.class, "Operation has been cancelled (node is stopping)");
+    }
+
+    /**
+     * @param crdStop {@code True} to stop coordinator node.
+     * @param expCls Expected exception class.
+     * @param expMsg Expected exception message.
+     * @throws Exception If failed.
+     */
+    private void checkNodeLeftOnExchangeFinish(
+        boolean crdStop,
+        Class<? extends Throwable> expCls,
+        String expMsg
+    ) throws Exception {
+        startGridsWithSnapshot(3, CACHE_KEYS_RANGE, true);
+
+        TestRecordingCommunicationSpi node1spi = TestRecordingCommunicationSpi.spi(grid(1));
+        TestRecordingCommunicationSpi node2spi = TestRecordingCommunicationSpi.spi(grid(2));
+
+        node1spi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+        node2spi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+
+        IgniteFuture<Void> fut =
+            grid(1).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+        node1spi.waitForBlocked();
+        node2spi.waitForBlocked();
+
+        stopGrid(crdStop ? 0 : 2, true);
+
+        node1spi.stopBlock();
+
+        if (crdStop)
+            node2spi.stopBlock();
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), expCls, expMsg);
+
+        awaitPartitionMapExchange();
+
+        ensureCacheAbsent(dfltCacheCfg);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testClusterSnapshotRestoreRejectOnInActiveCluster() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valBuilder, dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        ignite.cluster().state(ClusterState.INACTIVE);
+
+        IgniteFuture<Void> fut =
+            ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+        GridTestUtils.assertThrowsAnyCause(
+            log, () -> fut.get(TIMEOUT), IgniteException.class, "The cluster should be active");
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testClusterSnapshotRestoreOnSmallerTopology() throws Exception {
+        startGridsWithSnapshot(2, CACHE_KEYS_RANGE, true);
+
+        stopGrid(1);
+
+        resetBaselineTopology();
+
+        IgniteFuture<Void> fut =
+            grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), IgniteIllegalStateException.class, null);
+
+        ensureCacheAbsent(dfltCacheCfg);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testRestoreSharedCacheGroup() throws Exception {
+        CacheConfiguration<Integer, Object> cacheCfg1 =
+            txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
+
+        CacheConfiguration<Integer, Object> cacheCfg2 =
+            txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE2)).setGroupName(SHARED_GRP);
+
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valBuilder, cacheCfg1, cacheCfg2);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        ignite.cache(CACHE1).destroy();
+
+        awaitPartitionMapExchange();
+
+        IgniteSnapshot snp = ignite.snapshot();
+
+        GridTestUtils.assertThrowsAnyCause(
+            log,
+            () -> snp.restoreSnapshot(SNAPSHOT_NAME, Arrays.asList(CACHE1, CACHE2)).get(TIMEOUT),
+            IllegalArgumentException.class,
+            "Cache group(s) was not found in the snapshot"
+        );
+
+        ignite.cache(CACHE2).destroy();
+
+        awaitPartitionMapExchange();
+
+        snp.restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(SHARED_GRP)).get(TIMEOUT);
+
+        assertCacheKeys(ignite.cache(CACHE1), CACHE_KEYS_RANGE);
+        assertCacheKeys(ignite.cache(CACHE2), CACHE_KEYS_RANGE);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testIncompatibleMetasUpdate() throws Exception {
+        valBuilder = new BinaryValueBuilder(TYPE_NAME);
+
+        IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
+
+        int typeId = ignite.context().cacheObjects().typeId(TYPE_NAME);
+
+        ignite.context().cacheObjects().removeType(typeId);
+
+        BinaryObject[] objs = new BinaryObject[CACHE_KEYS_RANGE];
+
+        IgniteCache<Integer, Object> cache1 = createCacheWithBinaryType(ignite, "cache1", n -> {
+            BinaryObjectBuilder builder = ignite.binary().builder(TYPE_NAME);
+
+            builder.setField("id", n);
+
+            objs[n] = builder.build();
+
+            return objs[n];
+        });
+
+        ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
+
+        // Ensure that existing type has been updated.
+        BinaryType type = ignite.context().cacheObjects().metadata(typeId);
+
+        assertTrue(type.fieldNames().contains("name"));
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            assertEquals(objs[i], cache1.get(i));
+
+        cache1.destroy();
+
+        grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+        ignite.context().cacheObjects().removeType(typeId);
+
+        // Create cache with incompatible binary type.
+        cache1 = createCacheWithBinaryType(ignite, "cache1", n -> {
+            BinaryObjectBuilder builder = ignite.binary().builder(TYPE_NAME);
+
+            builder.setField("id", UUID.randomUUID());
+
+            objs[n] = builder.build();
+
+            return objs[n];
+        });
+
+        IgniteFuture<Void> fut0 =
+            ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut0.get(TIMEOUT), BinaryObjectException.class, null);
+
+        ensureCacheAbsent(dfltCacheCfg);
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            assertEquals(objs[i], cache1.get(i));
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @param cacheName Cache name.
+     * @param valBuilder Binary value builder.
+     * @return Created cache.
+     */
+    private IgniteCache<Integer, Object> createCacheWithBinaryType(
+        Ignite ignite,
+        String cacheName,
+        Function<Integer, BinaryObject> valBuilder
+    ) {
+        IgniteCache<Integer, Object> cache = ignite.createCache(new CacheConfiguration<>(cacheName)).withKeepBinary();
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            cache.put(i, valBuilder.apply(i));
+
+        return cache;
+    }
+
+    /**
+     * @throws Exception if failed
+     */
+    @Test
+    public void testParallelCacheStartWithTheSameNameOnPrepare() throws Exception {
+        checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, IgniteCheckedException.class,
+            "Cache start failed. A cache or group with the same name is currently being restored from a snapshot");
+    }
+
+    /**
+     * @throws Exception if failed
+     */
+    @Test
+    public void testParallelCacheStartWithTheSameNameOnStart() throws Exception {
+        checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_START, CacheExistsException.class,
+            "Failed to start cache (a cache with the same name is already started):");
+    }
+
+    /**
+     * @param procType The type of distributed process on which communication is blocked.
+     * @param expCls Expected exception class.
+     * @param expMsg Expected exception message.
+     * @throws Exception if failed.
+     */
+    private void checkCacheStartWithTheSameName(
+        DistributedProcessType procType,
+        Class<? extends Throwable> expCls,
+        String expMsg
+    ) throws Exception {
+        dfltCacheCfg = txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
+
+        IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
+
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(1));
+
+        IgniteFuture<Void> fut = waitForBlockOnRestore(spi, procType, SHARED_GRP);
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> ignite.createCache(SHARED_GRP), IgniteCheckedException.class, null);
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> ignite.createCache(CACHE1), expCls, expMsg);
+
+        spi.stopBlock();
+
+        fut.get(TIMEOUT);
+
+        assertCacheKeys(grid(0).cache(CACHE1), CACHE_KEYS_RANGE);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testNodeFailDuringRestore() throws Exception {
+        startGridsWithSnapshot(4, CACHE_KEYS_RANGE);
+
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(3));
+
+        IgniteFuture<Void> fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, DEFAULT_CACHE_NAME);
+
+        IgniteInternalFuture<?> fut0 = runAsync(() -> stopGrid(3, true));
+
+        GridTestUtils.assertThrowsAnyCause(
+            log,
+            () -> fut.get(TIMEOUT),
+            ClusterTopologyCheckedException.class,
+            "Required node has left the cluster"
+        );
+
+        fut0.get(TIMEOUT);
+
+        awaitPartitionMapExchange();
+
+        ensureCacheAbsent(dfltCacheCfg);
+
+        GridTestUtils.assertThrowsAnyCause(
+            log,
+            () -> startGrid(3),
+            IgniteSpiException.class,
+            "to add the node to cluster - remove directories with the caches"
+        );
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testNodeFailDuringFilesCopy() throws Exception {
+        dfltCacheCfg.setCacheMode(CacheMode.REPLICATED);
+
+        startGridsWithSnapshot(3, CACHE_KEYS_RANGE);
+
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(2));
+        CountDownLatch stopLatch = new CountDownLatch(1);
+
+        spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage &&
+            ((SingleNodeMessage<?>)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal());
+
+        String failingFilePath = Paths.get(CACHE_DIR_PREFIX + DEFAULT_CACHE_NAME,
+            PART_FILE_PREFIX + (dfltCacheCfg.getAffinity().partitions() / 2) + FILE_SUFFIX).toString();
+
+        grid(2).context().cache().context().snapshotMgr().ioFactory(
+            new CustomFileIOFactory(new RandomAccessFileIOFactory(),
+                file -> {
+                    if (file.getPath().endsWith(failingFilePath)) {
+                        stopLatch.countDown();
+
+                        throw new RuntimeException("Test exception");
+                    }
+                }));
+
+        File node2dbDir = ((FilePageStoreManager)grid(2).context().cache().context().pageStore()).
+            cacheWorkDir(dfltCacheCfg).getParentFile();
+
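+        // Stop the node as soon as the failing partition file is touched during the copy.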
+        IgniteInternalFuture<Object> stopFut = runAsync(() -> {
+            U.await(stopLatch, TIMEOUT, TimeUnit.MILLISECONDS);
+
+            stopGrid(2, true);
+
+            return null;
+        });
+
+        IgniteFuture<Void> fut =
+            grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+        stopFut.get(TIMEOUT);
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class, null);
+
+        File[] files = node2dbDir.listFiles(file -> file.getName().startsWith(TMP_CACHE_DIR_PREFIX));
+        assertEquals("A temp directory with potentially corrupted files must exist.", 1, files.length);
+
+        ensureCacheAbsent(dfltCacheCfg);
+
+        dfltCacheCfg = null;
+
+        startGrid(2);
+
+        files = node2dbDir.listFiles(file -> file.getName().startsWith(TMP_CACHE_DIR_PREFIX));
+        assertEquals("A temp directory should be removed at node startup.", 0, files.length);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testNodeJoinDuringRestore() throws Exception {
+        Ignite ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
+
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(1));
+
+        IgniteFuture<Void> fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, DEFAULT_CACHE_NAME);
+
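+        // A new node must not be able to join the cluster while the restore operation is in progress.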
+        GridTestUtils.assertThrowsAnyCause(
+            log,
+            () -> startGrid(2),
+            IgniteSpiException.class,
+            "Joining node during caches restore is not allowed"
+        );
+
+        spi.stopBlock();
+
+        fut.get(TIMEOUT);
+
+        IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        assertTrue(cache.indexReadyFuture().isDone());
+
+        assertCacheKeys(cache, CACHE_KEYS_RANGE);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testClusterStateChangeActiveReadonlyOnPrepare() throws Exception {
+        checkClusterStateChange(ClusterState.ACTIVE_READ_ONLY, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
+            IgniteException.class, "Failed to perform start cache operation (cluster is in read-only mode)");
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testClusterStateChangeActiveReadonlyOnCacheStart() throws Exception {
+        checkClusterStateChange(ClusterState.ACTIVE_READ_ONLY, RESTORE_CACHE_GROUP_SNAPSHOT_START, null, null);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testClusterDeactivateOnPrepare() throws Exception {
+        checkClusterStateChange(ClusterState.INACTIVE, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
+            IgniteException.class, "The cluster has been deactivated.");
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testClusterDeactivateOnCacheStart() throws Exception {
+        checkClusterStateChange(ClusterState.INACTIVE, RESTORE_CACHE_GROUP_SNAPSHOT_START, null, null);
+    }
+
+    /**
+     * @param state Cluster state.
+     * @param procType The type of distributed process on which communication is blocked.
+     * @param exCls Expected exception class.
+     * @param expMsg Expected exception message.
+     * @throws Exception if failed.
+     */
+    private void checkClusterStateChange(
+        ClusterState state,
+        DistributedProcessType procType,
+        @Nullable Class<? extends Throwable> exCls,
+        @Nullable String expMsg
+    ) throws Exception {
+        int nodesCnt = 2;
+
+        Ignite ignite = startGridsWithSnapshot(nodesCnt, CACHE_KEYS_RANGE, true);
+
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(nodesCnt - 1));
+
+        IgniteFuture<Void> fut = waitForBlockOnRestore(spi, procType, DEFAULT_CACHE_NAME);
+
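+        // Change the cluster state while the selected restore phase is blocked, then resume message processing.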
+        ignite.cluster().state(state);
+
+        spi.stopBlock();
+
+        if (exCls == null) {
+            fut.get(TIMEOUT);
+
+            ignite.cluster().state(ClusterState.ACTIVE);
+
+            assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+
+            return;
+        }
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), exCls, expMsg);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        ensureCacheAbsent(dfltCacheCfg);
+
+        String cacheName = DEFAULT_CACHE_NAME;
+
+        grid(nodesCnt - 1).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(cacheName)).get(TIMEOUT);
+
+        assertCacheKeys(ignite.cache(cacheName), CACHE_KEYS_RANGE);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws IgniteCheckedException if failed.
+     */
+    private void ensureCacheAbsent(CacheConfiguration<?, ?> ccfg) throws IgniteCheckedException {
+        String cacheName = ccfg.getName();
+
+        for (Ignite ignite : G.allGrids()) {
+            GridKernalContext kctx = ((IgniteEx)ignite).context();
+
+            if (kctx.clientNode())
+                continue;
+
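+            // The cache group descriptor must be absent, the restore must be finished and the cache work directory must be empty.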
+            CacheGroupDescriptor desc = kctx.cache().cacheGroupDescriptors().get(CU.cacheId(cacheName));
+
+            assertNull("nodeId=" + kctx.localNodeId() + ", cache=" + cacheName, desc);
+
+            GridTestUtils.waitForCondition(
+                () -> !kctx.cache().context().snapshotMgr().isRestoring(),
+                TIMEOUT);
+
+            File dir = ((FilePageStoreManager)kctx.cache().context().pageStore()).cacheWorkDir(ccfg);
+
+            String errMsg = String.format("%s, dir=%s, exists=%b, files=%s",
+                ignite.name(), dir, dir.exists(), Arrays.toString(dir.list()));
+
+            assertTrue(errMsg, !dir.exists() || dir.list().length == 0);
+        }
+    }
+
+    /**
+     * @param spi Test communication spi.
+     * @param restorePhase The type of distributed process on which communication is blocked.
+     * @param grpName Cache group name.
+     * @return Snapshot restore future.
+     * @throws InterruptedException if interrupted.
+     */
+    private IgniteFuture<Void> waitForBlockOnRestore(
+        TestRecordingCommunicationSpi spi,
+        DistributedProcessType restorePhase,
+        String grpName
+    ) throws InterruptedException {
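+        // Block single-node responses of the given restore phase to pause the distributed process.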
+        spi.blockMessages((node, msg) ->
+            msg instanceof SingleNodeMessage && ((SingleNodeMessage<?>)msg).type() == restorePhase.ordinal());
+
+        IgniteFuture<Void> fut =
+            grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(grpName));
+
+        spi.waitForBlocked();
+
+        return fut;
+    }
+
+    /**
+     * Custom I/O factory that preprocesses created files.
+     */
+    private static class CustomFileIOFactory implements FileIOFactory {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Delegate factory. */
+        private final FileIOFactory delegate;
+
+        /** Preprocessor for created files. */
+        private final Consumer<File> hnd;
+
+        /**
+         * @param delegate Delegate factory.
+         * @param hnd Preprocessor for created files.
+         */
+        public CustomFileIOFactory(FileIOFactory delegate, Consumer<File> hnd) {
+            this.delegate = delegate;
+            this.hnd = hnd;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            FileIO fileIo = delegate.create(file, modes);
+
+            hnd.accept(file);
+
+            return fileIo;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
index 14cd9d7..fbbe62e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
@@ -41,6 +41,7 @@
 import org.apache.ignite.internal.processors.cache.persistence.CommonPoolStarvationCheckpointTest;
 import org.apache.ignite.internal.processors.cache.persistence.SingleNodePersistenceSslTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
@@ -99,6 +100,7 @@
     IgniteClusterSnapshotSelfTest.class,
     IgniteClusterSnapshotCheckTest.class,
     IgniteSnapshotMXBeanTest.class,
+    IgniteClusterSnapshotRestoreSelfTest.class,
 
     IgniteClusterIdTagTest.class,
     FullyConnectedComponentSearcherTest.class,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
index e0aee42e..7f4165d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
@@ -119,7 +119,7 @@
      * @param cacheName Cache name.
      * @return Cache configuration.
      */
-    private static CacheConfiguration<Integer, Account> txFilteredCache(String cacheName) {
+    private CacheConfiguration<Integer, Account> txFilteredCache(String cacheName) {
         return txCacheConfig(new CacheConfiguration<Integer, Account>(cacheName))
             .setCacheMode(CacheMode.REPLICATED)
             .setQueryEntities(singletonList(new QueryEntity(Integer.class.getName(), Account.class.getName())));
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
new file mode 100644
index 0000000..17f495c
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Objects;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryBasicNameMapper;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.junit.Test;
+
+/**
+ * Cluster snapshot restore tests verifying SQL and indexing.
+ */
+public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterSnapshotRestoreBaseTest {
+    /** Type name used for binary and SQL. */
+    private static final String TYPE_NAME = IndexedObject.class.getName();
+
+    /** Number of cache keys to pre-create at node start. */
+    private static final int CACHE_KEYS_RANGE = 10_000;
+
+    /** Cache value builder. */
+    private Function<Integer, Object> valBuilder = new BinaryValueBuilder(TYPE_NAME);
+
+    /** {@inheritDoc} */
+    @Override protected <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
+        return super.txCacheConfig(ccfg).setSqlIndexMaxInlineSize(255).setSqlSchema("PUBLIC")
+            .setQueryEntities(Collections.singletonList(new QueryEntity()
+                .setKeyType(Integer.class.getName())
+                .setValueType(TYPE_NAME)
+                .setFields(new LinkedHashMap<>(F.asMap("id", Integer.class.getName(), "name", String.class.getName())))
+                .setIndexes(Collections.singletonList(new QueryIndex("id")))));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Function<Integer, Object> valueBuilder() {
+        return valBuilder;
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testBasicClusterSnapshotRestore() throws Exception {
+        valBuilder = new IndexedValueBuilder();
+
+        IgniteEx client = startGridsWithSnapshot(2, CACHE_KEYS_RANGE, true);
+
+        grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
+
+        assertCacheKeys(client.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testBasicClusterSnapshotRestoreWithMetadata() throws Exception {
+        IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
+
+        // Remove metadata.
+        int typeId = ignite.context().cacheObjects().typeId(TYPE_NAME);
+
+        ignite.context().cacheObjects().removeType(typeId);
+
+        forceCheckpoint();
+
+        ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
+
+        assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
+
+        for (Ignite grid : G.allGrids())
+            assertNotNull(((IgniteEx)grid).context().cacheObjects().metadata(typeId));
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testClusterSnapshotRestoreOnBiggerTopology() throws Exception {
+        int nodesCnt = 4;
+
+        startGridsWithCache(nodesCnt - 2, CACHE_KEYS_RANGE, valBuilder, dfltCacheCfg);
+
+        grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
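+        // Add two nodes that hold no snapshot data and include them in the baseline topology.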
+        startGrid(nodesCnt - 2);
+
+        IgniteEx ignite = startGrid(nodesCnt - 1);
+
+        resetBaselineTopology();
+
+        awaitPartitionMapExchange();
+
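+        // Destroy the existing cache so that the cache group can be restored from the snapshot.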
+        ignite.cache(DEFAULT_CACHE_NAME).destroy();
+
+        awaitPartitionMapExchange();
+
+        // Remove metadata.
+        int typeId = ignite.context().cacheObjects().typeId(TYPE_NAME);
+
+        ignite.context().cacheObjects().removeType(typeId);
+
+        forceCheckpoint();
+
+        // Restore from an empty node.
+        ignite.snapshot().restoreSnapshot(
+            SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
+
+        awaitPartitionMapExchange();
+
+        assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void assertCacheKeys(IgniteCache<Object, Object> cache, int keysCnt) {
+        super.assertCacheKeys(cache, keysCnt);
+
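+        // Resolve the SQL table name from the binary type name using the simple-name mapper.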
+        String tblName = new BinaryBasicNameMapper(true).typeName(TYPE_NAME);
+
+        for (Ignite grid : G.allGrids()) {
+            GridKernalContext ctx = ((IgniteEx)grid).context();
+
+            String nodeId = ctx.localNodeId().toString();
+
+            assertTrue("nodeId=" + nodeId, grid.cache(cache.getName()).indexReadyFuture().isDone());
+
+            // Make sure no index rebuild happened.
+            assertEquals("nodeId=" + nodeId,
+                0, ctx.cache().cache(cache.getName()).context().cache().metrics0().getIndexRebuildKeysProcessed());
+
+            GridQueryProcessor qry = ((IgniteEx)grid).context().query();
+
+            // Make sure SQL queries return all restored rows.
+            assertEquals("nodeId=" + nodeId, (long)keysCnt, qry.querySqlFields(new SqlFieldsQuery(
+                "SELECT count(*) FROM " + tblName), true).getAll().get(0).get(0));
+
+            // Make sure the index is in use.
+            String explainPlan = (String)qry.querySqlFields(new SqlFieldsQuery(
+                "explain SELECT * FROM " + tblName + " WHERE id < 10"), true).getAll().get(0).get(0);
+
+            assertTrue("nodeId=" + nodeId + "\n" + explainPlan, explainPlan.contains("ID_ASC_IDX"));
+        }
+    }
+
+    /** */
+    private static class IndexedValueBuilder implements Function<Integer, Object> {
+        /** {@inheritDoc} */
+        @Override public Object apply(Integer key) {
+            return new IndexedObject(key, "Person number #" + key);
+        }
+    }
+
+    /** */
+    private static class IndexedObject {
+        /** Id. */
+        @QuerySqlField(index = true)
+        private final int id;
+
+        /** Name. */
+        @QuerySqlField
+        private final String name;
+
+        /**
+         * @param id Id.
+         * @param name Name.
+         */
+        public IndexedObject(int id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            IndexedObject obj = (IndexedObject)o;
+
+            return id == obj.id && Objects.equals(name, obj.name);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(name, id);
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 593f5df..69cbb71 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -29,6 +29,7 @@
 import org.apache.ignite.internal.processors.cache.persistence.db.LongDestroyDurableBackgroundTaskTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.MultipleParallelCacheDeleteDeadlockTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckWithIndexesTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreWithIndexingTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWithIndexesTest;
 import org.apache.ignite.internal.processors.database.IgniteDbMultiNodeWithIndexingPutGetTest;
 import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingPutGetTest;
@@ -66,7 +67,8 @@
     CacheGroupReencryptionTest.class,
     IgnitePdsIndexingDefragmentationTest.class,
     StopRebuildIndexTest.class,
-    ForceRebuildIndexTest.class
+    ForceRebuildIndexTest.class,
+    IgniteClusterSnapshotRestoreWithIndexingTest.class
 })
 public class IgnitePdsWithIndexingTestSuite {
 }