blob: 7c28fa54c5752b0a4be576d298212bd487733f31 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static java.util.Optional.ofNullable;
import static org.apache.ignite.internal.IgniteFeatures.SNAPSHOT_RESTORE_CACHE_GROUP;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
* Distributed process to restore cache group from the snapshot.
public class SnapshotRestoreProcess {
/** Temporary cache directory prefix. */
public static final String TMP_CACHE_DIR_PREFIX = "_tmp_snp_restore_";
/** Snapshot restore metrics prefix. */
public static final String SNAPSHOT_RESTORE_METRICS = "snapshot-restore";
/** Reject operation message. */
private static final String OP_REJECT_MSG = "Cache group restore operation was rejected. ";
/** Snapshot restore operation finish message. */
private static final String OP_FINISHED_MSG = "Cache groups have been successfully restored from the snapshot";
/** Snapshot restore operation failed message. */
private static final String OP_FAILED_MSG = "Failed to restore snapshot cache groups";
/** Kernal context. */
private final GridKernalContext ctx;
/** Cache group restore prepare phase. */
private final DistributedProcess<SnapshotOperationRequest, SnapshotRestoreOperationResponse> prepareRestoreProc;
/** Cache group restore preload partitions phase. */
private final DistributedProcess<UUID, Boolean> preloadProc;
/** Cache group restore cache start phase. */
private final DistributedProcess<UUID, Boolean> cacheStartProc;
/** Cache group restore rollback phase. */
private final DistributedProcess<UUID, Boolean> rollbackRestoreProc;
/** Logger. */
private final IgniteLogger log;
/** Future to be completed when the cache restore process is complete (this future will be returned to the user). */
private volatile ClusterSnapshotFuture fut;
/** Current snapshot restore operation context (will be {@code null} when the operation is not running). */
private volatile SnapshotRestoreContext opCtx;
/** Last snapshot restore operation context (saves the metrics of the last operation). */
private volatile SnapshotRestoreContext lastOpCtx = new SnapshotRestoreContext();
* @param ctx Kernal context.
public SnapshotRestoreProcess(GridKernalContext ctx) {
this.ctx = ctx;
log = ctx.log(getClass());
prepareRestoreProc = new DistributedProcess<>(
ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, this::prepare, this::finishPrepare);
preloadProc = new DistributedProcess<>(
ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD, this::preload, this::finishPreload);
cacheStartProc = new DistributedProcess<>(
ctx, RESTORE_CACHE_GROUP_SNAPSHOT_START, this::cacheStart, this::finishCacheStart);
rollbackRestoreProc = new DistributedProcess<>(
ctx, RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK, this::rollback, this::finishRollback);
* Cleanup temporary directories if any exists.
* @throws IgniteCheckedException If it was not possible to delete some temporary directory.
protected void cleanup() throws IgniteCheckedException {
FilePageStoreManager pageStore = (FilePageStoreManager)ctx.cache().context().pageStore();
File dbDir = pageStore.workDir();
for (File dir : dbDir.listFiles(dir -> dir.isDirectory() && dir.getName().startsWith(TMP_CACHE_DIR_PREFIX))) {
if (!U.delete(dir)) {
throw new IgniteCheckedException("Unable to remove temporary directory, " +
"try deleting it manually [dir=" + dir + ']');
* Register local metrics.
protected void registerMetrics() {
assert !ctx.clientNode();
MetricRegistry mreg = ctx.metric().registry(SNAPSHOT_RESTORE_METRICS);
mreg.register("startTime", () -> lastOpCtx.startTime,
"The system time of the start of the cluster snapshot restore operation on this node.");
mreg.register("endTime", () -> lastOpCtx.endTime,
"The system time when the restore operation of a cluster snapshot on this node ended.");
mreg.register("snapshotName", () -> lastOpCtx.snpName, String.class,
"The snapshot name of the last running cluster snapshot restore operation on this node.");
mreg.register("requestId", () -> Optional.ofNullable(lastOpCtx.reqId).map(UUID::toString).orElse(""),
String.class, "The request ID of the last running cluster snapshot restore operation on this node.");
mreg.register("error", () -> Optional.ofNullable(lastOpCtx.err.get()).map(Throwable::toString).orElse(""),
String.class, "Error message of the last running cluster snapshot restore operation on this node.");
mreg.register("totalPartitions", () -> lastOpCtx.totalParts,
"The total number of partitions to be restored on this node.");
mreg.register("processedPartitions", () -> lastOpCtx.processedParts.get(),
"The number of processed partitions on this node.");
* Start cache group restore operation.
* @param snpName Snapshot name.
* @param snpPath Snapshot directory path.
* @param cacheGrpNames Cache groups to be restored or {@code null} to restore all cache groups from the snapshot.
* @return Future that will be completed when the restore operation is complete and the cache groups are started.
public IgniteFuture<Void> start(String snpName, @Nullable String snpPath, @Nullable Collection<String> cacheGrpNames) {
IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
ClusterSnapshotFuture fut0;
try {
if (ctx.clientNode())
throw new IgniteException(OP_REJECT_MSG + "Client and daemon nodes can not perform this operation.");
DiscoveryDataClusterState clusterState = ctx.state().clusterState();
if (clusterState.state() != ClusterState.ACTIVE || clusterState.transition())
throw new IgniteException(OP_REJECT_MSG + "The cluster should be active.");
if (!clusterState.hasBaselineTopology())
throw new IgniteException(OP_REJECT_MSG + "The baseline topology is not configured for cluster.");
if (!IgniteFeatures.allNodesSupports(ctx.grid().cluster().nodes(), SNAPSHOT_RESTORE_CACHE_GROUP))
throw new IgniteException(OP_REJECT_MSG + "Not all nodes in the cluster support restore operation.");
if (snpMgr.isSnapshotCreating())
throw new IgniteException(OP_REJECT_MSG + "A cluster snapshot operation is in progress.");
synchronized (this) {
if (restoringSnapshotName() != null)
throw new IgniteException(OP_REJECT_MSG + "The previous snapshot restore operation was not completed.");
fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName);
fut0 = fut;
catch (IgniteException e) {
OP_FAILED_MSG + ": " + e.getMessage(),
return new IgniteFinishedFutureImpl<>(e);
fut0.listen(f -> {
if (f.error() != null) {
OP_FAILED_MSG + ": " + f.error().getMessage() + " [reqId=" + fut0.rqId + "].",
else {
OP_FINISHED_MSG + " [reqId=" + fut0.rqId + "].",
String msg = "Cluster-wide snapshot restore operation started [reqId=" + fut0.rqId + ", snpName=" + snpName +
(cacheGrpNames == null ? "" : ", caches=" + cacheGrpNames) + ']';
if (log.isInfoEnabled());
snpMgr.recordSnapshotEvent(snpName, msg, EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED);
snpMgr.checkSnapshot(snpName, snpPath, cacheGrpNames, true).listen(f -> {
if (f.error() != null) {
finishProcess(fut0.rqId, f.error());
if (!F.isEmpty(f.result().exceptions())) {
finishProcess(fut0.rqId, F.first(f.result().exceptions().values()));
if (fut0.interruptEx != null) {
finishProcess(fut0.rqId, fut0.interruptEx);
Set<UUID> dataNodes = new HashSet<>();
Set<String> snpBltNodes = null;
Map<ClusterNode, List<SnapshotMetadata>> metas = f.result().metas();
Map<Integer, String> reqGrpIds = cacheGrpNames == null ? Collections.emptyMap() :, v -> v));
for (Map.Entry<ClusterNode, List<SnapshotMetadata>> entry : metas.entrySet()) {
for (SnapshotMetadata meta : entry.getValue()) {
assert meta != null : entry.getKey().id();
if (snpBltNodes == null)
snpBltNodes = new HashSet<>(meta.baselineNodes());
if (snpBltNodes == null) {
finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "No snapshot data " +
"has been found [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']'));
if (!reqGrpIds.isEmpty()) {
finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "Cache group(s) was not " +
"found in the snapshot [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']'));
Collection<UUID> bltNodes = F.viewReadOnly(ctx.discovery().discoCache().aliveBaselineNodes(), F.node2id());
SnapshotOperationRequest req = new SnapshotOperationRequest(
fut0.rqId, F.first(dataNodes), snpName, snpPath, cacheGrpNames, new HashSet<>(bltNodes));
prepareRestoreProc.start(req.requestId(), req);
return new IgniteFutureImpl<>(fut0);
* Get the name of the snapshot currently being restored
* @return Name of the snapshot currently being restored or {@code null} if the restore process is not running.
public @Nullable String restoringSnapshotName() {
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 != null)
return opCtx0.snpName;
ClusterSnapshotFuture fut0 = fut;
return fut0 != null ? : null;
* @param ccfg Cache configuration.
* @return {@code True} if the cache or group with the specified name is currently being restored.
public boolean isRestoring(CacheConfiguration<?, ?> ccfg) {
return isRestoring(ccfg, opCtx);
* Check if the cache or group with the specified name is currently being restored from the snapshot.
* @param opCtx Restoring context.
* @param ccfg Cache configuration.
* @return {@code True} if the cache or group with the specified name is currently being restored.
private boolean isRestoring(CacheConfiguration<?, ?> ccfg, @Nullable SnapshotRestoreContext opCtx) {
assert ccfg != null;
if (opCtx == null)
return false;
Map<Integer, StoredCacheData> cacheCfgs = opCtx.cfgs;
String cacheName = ccfg.getName();
String grpName = ccfg.getGroupName();
int cacheId = CU.cacheId(cacheName);
if (cacheCfgs.containsKey(cacheId))
return true;
for (File grpDir : opCtx.dirs) {
String locGrpName = FilePageStoreManager.cacheGroupName(grpDir);
if (grpName != null) {
if (cacheName.equals(locGrpName))
return true;
if (CU.cacheId(locGrpName) == CU.cacheId(grpName))
return true;
else if (CU.cacheId(locGrpName) == cacheId)
return true;
return false;
* @param reqId Request ID.
* @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when
* starting the cache(s), the whole procedure is rolled back.
public Set<UUID> cacheStartRequiredAliveNodes(IgniteUuid reqId) {
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 == null || !reqId.globalId().equals(opCtx0.reqId))
return Collections.emptySet();
return new HashSet<>(opCtx0.nodes());
* Finish local cache group restore process.
* @param reqId Request ID.
* @param err Error, if any.
private void finishProcess(UUID reqId, @Nullable Throwable err) {
if (err != null)
log.error(OP_FAILED_MSG + " [reqId=" + reqId + "].", err);
else if (log.isInfoEnabled()) + " [reqId=" + reqId + "].");
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 != null && reqId.equals(opCtx0.reqId)) {
opCtx = null;
opCtx0.endTime = U.currentTimeMillis();
synchronized (this) {
ClusterSnapshotFuture fut0 = fut;
if (fut0 != null && reqId.equals(fut0.rqId)) {
fut = null;
ctx.pools().getSystemExecutorService().submit(() -> fut0.onDone(null, err));
* Node left callback.
* @param leftNodeId Left node ID.
public void onNodeLeft(UUID leftNodeId) {
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 != null && opCtx0.nodes().contains(leftNodeId)) {
opCtx0.err.compareAndSet(null, new ClusterTopologyCheckedException(OP_REJECT_MSG +
"Required node has left the cluster [nodeId=" + leftNodeId + ']'));
* Cancel the currently running local restore procedure.
* @param reason Interruption reason.
* @param snpName Snapshot name.
* @return Future that will be finished when process the process is complete. The result of this future will be
* {@code false} if the restore process with the specified snapshot name is not running at all.
public IgniteFuture<Boolean> cancel(IgniteCheckedException reason, String snpName) {
SnapshotRestoreContext opCtx0;
ClusterSnapshotFuture fut0 = null;
synchronized (this) {
opCtx0 = opCtx;
if (fut != null && {
fut0 = fut;
fut0.interruptEx = reason;
boolean ctxStop = opCtx0 != null && opCtx0.snpName.equals(snpName);
if (ctxStop)
interrupt(opCtx0, reason);
return fut0 == null ? new IgniteFinishedFutureImpl<>(ctxStop) :
new IgniteFutureImpl<>(fut0.chain(f -> true));
* Interrupt the currently running local restore procedure.
* @param reason Interruption reason.
public void interrupt(IgniteCheckedException reason) {
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 != null)
interrupt(opCtx0, reason);
* Interrupt the currently running local restore procedure.
* @param opCtx Snapshot restore operation context.
* @param reason Interruption reason.
private void interrupt(SnapshotRestoreContext opCtx, IgniteCheckedException reason) {
opCtx.err.compareAndSet(null, reason);
IgniteFuture<?> stopFut;
synchronized (this) {
stopFut = opCtx.stopFut;
if (stopFut != null)
* Ensures that a cache with the specified name does not exist locally.
* @param name Cache name.
private void ensureCacheAbsent(String name) {
int id = CU.cacheId(name);
if (ctx.cache().cacheGroupDescriptors().containsKey(id) || ctx.cache().cacheDescriptor(id) != null) {
throw new IgniteIllegalStateException("Cache \"" + name +
"\" should be destroyed manually before perform restore operation.");
* @param req Request to prepare cache group restore from the snapshot.
* @return Result future.
private IgniteInternalFuture<SnapshotRestoreOperationResponse> prepare(SnapshotOperationRequest req) {
if (ctx.clientNode())
return new GridFinishedFuture<>();
try {
DiscoveryDataClusterState state = ctx.state().clusterState();
IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
if (state.state() != ClusterState.ACTIVE || state.transition())
throw new IgniteCheckedException(OP_REJECT_MSG + "The cluster should be active.");
if (snpMgr.isSnapshotCreating())
throw new IgniteCheckedException(OP_REJECT_MSG + "A cluster snapshot operation is in progress.");
if (ctx.encryption().isMasterKeyChangeInProgress()) {
return new GridFinishedFuture<>(new IgniteCheckedException(OP_REJECT_MSG + "Master key changing " +
"process is not finished yet."));
if (ctx.encryption().reencryptionInProgress()) {
return new GridFinishedFuture<>(new IgniteCheckedException(OP_REJECT_MSG + "Caches re-encryption " +
"process is not finished yet."));
for (UUID nodeId : req.nodes()) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null || !CU.baselineNode(node, state) || !ctx.discovery().alive(node)) {
throw new IgniteCheckedException(
OP_REJECT_MSG + "Required node has left the cluster [nodeId-" + nodeId + ']');
if (log.isInfoEnabled()) {"Starting local snapshot prepare restore operation" +
" [reqId=" + req.requestId() +
", snapshot=" + req.snapshotName() +
", caches=" + req.groups() + ']');
List<SnapshotMetadata> locMetas = snpMgr.readSnapshotMetadatas(req.snapshotName(), req.snapshotPath());
SnapshotRestoreContext opCtx0 = prepareContext(req, locMetas);
synchronized (this) {
lastOpCtx = opCtx = opCtx0;
ClusterSnapshotFuture fut0 = fut;
if (fut0 != null)
// Ensure that shared cache groups has no conflicts.
for (StoredCacheData cfg : opCtx0.cfgs.values()) {
if (!F.isEmpty(cfg.config().getGroupName()))
if (ctx.isStopping())
throw new NodeStoppingException("The node is stopping: " + ctx.localNodeId());
return new GridFinishedFuture<>(new SnapshotRestoreOperationResponse(opCtx0.cfgs.values(), locMetas));
catch (IgniteIllegalStateException | IgniteCheckedException | RejectedExecutionException e) {
log.error("Unable to restore cache group(s) from the snapshot " +
"[reqId=" + req.requestId() + ", snapshot=" + req.snapshotName() + ']', e);
return new GridFinishedFuture<>(e);
* @param cacheDir Cache directory.
* @return Temporary directory.
static File formatTmpDirName(File cacheDir) {
return new File(cacheDir.getParent(), TMP_CACHE_DIR_PREFIX + cacheDir.getName());
* @param tmpCacheDir Temporary cache directory.
* @return Cache or group id.
static int groupIdFromTmpDir(File tmpCacheDir) {
assert tmpCacheDir.getName().startsWith(TMP_CACHE_DIR_PREFIX) : tmpCacheDir;
String cacheGrpName = tmpCacheDir.getName().substring(TMP_CACHE_DIR_PREFIX.length());
return CU.cacheId(cacheGroupName(new File(tmpCacheDir.getParentFile(), cacheGrpName)));
* @param req Request to prepare cache group restore from the snapshot.
* @param metas Local snapshot metadatas.
* @return Snapshot restore operation context.
* @throws IgniteCheckedException If failed.
private SnapshotRestoreContext prepareContext(
SnapshotOperationRequest req,
Collection<SnapshotMetadata> metas
) throws IgniteCheckedException {
if (opCtx != null) {
throw new IgniteCheckedException(OP_REJECT_MSG +
"The previous snapshot restore operation was not completed.");
GridCacheSharedContext<?, ?> cctx = ctx.cache().context();
// Collection of baseline nodes that must survive and additional discovery data required for the affinity calculation.
DiscoCache discoCache = ctx.discovery().discoCache();
if (!F.transform(discoCache.aliveBaselineNodes(), F.node2id()).containsAll(req.nodes()))
throw new IgniteCheckedException("Restore context cannot be inited since the required baseline nodes missed: " + discoCache);
DiscoCache discoCache0 = discoCache.copy(discoCache.version(), null);
if (F.isEmpty(metas))
return new SnapshotRestoreContext(req, discoCache0, Collections.emptyMap());
if (F.first(metas).pageSize() != cctx.database().pageSize()) {
throw new IgniteCheckedException("Incompatible memory page size " +
"[snapshotPageSize=" + F.first(metas).pageSize() +
", local=" + cctx.database().pageSize() +
", snapshot=" + req.snapshotName() +
", nodeId=" + cctx.localNodeId() + ']');
Map<String, StoredCacheData> cfgsByName = new HashMap<>();
FilePageStoreManager pageStore = (FilePageStoreManager)cctx.pageStore();
GridLocalConfigManager locCfgMgr = cctx.cache().configManager();
// Collect the cache configurations and prepare a temporary directory for copying files.
// Metastorage can be restored only manually by directly copying files.
for (SnapshotMetadata meta : metas) {
for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), req.snapshotPath(), meta.folderName(),
name -> !METASTORAGE_CACHE_NAME.equals(name))) {
String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir);
if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName))
File cacheDir = pageStore.cacheWorkDir(snpCacheDir.getName().startsWith(CACHE_GRP_DIR_PREFIX), grpName);
if (cacheDir.exists()) {
if (!cacheDir.isDirectory()) {
throw new IgniteCheckedException("Unable to restore cache group, file with required directory " +
"name already exists [group=" + grpName + ", file=" + cacheDir + ']');
if (cacheDir.list().length > 0) {
throw new IgniteCheckedException("Unable to restore cache group - directory is not empty. " +
"Cache group should be destroyed manually before perform restore operation " +
"[group=" + grpName + ", dir=" + cacheDir + ']');
if (!cacheDir.delete()) {
throw new IgniteCheckedException("Unable to remove empty cache directory " +
"[group=" + grpName + ", dir=" + cacheDir + ']');
File tmpCacheDir = formatTmpDirName(cacheDir);
if (tmpCacheDir.exists()) {
throw new IgniteCheckedException("Unable to restore cache group, temp directory already exists " +
"[group=" + grpName + ", dir=" + tmpCacheDir + ']');
locCfgMgr.readCacheConfigurations(snpCacheDir, cfgsByName);
Map<Integer, StoredCacheData> cfgsById =
cfgsByName.values().stream().collect(Collectors.toMap(v -> CU.cacheId(v.config().getName()), v -> v));
return new SnapshotRestoreContext(req, discoCache0, cfgsById);
* @param reqId Request ID.
* @param res Results.
* @param errs Errors.
private void finishPrepare(UUID reqId, Map<UUID, SnapshotRestoreOperationResponse> res, Map<UUID, Exception> errs) {
if (ctx.clientNode())
SnapshotRestoreContext opCtx0 = opCtx;
Exception failure = F.first(errs.values());
assert opCtx0 != null || failure != null : "Context has not been created on the node " + ctx.localNodeId();
if (opCtx0 == null || !reqId.equals(opCtx0.reqId)) {
finishProcess(reqId, failure);
if (failure == null)
failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
// Context has been created - should rollback changes cluster-wide.
if (failure != null) {
Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e : res.entrySet()) {
if (e.getValue().ccfgs != null) {
for (StoredCacheData cacheData : e.getValue().ccfgs) {
globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
opCtx0.metasPerNode.put(e.getKey(), new ArrayList<>(e.getValue().metas));
opCtx0.cfgs = globalCfgs;
if (U.isLocalNodeCoordinator(ctx.discovery()))
preloadProc.start(reqId, reqId);
* @param ccfg Cache configuration.
* @param cache Discovery cache.
* @return Map of affinity per each cache group.
private static GridAffinityAssignmentCache calculateAffinity(
GridKernalContext ctx,
CacheConfiguration<?, ?> ccfg,
DiscoCache cache
) {
GridAffinityAssignmentCache affCache = GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
affCache.calculate(cache.version(), null, cache);
return affCache;
* @param metas List of snapshot metadata to check.
* @param grpId Group id.
* @param parts Set of partitions to search for.
* @return Snapshot metadata which contains a full set of given partitions or {@code null} the otherwise.
private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
List<SnapshotMetadata> metas,
int grpId,
Set<Integer> parts
) {
assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
// Try to copy everything right from the single snapshot part.
for (SnapshotMetadata meta : metas) {
Set<Integer> grpParts = meta.partitions().get(grpId);
Set<Integer> grpWoIndex = grpParts == null ? Collections.emptySet() : new HashSet<>(grpParts);
if (grpWoIndex.equals(parts))
return meta;
return null;
* @param reqId Request id.
* @return Future which will be completed when the preload ends.
private IgniteInternalFuture<Boolean> preload(UUID reqId) {
if (ctx.clientNode())
return new GridFinishedFuture<>();
SnapshotRestoreContext opCtx0 = opCtx;
GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
if (opCtx0 == null)
return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot restore process has incorrect restore state: " + reqId));
if (opCtx0.dirs.isEmpty())
return new GridFinishedFuture<>();
try {
if (ctx.isStopping())
throw new NodeStoppingException("Node is stopping: " + ctx.localNodeId());
Set<SnapshotMetadata> allMetas =
IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
synchronized (this) {
opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
if (log.isInfoEnabled()) {"Starting snapshot preload operation to restore cache groups " +
"[reqId=" + reqId +
", snapshot=" + opCtx0.snpName +
", caches=" + F.transform(opCtx0.dirs, FilePageStoreManager::cacheGroupName) + ']');
File snpDir = snpMgr.snapshotLocalDir(opCtx0.snpName, opCtx0.snpPath);
CompletableFuture<Void> metaFut = ctx.localNodeId().equals(opCtx0.opNodeId) ?
() -> {
try {
SnapshotMetadata meta = F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
File binDir = binaryWorkDir(snpDir.getAbsolutePath(), meta.folderName());
ctx.cacheObjects().updateMetadata(binDir, opCtx0.stopChecker);
catch (Throwable t) {
log.error("Unable to perform metadata update operation for the cache groups restore process", t);
}, snpMgr.snapshotExecutorService()) : CompletableFuture.completedFuture(null);
Map<String, GridAffinityAssignmentCache> affCache = new HashMap<>();
for (StoredCacheData data : opCtx0.cfgs.values()) {
grp -> calculateAffinity(ctx, data.config(), opCtx0.discoCache));
Map<Integer, Set<PartitionRestoreFuture>> allParts = new HashMap<>();
Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new HashMap<>();
ClusterNode locNode = ctx.cache().context().localNode();
List<SnapshotMetadata> locMetas = opCtx0.metasPerNode.get(;
// First preload everything from the local node.
for (File dir : opCtx0.dirs) {
String cacheOrGrpName = cacheGroupName(dir);
int grpId = CU.cacheId(cacheOrGrpName);
File tmpCacheDir = formatTmpDirName(dir);
Set<PartitionRestoreFuture> leftParts;
// Partitions contained in the snapshot.
Set<Integer> availParts = new HashSet<>();
for (SnapshotMetadata meta : allMetas) {
Set<Integer> parts = meta.partitions().get(grpId);
if (parts != null)
List<List<ClusterNode>> assignment = affCache.get(cacheOrGrpName).idealAssignment().assignment();
Set<PartitionRestoreFuture> partFuts = availParts
.filter(p -> p != INDEX_PARTITION && assignment.get(p).contains(locNode))
.map(p -> new PartitionRestoreFuture(p, opCtx0.processedParts))
allParts.put(grpId, partFuts);
rmtLoadParts.put(grpId, leftParts = new HashSet<>(partFuts));
if (leftParts.isEmpty())
SnapshotMetadata full = findMetadataWithSamePartitions(locMetas,
grpId, -> p.partId).collect(Collectors.toSet()));
for (SnapshotMetadata meta : full == null ? locMetas : Collections.singleton(full)) {
if (leftParts.isEmpty())
File snpCacheDir = new File(snpDir,
Paths.get(databaseRelativePath(meta.folderName()), dir.getName()).toString());
leftParts.removeIf(partFut -> {
boolean doCopy = ofNullable(meta.partitions().get(grpId))
if (doCopy) {
return doCopy;
if (meta == full) {
assert leftParts.isEmpty() : leftParts;
if (log.isInfoEnabled()) {"The snapshot was taken on the same cluster topology. The index will be copied to " +
"restoring cache group if necessary [reqId=" + reqId + ", snapshot=" + opCtx0.snpName +
", dir=" + dir.getName() + ']');
File idxFile = new File(snpCacheDir, FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
if (idxFile.exists()) {
PartitionRestoreFuture idxFut;
allParts.computeIfAbsent(grpId, g -> new HashSet<>())
.add(idxFut = new PartitionRestoreFuture(INDEX_PARTITION, opCtx0.processedParts));
// Load other partitions from remote nodes.
List<PartitionRestoreFuture> rmtAwaitParts = rmtLoadParts.values().stream()
// This is necessary for sending only one partitions request per each cluster node.
Map<UUID, Map<Integer, Set<Integer>>> snpAff = snapshotAffinity(
.filter(e -> !e.getKey().equals(ctx.localNodeId()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
(grpId, partId) -> rmtLoadParts.get(grpId) != null &&
rmtLoadParts.get(grpId).remove(new PartitionRestoreFuture(partId, opCtx0.processedParts)));
try {
if (log.isInfoEnabled() && !snpAff.isEmpty()) {"Trying to request partitions from remote nodes " +
"[reqId=" + reqId +
", snapshot=" + opCtx0.snpName +
", map=" + snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
e -> partitionsMapToCompactString(e.getValue()))) + ']');
for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : snpAff.entrySet()) {
(snpFile, t) -> {
if (opCtx0.stopChecker.getAsBoolean())
throw new IgniteInterruptedException("Snapshot remote operation request cancelled.");
if (t != null) {
completeListExceptionally(rmtAwaitParts, t);
int grpId = groupIdFromTmpDir(snpFile.getParentFile());
int partId = partId(snpFile.getName());
PartitionRestoreFuture partFut = F.find(allParts.get(grpId), null,
fut -> fut.partId == partId);
assert partFut != null : snpFile.getAbsolutePath();
catch (IgniteCheckedException e) {
completeListExceptionally(rmtAwaitParts, e);
List<PartitionRestoreFuture> allPartFuts = allParts.values().stream().flatMap(Collection::stream)
int size = allPartFuts.size();
opCtx0.totalParts = size;
CompletableFuture.allOf(allPartFuts.toArray(new CompletableFuture[size]))
.runAfterBothAsync(metaFut, () -> {
try {
if (opCtx0.stopChecker.getAsBoolean())
throw new IgniteInterruptedException("The operation has been stopped on temporary directory switch.");
for (File src : opCtx0.dirs)
Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE);
catch (IOException e) {
throw new IgniteException(e);
}, snpMgr.snapshotExecutorService())
.whenComplete((r, t) -> opCtx0.errHnd.accept(t))
.whenComplete((res, t) -> {
Throwable t0 = ofNullable(opCtx0.err.get()).orElse(t);
if (t0 == null)
else {
log.error("Unable to restore cache group(s) from a snapshot " +
"[reqId=" + opCtx.reqId + ", snapshot=" + opCtx.snpName + ']', t0);
catch (Exception ex) {
return new GridFinishedFuture<>(ex);
return retFut;
* @param reqId Request ID.
* @param res Results.
* @param errs Errors.
private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
if (ctx.clientNode())
SnapshotRestoreContext opCtx0 = opCtx;
Exception failure = errs.values().stream().findFirst().
orElse(checkNodeLeft(opCtx0.nodes(), res.keySet()));
if (failure != null) {
if (U.isLocalNodeCoordinator(ctx.discovery()))
rollbackRestoreProc.start(reqId, reqId);
if (U.isLocalNodeCoordinator(ctx.discovery()))
cacheStartProc.start(reqId, reqId);
* @param reqId Request ID.
* @return Result future.
private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
if (ctx.clientNode())
return new GridFinishedFuture<>();
SnapshotRestoreContext opCtx0 = opCtx;
Throwable err = opCtx0.err.get();
if (err != null)
return new GridFinishedFuture<>(err);
if (!U.isLocalNodeCoordinator(ctx.discovery()))
return new GridFinishedFuture<>();
Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
if (log.isInfoEnabled()) {"Starting restored caches " +
"[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName()) + ']');
// We set the topology node IDs required to successfully start the cache, if any of the required nodes leave
// the cluster during the cache startup, the whole procedure will be rolled back.
return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false,
* @param reqId Request ID.
* @param res Results.
* @param errs Errors.
private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
if (ctx.clientNode())
SnapshotRestoreContext opCtx0 = opCtx;
Exception failure = errs.values().stream().findFirst().
orElse(checkNodeLeft(opCtx0.nodes(), res.keySet()));
if (failure == null) {
finishProcess(reqId, null);
opCtx0.err.compareAndSet(null, failure);
if (U.isLocalNodeCoordinator(ctx.discovery()))
rollbackRestoreProc.start(reqId, reqId);
* @param metas Map of snapshot metadata distribution across the cluster.
* @return Map of cache partitions per each node.
private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
Map<UUID, List<SnapshotMetadata>> metas,
BiPredicate<Integer, Integer> filter
) {
Map<UUID, Map<Integer, Set<Integer>>> nodeToSnp = new HashMap<>();
List<UUID> nodes = new ArrayList<>(metas.keySet());
Map<UUID, List<SnapshotMetadata>> shuffleMetas = new LinkedHashMap<>();
nodes.forEach(k -> shuffleMetas.put(k, metas.get(k)));
for (Map.Entry<UUID, List<SnapshotMetadata>> e : shuffleMetas.entrySet()) {
UUID nodeId = e.getKey();
for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(Collections.emptyList())) {
Map<Integer, Set<Integer>> parts = ofNullable(meta.partitions()).orElse(Collections.emptyMap());
for (Map.Entry<Integer, Set<Integer>> metaParts : parts.entrySet()) {
for (Integer partId : metaParts.getValue()) {
if (filter.test(metaParts.getKey(), partId)) {
nodeToSnp.computeIfAbsent(nodeId, n -> new HashMap<>())
.computeIfAbsent(metaParts.getKey(), k -> new HashSet<>())
return nodeToSnp;
* @param reqNodes Set of required topology nodes.
* @param respNodes Set of responding topology nodes.
* @return Error, if no response was received from the required topology node.
private Exception checkNodeLeft(Collection<UUID> reqNodes, Set<UUID> respNodes) {
if (!respNodes.containsAll(reqNodes)) {
Set<UUID> leftNodes = new HashSet<>(reqNodes);
return new ClusterTopologyCheckedException(OP_REJECT_MSG +
"Required node has left the cluster [nodeId=" + leftNodes + ']');
return null;
* @param reqId Request ID.
* @return Result future.
private IgniteInternalFuture<Boolean> rollback(UUID reqId) {
if (ctx.clientNode())
return new GridFinishedFuture<>();
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 == null || F.isEmpty(opCtx0.dirs))
return new GridFinishedFuture<>();
GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
synchronized (this) {
opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
try {
ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
if (log.isInfoEnabled()) {"Removing restored cache directories [reqId=" + reqId +
", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
IgniteCheckedException ex = null;
for (File cacheDir : opCtx0.dirs) {
File tmpCacheDir = formatTmpDirName(cacheDir);
if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
"[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
ex = new IgniteCheckedException("Unable to remove temporary cache directory " + cacheDir);
if (cacheDir.exists() && !U.delete(cacheDir)) {
log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
"[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
ex = new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
if (ex != null)
catch (RejectedExecutionException e) {
log.error("Unable to perform rollback routine, task has been rejected " +
"[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']');
return retFut;
* @param reqId Request ID.
* @param res Results.
* @param errs Errors.
private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
if (ctx.clientNode())
if (!errs.isEmpty()) {
log.warning("Some nodes were unable to complete the rollback routine completely, check the local log " +
"files for more information [nodeIds=" + errs.keySet() + ']');
SnapshotRestoreContext opCtx0 = opCtx;
if (!res.keySet().containsAll(opCtx0.nodes())) {
Set<UUID> leftNodes = new HashSet<>(opCtx0.nodes());
log.warning("Some of the nodes left the cluster and were unable to complete the rollback" +
" operation [reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", node(s)=" + leftNodes + ']');
finishProcess(reqId, opCtx0.err.get());
* @param mgr Ignite snapshot manager.
* @param opCtx Snapshot operation context.
* @param srcDir Snapshot directory to copy from.
* @param targetDir Destination directory to copy to.
private static void copyLocalAsync(
IgniteSnapshotManager mgr,
SnapshotRestoreContext opCtx,
File srcDir,
File targetDir,
PartitionRestoreFuture partFut
) {
File snpFile = new File(srcDir, FilePageStoreManager.getPartitionFileName(partFut.partId));
Path partFile = Paths.get(targetDir.getAbsolutePath(), FilePageStoreManager.getPartitionFileName(partFut.partId));
CompletableFuture.supplyAsync(() -> {
if (opCtx.stopChecker.getAsBoolean())
throw new IgniteInterruptedException("The operation has been stopped on copy file: " + snpFile.getAbsolutePath());
if (Thread.interrupted())
throw new IgniteInterruptedException("Thread has been interrupted: " + Thread.currentThread().getName());
if (!snpFile.exists()) {
throw new IgniteException("Partition snapshot file doesn't exist [snpName=" + opCtx.snpName +
", snpDir=" + snpFile.getAbsolutePath() + ", name=" + snpFile.getName() + ']');
IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile, partFile.toFile(), snpFile.length());
return partFile;
}, mgr.snapshotExecutorService())
.whenComplete((r, t) -> opCtx.errHnd.accept(t))
.whenComplete((r, t) -> {
if (t == null)
* @param col Collection of sets to complete.
* @param ex Exception to set.
private static void completeListExceptionally(List<PartitionRestoreFuture> col, Throwable ex) {
for (PartitionRestoreFuture f : col)
* @param map Map of partitions and cache groups.
* @return String representation.
private static String partitionsMapToCompactString(Map<Integer, Set<Integer>> map) {
return map.entrySet()
.collect(Collectors.toMap(Map.Entry::getKey, e -> S.compact(e.getValue())))
* Cache group restore from snapshot operation context.
private static class SnapshotRestoreContext {
/** Request ID. */
private final UUID reqId;
/** Snapshot name. */
private final String snpName;
/** Snapshot directory path. */
private final String snpPath;
/** Baseline discovery cache for node IDs that must be alive to complete the operation.*/
private final DiscoCache discoCache;
/** Operational node id. */
private final UUID opNodeId;
* Set of restored cache groups path on local node. Collected when all cache configurations received
* from the <tt>prepare</tt> distributed process.
private final Set<File> dirs = new HashSet<>();
/** The exception that led to the interruption of the process. */
private final AtomicReference<Throwable> err = new AtomicReference<>();
/** Distribution of snapshot metadata files across the cluster. */
private final Map<UUID, List<SnapshotMetadata>> metasPerNode = new HashMap<>();
/** Context error handler. */
private final Consumer<Throwable> errHnd = (ex) -> err.compareAndSet(null, ex);
/** Stop condition checker. */
private final BooleanSupplier stopChecker = () -> err.get() != null;
/** Cache ID to configuration mapping. */
private volatile Map<Integer, StoredCacheData> cfgs = Collections.emptyMap();
/** Graceful shutdown future. */
private volatile IgniteFuture<?> stopFut;
/** Operation start time. */
private final long startTime;
/** Number of processed (copied) partitions. */
private final AtomicInteger processedParts = new AtomicInteger(0);
/** Total number of partitions to be restored. */
private volatile int totalParts = -1;
/** Operation end time. */
private volatile long endTime;
/** Creates an empty context. */
protected SnapshotRestoreContext() {
reqId = null;
snpName = "";
startTime = 0;
opNodeId = null;
discoCache = null;
snpPath = null;
* @param req Request to prepare cache group restore from the snapshot.
* @param discoCache Baseline discovery cache for node IDs that must be alive to complete the operation.
* @param cfgs Cache ID to configuration mapping.
protected SnapshotRestoreContext(SnapshotOperationRequest req, DiscoCache discoCache, Map<Integer, StoredCacheData> cfgs) {
reqId = req.requestId();
snpName = req.snapshotName();
snpPath = req.snapshotPath();
opNodeId = req.operationalNodeId();
startTime = U.currentTimeMillis();
this.discoCache = discoCache;
this.cfgs = cfgs;
* @return Required baseline nodeIds that must be alive to complete restore operation.
public Collection<UUID> nodes() {
return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
/** Snapshot operation prepare response. */
private static class SnapshotRestoreOperationResponse implements Serializable {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
/** Cache configurations on local node. */
private final List<StoredCacheData> ccfgs;
/** Snapshot metadata files on local node. */
private final List<SnapshotMetadata> metas;
* @param ccfgs Cache configurations on local node.
* @param metas Snapshot metadata files on local node.
public SnapshotRestoreOperationResponse(
Collection<StoredCacheData> ccfgs,
Collection<SnapshotMetadata> metas
) {
this.ccfgs = new ArrayList<>(ccfgs);
this.metas = new ArrayList<>(metas);
/** Future will be completed when partition processing ends. */
private static class PartitionRestoreFuture extends CompletableFuture<Path> {
/** Partition id. */
private final int partId;
/** Counter of the total number of processed partitions. */
private final AtomicInteger cntr;
* @param partId Partition id.
* @param cntr Counter of the total number of processed partitions.
private PartitionRestoreFuture(int partId, AtomicInteger cntr) {
this.partId = partId;
this.cntr = cntr;
/** {@inheritDoc} */
@Override public boolean complete(Path path) {
return super.complete(path);
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
PartitionRestoreFuture future = (PartitionRestoreFuture)o;
return partId == future.partId;
/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hash(partId);
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(PartitionRestoreFuture.class, this);