blob: 261f5331d9cc1eb696cb52560bfc73ecfef4e8a2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.IgniteFeatures.SNAPSHOT_RESTORE_CACHE_GROUP;
import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
/**
* Distributed process to restore cache group from the snapshot.
*/
public class SnapshotRestoreProcess {
/** Temporary cache directory prefix. */
public static final String TMP_CACHE_DIR_PREFIX = "_tmp_snp_restore_";
/** Reject operation message. */
private static final String OP_REJECT_MSG = "Cache group restore operation was rejected. ";
/** Kernal context. */
private final GridKernalContext ctx;
/** Cache group restore prepare phase. */
private final DistributedProcess<SnapshotOperationRequest, ArrayList<StoredCacheData>> prepareRestoreProc;
/** Cache group restore cache start phase. */
private final DistributedProcess<UUID, Boolean> cacheStartProc;
/** Cache group restore rollback phase. */
private final DistributedProcess<UUID, Boolean> rollbackRestoreProc;
/** Logger. */
private final IgniteLogger log;
/** Future to be completed when the cache restore process is complete (this future will be returned to the user). */
private volatile ClusterSnapshotFuture fut;
/** Snapshot restore operation context. */
private volatile SnapshotRestoreContext opCtx;
/**
* @param ctx Kernal context.
*/
public SnapshotRestoreProcess(GridKernalContext ctx) {
this.ctx = ctx;
log = ctx.log(getClass());
prepareRestoreProc = new DistributedProcess<>(
ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, this::prepare, this::finishPrepare);
cacheStartProc = new DistributedProcess<>(
ctx, RESTORE_CACHE_GROUP_SNAPSHOT_START, this::cacheStart, this::finishCacheStart);
rollbackRestoreProc = new DistributedProcess<>(
ctx, RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK, this::rollback, this::finishRollback);
}
/**
* Cleanup temporary directories if any exists.
*
* @throws IgniteCheckedException If it was not possible to delete some temporary directory.
*/
protected void cleanup() throws IgniteCheckedException {
FilePageStoreManager pageStore = (FilePageStoreManager)ctx.cache().context().pageStore();
File dbDir = pageStore.workDir();
for (File dir : dbDir.listFiles(dir -> dir.isDirectory() && dir.getName().startsWith(TMP_CACHE_DIR_PREFIX))) {
if (!U.delete(dir)) {
throw new IgniteCheckedException("Unable to remove temporary directory, " +
"try deleting it manually [dir=" + dir + ']');
}
}
}
/**
* Start cache group restore operation.
*
* @param snpName Snapshot name.
* @param cacheGrpNames Cache groups to be restored or {@code null} to restore all cache groups from the snapshot.
* @return Future that will be completed when the restore operation is complete and the cache groups are started.
*/
public IgniteFuture<Void> start(String snpName, @Nullable Collection<String> cacheGrpNames) {
ClusterSnapshotFuture fut0;
try {
if (ctx.clientNode())
throw new IgniteException(OP_REJECT_MSG + "Client and daemon nodes can not perform this operation.");
DiscoveryDataClusterState clusterState = ctx.state().clusterState();
if (clusterState.state() != ClusterState.ACTIVE || clusterState.transition())
throw new IgniteException(OP_REJECT_MSG + "The cluster should be active.");
if (!clusterState.hasBaselineTopology())
throw new IgniteException(OP_REJECT_MSG + "The baseline topology is not configured for cluster.");
if (!IgniteFeatures.allNodesSupports(ctx.grid().cluster().nodes(), SNAPSHOT_RESTORE_CACHE_GROUP))
throw new IgniteException(OP_REJECT_MSG + "Not all nodes in the cluster support restore operation.");
if (ctx.cache().context().snapshotMgr().isSnapshotCreating())
throw new IgniteException(OP_REJECT_MSG + "A cluster snapshot operation is in progress.");
synchronized (this) {
if (restoringSnapshotName() != null)
throw new IgniteException(OP_REJECT_MSG + "The previous snapshot restore operation was not completed.");
fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName);
fut0 = fut;
}
}
catch (IgniteException e) {
return new IgniteFinishedFutureImpl<>(e);
}
ctx.cache().context().snapshotMgr().checkSnapshot(snpName, cacheGrpNames).listen(f -> {
if (f.error() != null) {
finishProcess(fut0.rqId, f.error());
return;
}
if (!F.isEmpty(f.result().exceptions())) {
finishProcess(fut0.rqId, F.first(f.result().exceptions().values()));
return;
}
if (fut0.interruptEx != null) {
finishProcess(fut0.rqId, fut0.interruptEx);
return;
}
Set<UUID> dataNodes = new HashSet<>();
Set<String> snpBltNodes = null;
Map<ClusterNode, List<SnapshotMetadata>> metas = f.result().metas();
Map<Integer, String> reqGrpIds = cacheGrpNames == null ? Collections.emptyMap() :
cacheGrpNames.stream().collect(Collectors.toMap(CU::cacheId, v -> v));
for (Map.Entry<ClusterNode, List<SnapshotMetadata>> entry : metas.entrySet()) {
SnapshotMetadata meta = F.first(entry.getValue());
assert meta != null : entry.getKey().id();
if (!entry.getKey().consistentId().toString().equals(meta.consistentId()))
continue;
if (snpBltNodes == null)
snpBltNodes = new HashSet<>(meta.baselineNodes());
dataNodes.add(entry.getKey().id());
reqGrpIds.keySet().removeAll(meta.partitions().keySet());
}
if (snpBltNodes == null) {
finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "No snapshot data " +
"has been found [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']'));
return;
}
if (!reqGrpIds.isEmpty()) {
finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "Cache group(s) was not " +
"found in the snapshot [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']'));
return;
}
Collection<String> bltNodes = F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
node -> node.consistentId().toString(), (node) -> CU.baselineNode(node, ctx.state().clusterState()));
snpBltNodes.removeAll(bltNodes);
if (!snpBltNodes.isEmpty()) {
finishProcess(fut0.rqId, new IgniteIllegalStateException(OP_REJECT_MSG + "Some nodes required to " +
"restore a cache group are missing [nodeId(s)=" + snpBltNodes + ", snapshot=" + snpName + ']'));
return;
}
IdleVerifyResultV2 res = f.result().idleVerifyResult();
if (!F.isEmpty(res.exceptions()) || res.hasConflicts()) {
StringBuilder sb = new StringBuilder();
res.print(sb::append, true);
finishProcess(fut0.rqId, new IgniteException(sb.toString()));
return;
}
SnapshotOperationRequest req = new SnapshotOperationRequest(
fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, dataNodes);
prepareRestoreProc.start(req.requestId(), req);
});
return new IgniteFutureImpl<>(fut0);
}
/**
* Get the name of the snapshot currently being restored
*
* @return Name of the snapshot currently being restored or {@code null} if the restore process is not running.
*/
public @Nullable String restoringSnapshotName() {
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 != null)
return opCtx0.snpName;
ClusterSnapshotFuture fut0 = fut;
return fut0 != null ? fut0.name : null;
}
/**
* Check if the cache or group with the specified name is currently being restored from the snapshot.
*
* @param cacheName Cache name.
* @param grpName Cache group name.
* @return {@code True} if the cache or group with the specified name is currently being restored.
*/
public boolean isRestoring(String cacheName, @Nullable String grpName) {
assert cacheName != null;
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 == null)
return false;
Map<Integer, StoredCacheData> cacheCfgs = opCtx0.cfgs;
int cacheId = CU.cacheId(cacheName);
if (cacheCfgs.containsKey(cacheId))
return true;
for (File grpDir : opCtx0.dirs) {
String locGrpName = FilePageStoreManager.cacheGroupName(grpDir);
if (grpName != null) {
if (cacheName.equals(locGrpName))
return true;
if (CU.cacheId(locGrpName) == CU.cacheId(grpName))
return true;
}
else if (CU.cacheId(locGrpName) == cacheId)
return true;
}
return false;
}
/**
* @param reqId Request ID.
* @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when
* starting the cache(s), the whole procedure is rolled back.
*/
public Set<UUID> cacheStartRequiredAliveNodes(IgniteUuid reqId) {
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 == null || !reqId.globalId().equals(opCtx0.reqId))
return Collections.emptySet();
return Collections.unmodifiableSet(opCtx0.nodes);
}
/**
* Finish local cache group restore process.
*
* @param reqId Request ID.
*/
private void finishProcess(UUID reqId) {
finishProcess(reqId, null);
}
/**
* Finish local cache group restore process.
*
* @param reqId Request ID.
* @param err Error, if any.
*/
private void finishProcess(UUID reqId, @Nullable Throwable err) {
if (err != null)
log.error("Failed to restore snapshot cache group [reqId=" + reqId + ']', err);
else if (log.isInfoEnabled())
log.info("Successfully restored cache group(s) from the snapshot [reqId=" + reqId + ']');
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 != null && reqId.equals(opCtx0.reqId))
opCtx = null;
synchronized (this) {
ClusterSnapshotFuture fut0 = fut;
if (fut0 != null && reqId.equals(fut0.rqId)) {
fut = null;
ctx.getSystemExecutorService().submit(() -> fut0.onDone(null, err));
}
}
}
/**
* Node left callback.
*
* @param leftNodeId Left node ID.
*/
public void onNodeLeft(UUID leftNodeId) {
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 != null && opCtx0.nodes.contains(leftNodeId)) {
opCtx0.err.compareAndSet(null, new ClusterTopologyCheckedException(OP_REJECT_MSG +
"Required node has left the cluster [nodeId=" + leftNodeId + ']'));
}
}
/**
* Cancel the currently running local restore procedure.
*
* @param reason Interruption reason.
* @param snpName Snapshot name.
* @return Future that will be finished when process the process is complete. The result of this future will be
* {@code false} if the restore process with the specified snapshot name is not running at all.
*/
public IgniteFuture<Boolean> cancel(IgniteCheckedException reason, String snpName) {
SnapshotRestoreContext opCtx0;
ClusterSnapshotFuture fut0 = null;
synchronized (this) {
opCtx0 = opCtx;
if (fut != null && fut.name.equals(snpName)) {
fut0 = fut;
fut0.interruptEx = reason;
}
}
boolean ctxStop = opCtx0 != null && opCtx0.snpName.equals(snpName);
if (ctxStop)
interrupt(opCtx0, reason);
return fut0 == null ? new IgniteFinishedFutureImpl<>(ctxStop) :
new IgniteFutureImpl<>(fut0.chain(f -> true));
}
/**
* Interrupt the currently running local restore procedure.
*
* @param reason Interruption reason.
*/
public void interrupt(IgniteCheckedException reason) {
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 != null)
interrupt(opCtx0, reason);
}
/**
* Interrupt the currently running local restore procedure.
*
* @param opCtx Snapshot restore operation context.
* @param reason Interruption reason.
*/
private void interrupt(SnapshotRestoreContext opCtx, IgniteCheckedException reason) {
opCtx.err.compareAndSet(null, reason);
IgniteFuture<?> stopFut;
synchronized (this) {
stopFut = opCtx.stopFut;
}
if (stopFut != null)
stopFut.get();
}
/**
* Ensures that a cache with the specified name does not exist locally.
*
* @param name Cache name.
*/
private void ensureCacheAbsent(String name) {
int id = CU.cacheId(name);
if (ctx.cache().cacheGroupDescriptors().containsKey(id) || ctx.cache().cacheDescriptor(id) != null) {
throw new IgniteIllegalStateException("Cache \"" + name +
"\" should be destroyed manually before perform restore operation.");
}
}
/**
* @param req Request to prepare cache group restore from the snapshot.
* @return Result future.
*/
private IgniteInternalFuture<ArrayList<StoredCacheData>> prepare(SnapshotOperationRequest req) {
if (ctx.clientNode())
return new GridFinishedFuture<>();
try {
DiscoveryDataClusterState state = ctx.state().clusterState();
if (state.state() != ClusterState.ACTIVE || state.transition())
throw new IgniteCheckedException(OP_REJECT_MSG + "The cluster should be active.");
if (ctx.cache().context().snapshotMgr().isSnapshotCreating())
throw new IgniteCheckedException(OP_REJECT_MSG + "A cluster snapshot operation is in progress.");
for (UUID nodeId : req.nodes()) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null || !CU.baselineNode(node, state) || !ctx.discovery().alive(node)) {
throw new IgniteCheckedException(
OP_REJECT_MSG + "Required node has left the cluster [nodeId-" + nodeId + ']');
}
}
SnapshotRestoreContext opCtx0 = prepareContext(req);
synchronized (this) {
opCtx = opCtx0;
ClusterSnapshotFuture fut0 = fut;
if (fut0 != null && fut0.interruptEx != null)
opCtx0.err.compareAndSet(null, fut0.interruptEx);
}
if (opCtx0.dirs.isEmpty())
return new GridFinishedFuture<>();
// Ensure that shared cache groups has no conflicts.
for (StoredCacheData cfg : opCtx0.cfgs.values()) {
ensureCacheAbsent(cfg.config().getName());
if (!F.isEmpty(cfg.config().getGroupName()))
ensureCacheAbsent(cfg.config().getGroupName());
}
if (log.isInfoEnabled()) {
log.info("Starting local snapshot restore operation" +
" [reqId=" + req.requestId() +
", snapshot=" + req.snapshotName() +
", cache(s)=" + F.viewReadOnly(opCtx0.cfgs.values(), data -> data.config().getName()) + ']');
}
Consumer<Throwable> errHnd = (ex) -> opCtx.err.compareAndSet(null, ex);
BooleanSupplier stopChecker = () -> opCtx.err.get() != null;
GridFutureAdapter<ArrayList<StoredCacheData>> retFut = new GridFutureAdapter<>();
if (ctx.isStopping())
throw new NodeStoppingException("Node is stopping.");
opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
restoreAsync(opCtx0.snpName, opCtx0.dirs, ctx.localNodeId().equals(req.operationalNodeId()), stopChecker, errHnd)
.thenAccept(res -> {
try {
Throwable err = opCtx.err.get();
if (err != null)
throw err;
for (File src : opCtx0.dirs)
Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE);
}
catch (Throwable t) {
log.error("Unable to restore cache group(s) from the snapshot " +
"[reqId=" + opCtx.reqId + ", snapshot=" + opCtx.snpName + ']', t);
retFut.onDone(t);
return;
}
retFut.onDone(new ArrayList<>(opCtx.cfgs.values()));
});
return retFut;
}
catch (IgniteIllegalStateException | IgniteCheckedException | RejectedExecutionException e) {
log.error("Unable to restore cache group(s) from the snapshot " +
"[reqId=" + req.requestId() + ", snapshot=" + req.snapshotName() + ']', e);
return new GridFinishedFuture<>(e);
}
}
/**
* @param cacheDir Cache directory.
* @return Temporary directory.
*/
private File formatTmpDirName(File cacheDir) {
return new File(cacheDir.getParent(), TMP_CACHE_DIR_PREFIX + cacheDir.getName());
}
/**
* Copy partition files and update binary metadata.
*
* @param snpName Snapshot name.
* @param dirs Cache directories to restore from the snapshot.
* @param updateMeta Update binary metadata flag.
* @param stopChecker Process interrupt checker.
* @param errHnd Error handler.
* @throws IgniteCheckedException If failed.
*/
private CompletableFuture<Void> restoreAsync(
String snpName,
Collection<File> dirs,
boolean updateMeta,
BooleanSupplier stopChecker,
Consumer<Throwable> errHnd
) throws IgniteCheckedException {
IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr();
String pdsFolderName = ctx.pdsFolderResolver().resolveFolders().folderName();
List<CompletableFuture<Void>> futs = new ArrayList<>();
if (updateMeta) {
File binDir = binaryWorkDir(snapshotMgr.snapshotLocalDir(snpName).getAbsolutePath(), pdsFolderName);
futs.add(CompletableFuture.runAsync(() -> {
try {
ctx.cacheObjects().updateMetadata(binDir, stopChecker);
}
catch (Throwable t) {
errHnd.accept(t);
}
}, snapshotMgr.snapshotExecutorService()));
}
for (File cacheDir : dirs) {
File tmpCacheDir = formatTmpDirName(cacheDir);
File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(snpName),
Paths.get(databaseRelativePath(pdsFolderName), cacheDir.getName()).toString());
assert snpCacheDir.exists() : "node=" + ctx.localNodeId() + ", dir=" + snpCacheDir;
for (File snpFile : snpCacheDir.listFiles()) {
futs.add(CompletableFuture.runAsync(() -> {
if (stopChecker.getAsBoolean())
return;
try {
if (Thread.interrupted())
throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
File target = new File(tmpCacheDir, snpFile.getName());
if (log.isDebugEnabled()) {
log.debug("Copying file from the snapshot " +
"[snapshot=" + snpName +
", src=" + snpFile +
", target=" + target + "]");
}
IgniteSnapshotManager.copy(snapshotMgr.ioFactory(), snpFile, target, snpFile.length());
}
catch (Throwable t) {
errHnd.accept(t);
}
}, ctx.cache().context().snapshotMgr().snapshotExecutorService()));
}
}
int futsSize = futs.size();
return CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]));
}
/**
* @param req Request to prepare cache group restore from the snapshot.
* @return Snapshot restore operation context.
* @throws IgniteCheckedException If failed.
*/
private SnapshotRestoreContext prepareContext(SnapshotOperationRequest req) throws IgniteCheckedException {
if (opCtx != null) {
throw new IgniteCheckedException(OP_REJECT_MSG +
"The previous snapshot restore operation was not completed.");
}
GridCacheSharedContext<?, ?> cctx = ctx.cache().context();
SnapshotMetadata meta = F.first(cctx.snapshotMgr().readSnapshotMetadatas(req.snapshotName()));
if (meta == null || !meta.consistentId().equals(cctx.localNode().consistentId().toString()))
return new SnapshotRestoreContext(req, Collections.emptyList(), Collections.emptyMap());
if (meta.pageSize() != cctx.database().pageSize()) {
throw new IgniteCheckedException("Incompatible memory page size " +
"[snapshotPageSize=" + meta.pageSize() +
", local=" + cctx.database().pageSize() +
", snapshot=" + req.snapshotName() +
", nodeId=" + cctx.localNodeId() + ']');
}
List<File> cacheDirs = new ArrayList<>();
Map<String, StoredCacheData> cfgsByName = new HashMap<>();
FilePageStoreManager pageStore = (FilePageStoreManager)cctx.pageStore();
// Collect the cache configurations and prepare a temporary directory for copying files.
// Metastorage can be restored only manually by directly copying files.
for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName(),
name -> !METASTORAGE_CACHE_NAME.equals(name)))
{
String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir);
if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName))
continue;
File cacheDir = pageStore.cacheWorkDir(snpCacheDir.getName().startsWith(CACHE_GRP_DIR_PREFIX), grpName);
if (cacheDir.exists()) {
if (!cacheDir.isDirectory()) {
throw new IgniteCheckedException("Unable to restore cache group, file with required directory " +
"name already exists [group=" + grpName + ", file=" + cacheDir + ']');
}
if (cacheDir.list().length > 0) {
throw new IgniteCheckedException("Unable to restore cache group, directory is not empty " +
"[group=" + grpName + ", dir=" + cacheDir + ']');
}
if (!cacheDir.delete()) {
throw new IgniteCheckedException("Unable to remove empty cache directory " +
"[group=" + grpName + ", dir=" + cacheDir + ']');
}
}
File tmpCacheDir = formatTmpDirName(cacheDir);
if (tmpCacheDir.exists()) {
throw new IgniteCheckedException("Unable to restore cache group, temp directory already exists " +
"[group=" + grpName + ", dir=" + tmpCacheDir + ']');
}
if (!tmpCacheDir.mkdir()) {
throw new IgniteCheckedException("Unable to restore cache group, cannot create temp directory " +
"[group=" + grpName + ", dir=" + tmpCacheDir + ']');
}
cacheDirs.add(cacheDir);
pageStore.readCacheConfigurations(snpCacheDir, cfgsByName);
}
Map<Integer, StoredCacheData> cfgsById =
cfgsByName.values().stream().collect(Collectors.toMap(v -> CU.cacheId(v.config().getName()), v -> v));
return new SnapshotRestoreContext(req, cacheDirs, cfgsById);
}
/**
* @param reqId Request ID.
* @param res Results.
* @param errs Errors.
*/
private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res, Map<UUID, Exception> errs) {
if (ctx.clientNode())
return;
SnapshotRestoreContext opCtx0 = opCtx;
Exception failure = F.first(errs.values());
assert opCtx0 != null || failure != null : "Context has not been created on the node " + ctx.localNodeId();
if (opCtx0 == null || !reqId.equals(opCtx0.reqId)) {
finishProcess(reqId, failure);
return;
}
if (failure == null)
failure = checkNodeLeft(opCtx0.nodes, res.keySet());
// Context has been created - should rollback changes cluster-wide.
if (failure != null) {
opCtx0.err.compareAndSet(null, failure);
if (U.isLocalNodeCoordinator(ctx.discovery()))
rollbackRestoreProc.start(reqId, reqId);
return;
}
Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
for (List<StoredCacheData> storedCfgs : res.values()) {
if (storedCfgs == null)
continue;
for (StoredCacheData cacheData : storedCfgs)
globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
}
opCtx0.cfgs = globalCfgs;
if (U.isLocalNodeCoordinator(ctx.discovery()))
cacheStartProc.start(reqId, reqId);
}
/**
* @param reqId Request ID.
* @return Result future.
*/
private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
if (ctx.clientNode())
return new GridFinishedFuture<>();
SnapshotRestoreContext opCtx0 = opCtx;
Throwable err = opCtx0.err.get();
if (err != null)
return new GridFinishedFuture<>(err);
if (!U.isLocalNodeCoordinator(ctx.discovery()))
return new GridFinishedFuture<>();
Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
if (log.isInfoEnabled()) {
log.info("Starting restored caches " +
"[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName()) + ']');
}
// We set the topology node IDs required to successfully start the cache, if any of the required nodes leave
// the cluster during the cache startup, the whole procedure will be rolled back.
return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false, IgniteUuid.fromUuid(reqId));
}
/**
* @param reqId Request ID.
* @param res Results.
* @param errs Errors.
*/
private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
if (ctx.clientNode())
return;
SnapshotRestoreContext opCtx0 = opCtx;
Exception failure = errs.values().stream().findFirst().
orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
if (failure == null) {
finishProcess(reqId);
return;
}
opCtx0.err.compareAndSet(null, failure);
if (U.isLocalNodeCoordinator(ctx.discovery()))
rollbackRestoreProc.start(reqId, reqId);
}
/**
* @param reqNodes Set of required topology nodes.
* @param respNodes Set of responding topology nodes.
* @return Error, if no response was received from the required topology node.
*/
private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
if (!respNodes.containsAll(reqNodes)) {
Set<UUID> leftNodes = new HashSet<>(reqNodes);
leftNodes.removeAll(respNodes);
return new ClusterTopologyCheckedException(OP_REJECT_MSG +
"Required node has left the cluster [nodeId=" + leftNodes + ']');
}
return null;
}
/**
* @param reqId Request ID.
* @return Result future.
*/
private IgniteInternalFuture<Boolean> rollback(UUID reqId) {
if (ctx.clientNode())
return new GridFinishedFuture<>();
SnapshotRestoreContext opCtx0 = opCtx;
if (opCtx0 == null || F.isEmpty(opCtx0.dirs))
return new GridFinishedFuture<>();
GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
synchronized (this) {
opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
try {
ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
if (log.isInfoEnabled()) {
log.info("Removing restored cache directories [reqId=" + reqId +
", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
}
IgniteCheckedException ex = null;
for (File cacheDir : opCtx0.dirs) {
File tmpCacheDir = formatTmpDirName(cacheDir);
if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
"[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
ex = new IgniteCheckedException("Unable to remove temporary cache directory " + cacheDir);
}
if (cacheDir.exists() && !U.delete(cacheDir)) {
log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
"[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
ex = new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
}
}
if (ex != null)
retFut.onDone(ex);
else
retFut.onDone(true);
});
}
catch (RejectedExecutionException e) {
log.error("Unable to perform rollback routine, task has been rejected " +
"[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']');
retFut.onDone(e);
}
}
return retFut;
}
/**
* @param reqId Request ID.
* @param res Results.
* @param errs Errors.
*/
private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
if (ctx.clientNode())
return;
if (!errs.isEmpty()) {
log.warning("Some nodes were unable to complete the rollback routine completely, check the local log " +
"files for more information [nodeIds=" + errs.keySet() + ']');
}
SnapshotRestoreContext opCtx0 = opCtx;
if (!res.keySet().containsAll(opCtx0.nodes)) {
Set<UUID> leftNodes = new HashSet<>(opCtx0.nodes);
leftNodes.removeAll(res.keySet());
log.warning("Some of the nodes left the cluster and were unable to complete the rollback" +
" operation [reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", node(s)=" + leftNodes + ']');
}
finishProcess(reqId, opCtx0.err.get());
}
/**
* Cache group restore from snapshot operation context.
*/
private static class SnapshotRestoreContext {
/** Request ID. */
private final UUID reqId;
/** Snapshot name. */
private final String snpName;
/** Baseline node IDs that must be alive to complete the operation. */
private final Set<UUID> nodes;
/** List of restored cache group directories. */
private final Collection<File> dirs;
/** The exception that led to the interruption of the process. */
private final AtomicReference<Throwable> err = new AtomicReference<>();
/** Cache ID to configuration mapping. */
private volatile Map<Integer, StoredCacheData> cfgs;
/** Graceful shutdown future. */
private volatile IgniteFuture<?> stopFut;
/**
* @param req Request to prepare cache group restore from the snapshot.
* @param dirs List of cache group names to restore from the snapshot.
* @param cfgs Cache ID to configuration mapping.
*/
protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection<File> dirs,
Map<Integer, StoredCacheData> cfgs) {
reqId = req.requestId();
snpName = req.snapshotName();
nodes = new HashSet<>(req.nodes());
this.dirs = dirs;
this.cfgs = cfgs;
}
}
}