IGNITE-13805 Add cache groups snapshot restore on the same topology (#8648)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
index 3623c1f..0d46cbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
@@ -17,7 +17,9 @@
package org.apache.ignite;
+import java.util.Collection;
import org.apache.ignite.lang.IgniteFuture;
+import org.jetbrains.annotations.Nullable;
/**
* This interface provides functionality for creating cluster-wide cache data snapshots.
@@ -48,4 +50,16 @@
* @return Future which will be completed when cancel operation finished.
*/
public IgniteFuture<Void> cancelSnapshot(String name);
+
+ /**
+ * Restore cache group(s) from the snapshot.
+ * <p>
+ * <b>NOTE:</b> Cache groups to be restored from the snapshot must not be present in the cluster. If they are,
+ * they must be destroyed by the user (e.g. with {@link IgniteCache#destroy()}) before starting this operation.
+ *
+ * @param name Snapshot name.
+ * @param cacheGroupNames Cache groups to be restored or {@code null} to restore all cache groups from the snapshot.
+ * @return Future which will be completed when the restore operation is finished.
+ */
+ public IgniteFuture<Void> restoreSnapshot(String name, @Nullable Collection<String> cacheGroupNames);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index fa487a0..4f8fdba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -139,7 +139,10 @@
CACHE_GROUP_KEY_CHANGE(47),
/** Collecting performance statistics. */
- PERFORMANCE_STATISTICS(48);
+ PERFORMANCE_STATISTICS(48),
+
+ /** Restore cache group from the snapshot. */
+ SNAPSHOT_RESTORE_CACHE_GROUP(49);
/**
* Unique feature identifier.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index ffe51c8..236051a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -48,6 +48,7 @@
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
@@ -55,6 +56,7 @@
import org.apache.ignite.internal.managers.systemview.walker.CacheViewWalker;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
@@ -582,6 +584,28 @@
DiscoveryDataClusterState state = ctx.state().clusterState();
if (state.active() && !state.transition()) {
+ Set<IgniteUuid> restartIds = new HashSet<>(F.viewReadOnly(
+ batch.requests(), DynamicCacheChangeRequest::restartId, req -> req.start() && req.restartId() != null));
+
+ assert restartIds.size() <= 1 : batch.requests();
+
+ Collection<UUID> nodes = ctx.cache().context().snapshotMgr().cacheStartRequiredAliveNodes(F.first(restartIds));
+
+ for (UUID nodeId : nodes) {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node != null && CU.baselineNode(node, state) && ctx.discovery().alive(node))
+ continue;
+
+ ClusterTopologyCheckedException err =
+ new ClusterTopologyCheckedException("Required node has left the cluster [nodeId=" + nodeId + ']');
+
+ for (DynamicCacheChangeRequest req : batch.requests())
+ ctx.cache().completeCacheStartFuture(req, false, err);
+
+ return false;
+ }
+
ExchangeActions exchangeActions = new ExchangeActions();
CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions,
@@ -593,6 +617,9 @@
assert !exchangeActions.empty() : exchangeActions;
batch.exchangeActions(exchangeActions);
+
+ if (!nodes.isEmpty())
+ exchangeActions.cacheStartRequiredAliveNodes(nodes);
}
return res.needExchange;
@@ -1007,6 +1034,16 @@
}
}
+ if (err == null && req.restartId() == null) {
+ IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr();
+
+ if (snapshotMgr.isRestoring(cacheName, ccfg.getGroupName())) {
+ err = new IgniteCheckedException("Cache start failed. A cache or group with the same name is " +
+ "currently being restored from a snapshot [cache=" + cacheName +
+ (ccfg.getGroupName() == null ? "" : ", group=" + ccfg.getGroupName()) + ']');
+ }
+ }
+
if (err != null) {
if (persistedCfgs)
res.errs.add(err);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index cbe7df4..8736a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -43,6 +44,12 @@
/** */
private Map<String, CacheActionData> cachesToStart;
+ /**
+ * Server nodes on which a successful start of the cache(s) is required. If any of these nodes fails when starting
+ * the cache(s), the whole procedure is rolled back. Stays {@code null} until set.
+ */
+ private Collection<UUID> cacheStartRequiredAliveNodes;
+
/** */
private Map<String, CacheActionData> cachesToStop;
@@ -320,6 +327,23 @@
}
/**
+ * @return Server nodes on which a successful start of the cache(s) is required (empty if none were set). If any of
+ * these nodes fails when starting the cache(s), the whole procedure is rolled back.
+ */
+ public Collection<UUID> cacheStartRequiredAliveNodes() {
+ return cacheStartRequiredAliveNodes == null ? Collections.emptyList() : cacheStartRequiredAliveNodes;
+ }
+
+ /**
+ * @param cacheStartRequiredAliveNodes Server nodes on which a successful start of the cache(s) is required. If any
+ * of these nodes fails when starting the cache(s), the whole procedure is
+ * rolled back. The passed collection is copied.
+ */
+ public void cacheStartRequiredAliveNodes(Collection<UUID> cacheStartRequiredAliveNodes) {
+ this.cacheStartRequiredAliveNodes = new ArrayList<>(cacheStartRequiredAliveNodes);
+ }
+
+ /**
* @param grpDesc Group descriptor.
* @param destroy Destroy flag.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e8ee3f1..142c4db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -4267,6 +4267,9 @@
if (res == null)
res = validateRestartingCaches(node);
+ if (res == null)
+ res = validateRestoringCaches(node);
+
return res;
}
@@ -4294,6 +4297,20 @@
}
/**
+ * @param node Joining node to validate.
+ * @return Validation failure if the snapshot manager reports a restore in progress, {@code null} otherwise.
+ */
+ private IgniteNodeValidationResult validateRestoringCaches(ClusterNode node) {
+ if (ctx.cache().context().snapshotMgr().isRestoring()) {
+ String msg = "Joining node during caches restore is not allowed [joiningNodeId=" + node.id() + ']';
+
+ return new IgniteNodeValidationResult(node.id(), msg);
+ }
+
+ return null;
+ }
+
+ /**
* @return Keep static cache configuration flag. If {@code true}, static cache configuration will override
* configuration persisted on disk.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index f3495ac..cb513f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -293,6 +293,8 @@
stateAwareMgrs.add(snpMgr);
+ stateAwareMgrs.add(snapshotMgr);
+
for (PluginProvider prv : kernalCtx.plugins().allProviders())
if (prv instanceof IgniteChangeGlobalStateSupport)
stateAwareMgrs.add(((IgniteChangeGlobalStateSupport)prv));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 4d613d9..15619d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -32,6 +32,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.BooleanSupplier;
import javax.cache.CacheException;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
@@ -55,6 +56,7 @@
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.UnregisteredBinaryTypeException;
@@ -993,6 +995,42 @@
}
/** {@inheritDoc} */
+ @Override public void updateMetadata(File metadataDir, BooleanSupplier stopChecker) throws IgniteCheckedException {
+ if (!metadataDir.exists())
+ return;
+
+ try {
+ ConcurrentMap<Integer, BinaryMetadataHolder> metaCache = new ConcurrentHashMap<>();
+
+ new BinaryMetadataFileStore(metaCache, ctx, log, metadataDir)
+ .restoreMetadata();
+
+ Collection<BinaryMetadata> metadata = F.viewReadOnly(metaCache.values(), BinaryMetadataHolder::metadata);
+
+ // Compatibility check only: the merge result is discarded (presumably throws on conflict - TODO confirm).
+ for (BinaryMetadata newMeta : metadata) {
+ BinaryMetadata oldMeta = binaryMetadata(newMeta.typeId());
+
+ if (oldMeta != null)
+ BinaryUtils.mergeMetadata(oldMeta, newMeta, null);
+ }
+
+ // Update cluster metadata; stops between types if a stop was requested or the thread was interrupted.
+ for (BinaryMetadata newMeta : metadata) {
+ if (stopChecker.getAsBoolean())
+ return;
+
+ if (Thread.interrupted())
+ throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+
+ addMeta(newMeta.typeId(), newMeta.wrap(binaryContext()), false);
+ }
+ } catch (BinaryObjectException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public BinaryObject buildEnum(String typeName, int ord) throws BinaryObjectException {
A.notNullOrEmpty(typeName, "enum type name");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index b3642c1..6826d9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -299,7 +299,7 @@
private boolean forceAffReassignment;
/** Exception that was thrown during init phase on local node. */
- private Exception exchangeLocE;
+ private volatile Exception exchangeLocE;
/** Exchange exceptions from all participating nodes. */
private final Map<UUID, Exception> exchangeGlobalExceptions = new ConcurrentHashMap<>();
@@ -5124,6 +5124,12 @@
if (crd0 == null)
finishState = new FinishState(null, initialVersion(), null);
+
+ if (dynamicCacheStartExchange() && exchangeLocE == null &&
+ exchActions.cacheStartRequiredAliveNodes().contains(node.id())) {
+ exchangeGlobalExceptions.put(cctx.localNodeId(), exchangeLocE = new ClusterTopologyCheckedException(
+ "Required node has left the cluster [nodeId=" + node.id() + ']'));
+ }
}
if (crd0 == null) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index a7c682d..24954a4 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -974,32 +974,40 @@
Arrays.sort(files);
for (File file : files) {
- if (file.isDirectory()) {
- if (file.getName().startsWith(CACHE_DIR_PREFIX)) {
- File conf = new File(file, CACHE_DATA_FILENAME);
-
- if (conf.exists() && conf.length() > 0) {
- StoredCacheData cacheData = readCacheData(conf);
-
- String cacheName = cacheData.config().getName();
-
- if (!ccfgs.containsKey(cacheName))
- ccfgs.put(cacheName, cacheData);
- else {
- U.warn(log, "Cache with name=" + cacheName + " is already registered, skipping config file "
- + file.getName());
- }
- }
- }
- else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX))
- readCacheGroupCaches(file, ccfgs);
- }
+ if (file.isDirectory())
+ readCacheConfigurations(file, ccfgs);
}
return ccfgs;
}
/**
+ * @param dir Cache or cache group directory; a directory with neither name prefix is ignored.
+ * @param ccfgs Cache configurations by cache name, supplemented with the configurations found in {@code dir}.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void readCacheConfigurations(File dir, Map<String, StoredCacheData> ccfgs) throws IgniteCheckedException {
+ if (dir.getName().startsWith(CACHE_DIR_PREFIX)) {
+ File conf = new File(dir, CACHE_DATA_FILENAME);
+
+ if (conf.exists() && conf.length() > 0) {
+ StoredCacheData cacheData = readCacheData(conf);
+
+ String cacheName = cacheData.config().getName();
+
+ if (!ccfgs.containsKey(cacheName))
+ ccfgs.put(cacheName, cacheData);
+ else {
+ U.warn(log, "Cache with name=" + cacheName + " is already registered, skipping config file "
+ + dir.getName());
+ }
+ }
+ }
+ else if (dir.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+ readCacheGroupCaches(dir, ccfgs);
+ }
+
+ /**
* @param dir Directory to check.
* @return Files that match cache or cache group pattern.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index d23b584..90304bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -51,7 +51,6 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiFunction;
@@ -105,6 +104,7 @@
import org.apache.ignite.internal.processors.cache.tree.DataRow;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.processors.marshaller.MappedName;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.task.GridInternal;
@@ -121,7 +121,6 @@
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -129,6 +128,7 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.IgniteInstanceResource;
@@ -188,7 +188,7 @@
* </ul>
*/
public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
- implements IgniteSnapshot, PartitionsExchangeAware, MetastorageLifecycleListener {
+ implements IgniteSnapshot, PartitionsExchangeAware, MetastorageLifecycleListener, IgniteChangeGlobalStateSupport {
/** File with delta pages suffix. */
public static final String DELTA_SUFFIX = ".delta";
@@ -258,6 +258,9 @@
/** Marshaller. */
private final Marshaller marsh;
+ /** Distributed process to restore cache group from the snapshot. */
+ private final SnapshotRestoreProcess restoreCacheGrpProc;
+
/** Resolved persistent data storage settings. */
private volatile PdsFolderSettings pdsSettings;
@@ -315,6 +318,8 @@
this::processLocalSnapshotEndStageResult);
marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
+
+ restoreCacheGrpProc = new SnapshotRestoreProcess(ctx);
}
/**
@@ -404,12 +409,14 @@
for (SnapshotFutureTask sctx : locSnpTasks.values()) {
if (sctx.sourceNodeId().equals(leftNodeId) ||
(snpReq != null &&
- snpReq.snpName.equals(sctx.snapshotName()) &&
- snpReq.bltNodes.contains(leftNodeId))) {
+ snpReq.snapshotName().equals(sctx.snapshotName()) &&
+ snpReq.nodes().contains(leftNodeId))) {
sctx.acceptException(new ClusterTopologyCheckedException("Snapshot operation interrupted. " +
"One of baseline nodes left the cluster: " + leftNodeId));
}
}
+
+ restoreCacheGrpProc.onNodeLeft(leftNodeId);
}
}
finally {
@@ -423,6 +430,8 @@
busyLock.block();
try {
+ restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is stopping."));
+
// Try stop all snapshot processing if not yet.
for (SnapshotFutureTask sctx : locSnpTasks.values())
sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
@@ -450,6 +459,16 @@
}
}
+ /** {@inheritDoc} */
+ @Override public void onActivate(GridKernalContext kctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDeActivate(GridKernalContext kctx) {
+ restoreCacheGrpProc.interrupt(new IgniteCheckedException("The cluster has been deactivated."));
+ }
+
/**
* @param snpDir Snapshot dir.
* @param folderName Local node folder name (see {@link U#maskForFileName} with consistent id).
@@ -547,7 +566,7 @@
"Another snapshot operation in progress [req=" + req + ", curr=" + clusterSnpReq + ']'));
}
- Set<UUID> leftNodes = new HashSet<>(req.bltNodes);
+ Set<UUID> leftNodes = new HashSet<>(req.nodes());
leftNodes.removeAll(F.viewReadOnly(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
F.node2id()));
@@ -556,7 +575,9 @@
"prior to snapshot operation start: " + leftNodes));
}
- Set<Integer> leftGrps = new HashSet<>(req.grpIds);
+ List<Integer> grpIds = new ArrayList<>(F.viewReadOnly(req.groups(), CU::cacheId));
+
+ Set<Integer> leftGrps = new HashSet<>(grpIds);
leftGrps.removeAll(cctx.cache().cacheGroupDescriptors().keySet());
if (!leftGrps.isEmpty()) {
@@ -568,7 +589,7 @@
// Prepare collection of pairs group and appropriate cache partition to be snapshot.
// Cache group context may be 'null' on some nodes e.g. a node filter is set.
- for (Integer grpId : req.grpIds) {
+ for (Integer grpId : grpIds) {
if (cctx.cache().cacheGroup(grpId) == null)
continue;
@@ -580,10 +601,10 @@
if (parts.isEmpty())
task0 = new GridFinishedFuture<>(Collections.emptySet());
else {
- task0 = registerSnapshotTask(req.snpName,
- req.srcNodeId,
+ task0 = registerSnapshotTask(req.snapshotName(),
+ req.operationalNodeId(),
parts,
- locSndrFactory.apply(req.snpName));
+ locSndrFactory.apply(req.snapshotName()));
clusterSnpReq = req;
}
@@ -593,11 +614,11 @@
throw F.wrap(fut.error());
try {
- Set<String> blts = req.bltNodes.stream()
+ Set<String> blts = req.nodes().stream()
.map(n -> cctx.discovery().node(n).consistentId().toString())
.collect(Collectors.toSet());
- File smf = new File(snapshotLocalDir(req.snpName), snapshotMetaFileName(cctx.localNode().consistentId().toString()));
+ File smf = new File(snapshotLocalDir(req.snapshotName()), snapshotMetaFileName(cctx.localNode().consistentId().toString()));
if (smf.exists())
throw new GridClosureException(new IgniteException("Snapshot metafile must not exist: " + smf.getAbsolutePath()));
@@ -606,12 +627,12 @@
try (OutputStream out = new BufferedOutputStream(new FileOutputStream(smf))) {
U.marshal(marsh,
- new SnapshotMetadata(req.rqId,
- req.snpName,
+ new SnapshotMetadata(req.requestId(),
+ req.snapshotName(),
cctx.localNode().consistentId().toString(),
pdsSettings.folderName(),
cctx.gridConfig().getDataStorageConfiguration().getPageSize(),
- req.grpIds,
+ grpIds,
blts,
fut.result()),
out);
@@ -640,7 +661,7 @@
boolean cancelled = err.values().stream().anyMatch(e -> e instanceof IgniteFutureCancelledCheckedException);
- if (snpReq == null || !snpReq.rqId.equals(id)) {
+ if (snpReq == null || !snpReq.requestId().equals(id)) {
synchronized (snpOpMux) {
if (clusterSnpFut != null && clusterSnpFut.rqId.equals(id)) {
if (cancelled) {
@@ -659,18 +680,18 @@
}
if (isLocalNodeCoordinator(cctx.discovery())) {
- Set<UUID> missed = new HashSet<>(snpReq.bltNodes);
+ Set<UUID> missed = new HashSet<>(snpReq.nodes());
missed.removeAll(res.keySet());
missed.removeAll(err.keySet());
if (cancelled) {
- snpReq.err = new IgniteFutureCancelledCheckedException("Execution of snapshot tasks " +
- "has been cancelled by external process [err=" + err + ", missed=" + missed + ']');
+ snpReq.error(new IgniteFutureCancelledCheckedException("Execution of snapshot tasks " +
+ "has been cancelled by external process [err=" + err + ", missed=" + missed + ']'));
}
else if (!F.isEmpty(err) || !missed.isEmpty()) {
- snpReq.err = new IgniteCheckedException("Execution of local snapshot tasks fails or them haven't been executed " +
+ snpReq.error(new IgniteCheckedException("Execution of local snapshot tasks fails or them haven't been executed " +
"due to some of nodes left the cluster. Uncompleted snapshot will be deleted " +
- "[err=" + err + ", missed=" + missed + ']');
+ "[err=" + err + ", missed=" + missed + ']'));
}
endSnpProc.start(UUID.randomUUID(), snpReq);
@@ -686,8 +707,8 @@
return new GridFinishedFuture<>(new SnapshotOperationResponse());
try {
- if (req.err != null)
- deleteSnapshot(snapshotLocalDir(req.snpName), pdsSettings.folderName());
+ if (req.error() != null)
+ deleteSnapshot(snapshotLocalDir(req.snapshotName()), pdsSettings.folderName());
removeLastMetaStorageKey();
}
@@ -709,26 +730,26 @@
if (snpReq == null)
return;
- Set<UUID> endFail = new HashSet<>(snpReq.bltNodes);
+ Set<UUID> endFail = new HashSet<>(snpReq.nodes());
endFail.removeAll(res.keySet());
clusterSnpReq = null;
synchronized (snpOpMux) {
if (clusterSnpFut != null) {
- if (endFail.isEmpty() && snpReq.err == null) {
+ if (endFail.isEmpty() && snpReq.error() == null) {
clusterSnpFut.onDone();
if (log.isInfoEnabled())
log.info(SNAPSHOT_FINISHED_MSG + snpReq);
}
- else if (snpReq.err == null) {
+ else if (snpReq.error() == null) {
clusterSnpFut.onDone(new IgniteCheckedException("Snapshot creation has been finished with an error. " +
"Local snapshot tasks may not finished completely or finalizing results fails " +
"[fail=" + endFail + ", err=" + err + ']'));
}
else
- clusterSnpFut.onDone(snpReq.err);
+ clusterSnpFut.onDone(snpReq.error());
clusterSnpFut = null;
}
@@ -748,6 +769,38 @@
}
/**
+ * Check if snapshot restore process is currently running.
+ *
+ * @return {@code True} if the snapshot restore operation is in progress.
+ */
+ public boolean isRestoring() {
+ return restoreCacheGrpProc.isRestoring();
+ }
+
+ /**
+ * @param restoreId Restore process ID, {@code null} yields an empty set.
+ * @return Server nodes on which a successful start of the cache(s) is required. If any of these nodes fails when
+ * starting the cache(s), the whole procedure is rolled back.
+ */
+ public Set<UUID> cacheStartRequiredAliveNodes(@Nullable IgniteUuid restoreId) {
+ if (restoreId == null)
+ return Collections.emptySet();
+
+ return restoreCacheGrpProc.cacheStartRequiredAliveNodes(restoreId);
+ }
+
+ /**
+ * Check if the cache or group with the specified name is currently being restored from the snapshot.
+ *
+ * @param cacheName Cache name.
+ * @param grpName Cache group name.
+ * @return {@code True} if the cache or group with the specified name is being restored.
+ */
+ public boolean isRestoring(String cacheName, @Nullable String grpName) {
+ return restoreCacheGrpProc.isRestoring(cacheName, grpName);
+ }
+
+ /**
* @return List of all known snapshots on the local node.
*/
public List<String> localSnapshotNames() {
@@ -829,34 +882,20 @@
A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
- GridKernalContext kctx0 = cctx.kernalContext();
GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
- kctx0.security().authorize(ADMIN_SNAPSHOT);
-
- Collection<ClusterNode> bltNodes = F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
- (node) -> CU.baselineNode(node, kctx0.state().clusterState()));
-
- kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
- kctx0.task().setThreadContext(TC_SUBGRID, bltNodes);
-
- kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
- .listen(f0 -> {
+ collectSnapshotMetadata(name).listen(f0 -> {
if (f0.error() == null) {
Map<ClusterNode, List<SnapshotMetadata>> metas = f0.result();
- kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
- kctx0.task().setThreadContext(TC_SUBGRID, new ArrayList<>(metas.keySet()));
-
- kctx0.task().execute(SnapshotPartitionsVerifyTask.class, metas)
- .listen(f1 -> {
- if (f1.error() == null)
- res.onDone(f1.result());
- else if (f1.error() instanceof IgniteSnapshotVerifyException)
- res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
- else
- res.onDone(f1.error());
- });
+ runSnapshotVerification(metas).listen(f1 -> {
+ if (f1.error() == null)
+ res.onDone(f1.result());
+ else if (f1.error() instanceof IgniteSnapshotVerifyException)
+ res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+ else
+ res.onDone(f1.error());
+ });
}
else {
if (f0.error() instanceof IgniteSnapshotVerifyException)
@@ -870,6 +909,37 @@
}
/**
+ * @param name Snapshot name.
+ * @return Future with snapshot metadata per node; the collector task is sent to the baseline server nodes.
+ */
+ IgniteInternalFuture<Map<ClusterNode, List<SnapshotMetadata>>> collectSnapshotMetadata(String name) {
+ GridKernalContext kctx0 = cctx.kernalContext();
+
+ kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+ Collection<ClusterNode> bltNodes = F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
+ (node) -> CU.baselineNode(node, kctx0.state().clusterState()));
+
+ kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+ kctx0.task().setThreadContext(TC_SUBGRID, bltNodes);
+
+ return kctx0.task().execute(SnapshotMetadataCollectorTask.class, name);
+ }
+
+ /**
+ * @param metas Snapshot metadata by node; the verification task is sent to exactly these nodes.
+ * @return Future with the verification results.
+ */
+ IgniteInternalFuture<IdleVerifyResultV2> runSnapshotVerification(Map<ClusterNode, List<SnapshotMetadata>> metas) {
+ GridKernalContext kctx0 = cctx.kernalContext();
+
+ kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+ kctx0.task().setThreadContext(TC_SUBGRID, new ArrayList<>(metas.keySet()));
+
+ return kctx0.task().execute(SnapshotPartitionsVerifyTask.class, metas);
+ }
+
+ /**
* @param snpName Snapshot name.
* @param folderName Directory name for cache group.
* @return The list of cache or cache group names in given snapshot on local node.
@@ -1012,16 +1082,19 @@
if (localSnapshotNames().contains(name))
throw new IgniteException("Create snapshot request has been rejected. Snapshot with given name already exists on local node.");
+ if (isRestoring())
+ throw new IgniteException("Snapshot operation has been rejected. Cache group restore operation is currently in progress.");
+
snpFut0 = new ClusterSnapshotFuture(UUID.randomUUID(), name);
clusterSnpFut = snpFut0;
lastSeenSnpFut = snpFut0;
}
- List<Integer> grps = cctx.cache().persistentGroups().stream()
+ List<String> grps = cctx.cache().persistentGroups().stream()
.filter(g -> cctx.cache().cacheType(g.cacheOrGroupName()) == CacheType.USER)
.filter(g -> !g.config().isEncryptionEnabled())
- .map(CacheGroupDescriptor::groupId)
+ .map(CacheGroupDescriptor::cacheOrGroupName)
.collect(Collectors.toList());
List<ClusterNode> srvNodes = cctx.discovery().serverNodes(AffinityTopologyVersion.NONE);
@@ -1039,7 +1112,8 @@
grps,
new HashSet<>(F.viewReadOnly(srvNodes,
F.node2id(),
- (node) -> CU.baselineNode(node, clusterState)))));
+ (node) -> CU.baselineNode(node, clusterState)))
+ ));
String msg = "Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps + ']';
@@ -1062,6 +1136,15 @@
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> restoreSnapshot(String name, @Nullable Collection<String> grpNames) {
+ A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+ A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+ A.ensure(grpNames == null || !grpNames.isEmpty(), "List of cache group names cannot be empty.");
+
+ return restoreCacheGrpProc.start(name, grpNames);
+ }
+
+ /** {@inheritDoc} */
@Override public void onReadyForReadWrite(ReadWriteMetastorage metaStorage) throws IgniteCheckedException {
synchronized (snpOpMux) {
this.metaStorage = metaStorage;
@@ -1075,6 +1158,8 @@
/** {@inheritDoc} */
@Override public void onReadyForRead(ReadOnlyMetastorage metaStorage) throws IgniteCheckedException {
+ restoreCacheGrpProc.cleanup();
+
// Snapshot which has not been completed due to the local node crashed must be deleted.
String snpName = (String)metaStorage.read(SNP_RUNNING_KEY);
@@ -1111,13 +1196,13 @@
SnapshotOperationRequest snpReq = clusterSnpReq;
- SnapshotFutureTask task = locSnpTasks.get(snpReq.snpName);
+ SnapshotFutureTask task = locSnpTasks.get(snpReq.snapshotName());
if (task == null)
return;
if (task.start()) {
- cctx.database().forceCheckpoint(String.format("Start snapshot operation: %s", snpReq.snpName));
+ cctx.database().forceCheckpoint(String.format("Start snapshot operation: %s", snpReq.snapshotName()));
// Schedule task on a checkpoint and wait when it starts.
try {
@@ -1305,7 +1390,7 @@
/**
* @return The executor used to run snapshot tasks.
*/
- Executor snapshotExecutorService() {
+ ExecutorService snapshotExecutorService() {
assert snpRunner != null;
return snpRunner;
@@ -1319,6 +1404,13 @@
}
/**
+ * @return Factory to create IO interface over page stores.
+ */
+ FileIOFactory ioFactory() {
+ return ioFactory;
+ }
+
+ /**
* @return Relative configured path of persistence data storage directory for the local node.
* Example: {@code snapshotWorkDir/db/IgniteNodeName0}
*/
@@ -1775,49 +1867,6 @@
}
}
- /** Snapshot start request for {@link DistributedProcess} initiate message. */
- private static class SnapshotOperationRequest implements Serializable {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** Unique snapshot request id. */
- private final UUID rqId;
-
- /** Source node id which trigger request. */
- private final UUID srcNodeId;
-
- /** Snapshot name. */
- private final String snpName;
-
- /** The list of cache groups to include into snapshot. */
- @GridToStringInclude
- private final List<Integer> grpIds;
-
- /** The list of affected by snapshot operation baseline nodes. */
- @GridToStringInclude
- private final Set<UUID> bltNodes;
-
- /** Exception occurred during snapshot operation processing. */
- private volatile IgniteCheckedException err;
-
- /**
- * @param snpName Snapshot name.
- * @param grpIds Cache groups to include into snapshot.
- */
- public SnapshotOperationRequest(UUID rqId, UUID srcNodeId, String snpName, List<Integer> grpIds, Set<UUID> bltNodes) {
- this.rqId = rqId;
- this.srcNodeId = srcNodeId;
- this.snpName = snpName;
- this.grpIds = grpIds;
- this.bltNodes = bltNodes;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(SnapshotOperationRequest.class, this);
- }
- }
-
/** */
private static class SnapshotOperationResponse implements Serializable {
/** Serial version uid. */
@@ -1858,18 +1907,18 @@
}
/** */
- private static class ClusterSnapshotFuture extends GridFutureAdapter<Void> {
+ protected static class ClusterSnapshotFuture extends GridFutureAdapter<Void> {
/** Unique snapshot request id. */
- private final UUID rqId;
+ final UUID rqId;
/** Snapshot name. */
- private final String name;
+ final String name;
/** Snapshot start time. */
- private final long startTime;
+ final long startTime;
/** Snapshot finish time. */
- private volatile long endTime;
+ volatile long endTime;
/**
* Default constructor.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
new file mode 100644
index 0000000..177133f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Snapshot operation start request for {@link DistributedProcess} initiate message.
+ * <p>
+ * All fields except the error are assigned once in the constructor. The error is set later
+ * through {@link #error(Throwable)}.
+ */
+public class SnapshotOperationRequest implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Request ID. */
+ private final UUID reqId;
+
+ /** Snapshot name. */
+ private final String snpName;
+
+ /** Baseline node IDs that must be alive to complete the operation. */
+ @GridToStringInclude
+ private final Set<UUID> nodes;
+
+ /** List of cache group names, may be {@code null}. */
+ @GridToStringInclude
+ private final Collection<String> grps;
+
+ /** Operational node ID. */
+ private final UUID opNodeId;
+
+ /** Exception occurred during snapshot operation processing, {@code null} until {@link #error(Throwable)} is called. */
+ private volatile Throwable err;
+
+ /**
+ * @param reqId Request ID.
+ * @param opNodeId Operational node ID.
+ * @param snpName Snapshot name.
+ * @param grps List of cache group names, may be {@code null}.
+ * @param nodes Baseline node IDs that must be alive to complete the operation.
+ */
+ public SnapshotOperationRequest(
+ UUID reqId,
+ UUID opNodeId,
+ String snpName,
+ @Nullable Collection<String> grps,
+ Set<UUID> nodes
+ ) {
+ // The passed collections are stored as is, no defensive copies are made.
+ this.reqId = reqId;
+ this.opNodeId = opNodeId;
+ this.snpName = snpName;
+ this.grps = grps;
+ this.nodes = nodes;
+ }
+
+ /**
+ * @return Request ID.
+ */
+ public UUID requestId() {
+ return reqId;
+ }
+
+ /**
+ * @return Snapshot name.
+ */
+ public String snapshotName() {
+ return snpName;
+ }
+
+ /**
+ * @return List of cache group names, may be {@code null}.
+ */
+ public @Nullable Collection<String> groups() {
+ return grps;
+ }
+
+ /**
+ * @return Baseline node IDs that must be alive to complete the operation.
+ */
+ public Set<UUID> nodes() {
+ return nodes;
+ }
+
+ /**
+ * @return Operational node ID.
+ */
+ public UUID operationalNodeId() {
+ return opNodeId;
+ }
+
+ /**
+ * @return Exception occurred during snapshot operation processing, {@code null} if none has been set.
+ */
+ public Throwable error() {
+ return err;
+ }
+
+ /**
+ * @param err Exception occurred during snapshot operation processing.
+ */
+ public void error(Throwable err) {
+ this.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SnapshotOperationRequest.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
new file mode 100644
index 0000000..038561b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -0,0 +1,931 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.IgniteFeatures.SNAPSHOT_RESTORE_CACHE_GROUP;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
+
+/**
+ * Distributed process to restore cache group from the snapshot.
+ */
+public class SnapshotRestoreProcess {
+ /** Temporary cache directory prefix. */
+ public static final String TMP_CACHE_DIR_PREFIX = "_tmp_snp_restore_";
+
+ /** Reject operation message. */
+ private static final String OP_REJECT_MSG = "Cache group restore operation was rejected. ";
+
+ /** Kernal context. */
+ private final GridKernalContext ctx;
+
+ /** Cache group restore prepare phase. */
+ private final DistributedProcess<SnapshotOperationRequest, ArrayList<StoredCacheData>> prepareRestoreProc;
+
+ /** Cache group restore cache start phase. */
+ private final DistributedProcess<UUID, Boolean> cacheStartProc;
+
+ /** Cache group restore rollback phase. */
+ private final DistributedProcess<UUID, Boolean> rollbackRestoreProc;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Future to be completed when the cache restore process is complete (this future will be returned to the user). */
+ private volatile ClusterSnapshotFuture fut;
+
+ /** Snapshot restore operation context. */
+ private volatile SnapshotRestoreContext opCtx;
+
+ /**
+ * Creates the restore process and registers its three distributed phases: prepare, cache start and rollback.
+ *
+ * @param ctx Kernal context.
+ */
+ public SnapshotRestoreProcess(GridKernalContext ctx) {
+ this.ctx = ctx;
+
+ log = ctx.log(getClass());
+
+ // Each phase is a pair of callbacks: the local action and the handler of the collected results.
+ prepareRestoreProc = new DistributedProcess<>(
+ ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, this::prepare, this::finishPrepare);
+
+ cacheStartProc = new DistributedProcess<>(
+ ctx, RESTORE_CACHE_GROUP_SNAPSHOT_START, this::cacheStart, this::finishCacheStart);
+
+ rollbackRestoreProc = new DistributedProcess<>(
+ ctx, RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK, this::rollback, this::finishRollback);
+ }
+
+ /**
+ * Cleanup temporary directories if any exists.
+ * <p>
+ * Removes every directory located in the page store working directory whose name starts with
+ * {@link #TMP_CACHE_DIR_PREFIX}.
+ *
+ * @throws IgniteCheckedException If the working directory could not be listed or it was not possible to delete
+ * some temporary directory.
+ */
+ protected void cleanup() throws IgniteCheckedException {
+ FilePageStoreManager pageStore = (FilePageStoreManager)ctx.cache().context().pageStore();
+
+ File dbDir = pageStore.workDir();
+
+ File[] tmpDirs = dbDir.listFiles(f -> f.isDirectory() && f.getName().startsWith(TMP_CACHE_DIR_PREFIX));
+
+ // File#listFiles returns null (not an empty array) if the path is not a directory or an I/O error occurs.
+ if (tmpDirs == null) {
+ // Nothing to clean up if the working directory has not been created yet.
+ if (!dbDir.exists())
+ return;
+
+ throw new IgniteCheckedException("Unable to list working directory [dir=" + dbDir + ']');
+ }
+
+ for (File dir : tmpDirs) {
+ if (!U.delete(dir)) {
+ throw new IgniteCheckedException("Unable to remove temporary directory, " +
+ "try deleting it manually [dir=" + dir + ']');
+ }
+ }
+ }
+
+ /**
+ * Start cache group restore operation.
+ * <p>
+ * The preconditions are checked synchronously. Snapshot metadata collection, snapshot verification and
+ * the start of the distributed prepare phase are chained asynchronously.
+ *
+ * @param snpName Snapshot name.
+ * @param cacheGrpNames Cache groups to be restored or {@code null} to restore all cache groups from the snapshot.
+ * @return Future that will be completed when the restore operation is complete and the cache groups are started.
+ */
+ public IgniteFuture<Void> start(String snpName, @Nullable Collection<String> cacheGrpNames) {
+ ClusterSnapshotFuture fut0;
+
+ try {
+ if (ctx.clientNode())
+ throw new IgniteException(OP_REJECT_MSG + "Client and daemon nodes can not perform this operation.");
+
+ DiscoveryDataClusterState clusterState = ctx.state().clusterState();
+
+ if (clusterState.state() != ClusterState.ACTIVE || clusterState.transition())
+ throw new IgniteException(OP_REJECT_MSG + "The cluster should be active.");
+
+ if (!clusterState.hasBaselineTopology())
+ throw new IgniteException(OP_REJECT_MSG + "The baseline topology is not configured for cluster.");
+
+ if (!IgniteFeatures.allNodesSupports(ctx.grid().cluster().nodes(), SNAPSHOT_RESTORE_CACHE_GROUP))
+ throw new IgniteException(OP_REJECT_MSG + "Not all nodes in the cluster support restore operation.");
+
+ if (ctx.cache().context().snapshotMgr().isSnapshotCreating())
+ throw new IgniteException(OP_REJECT_MSG + "A cluster snapshot operation is in progress.");
+
+ // Only one restore operation may be registered on this node at a time.
+ synchronized (this) {
+ if (isRestoring() || fut != null)
+ throw new IgniteException(OP_REJECT_MSG + "The previous snapshot restore operation was not completed.");
+
+ fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName);
+
+ fut0 = fut;
+ }
+ }
+ catch (IgniteException e) {
+ return new IgniteFinishedFutureImpl<>(e);
+ }
+
+ ctx.cache().context().snapshotMgr().collectSnapshotMetadata(snpName).listen(
+ f -> {
+ if (f.error() != null) {
+ finishProcess(fut0.rqId, f.error());
+
+ return;
+ }
+
+ Set<UUID> dataNodes = new HashSet<>();
+ Set<String> snpBltNodes = null;
+ Map<ClusterNode, List<SnapshotMetadata>> metas = f.result();
+
+ // The map is mutated below, so it must be modifiable. The merge function tolerates a group name
+ // passed more than once: without it toMap() throws IllegalStateException inside this listener
+ // and the operation future is never completed.
+ Map<Integer, String> reqGrpIds = cacheGrpNames == null ? new HashMap<>() :
+ cacheGrpNames.stream().collect(Collectors.toMap(CU::cacheId, v -> v, (v1, v2) -> v1));
+
+ for (Map.Entry<ClusterNode, List<SnapshotMetadata>> entry : metas.entrySet()) {
+ SnapshotMetadata meta = F.first(entry.getValue());
+
+ assert meta != null : entry.getKey().id();
+
+ // Only the metadata that belongs to the responding node itself is taken into account.
+ if (!entry.getKey().consistentId().equals(meta.consistentId()))
+ continue;
+
+ if (snpBltNodes == null)
+ snpBltNodes = new HashSet<>(meta.baselineNodes());
+
+ dataNodes.add(entry.getKey().id());
+
+ // Groups left in the map after the loop were not found in any snapshot part.
+ reqGrpIds.keySet().removeAll(meta.partitions().keySet());
+ }
+
+ if (snpBltNodes == null) {
+ finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "No snapshot data " +
+ "has been found [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']'));
+
+ return;
+ }
+
+ if (!reqGrpIds.isEmpty()) {
+ finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "Cache group(s) was not " +
+ "found in the snapshot [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']'));
+
+ return;
+ }
+
+ Collection<String> bltNodes = F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
+ node -> node.consistentId().toString(), (node) -> CU.baselineNode(node, ctx.state().clusterState()));
+
+ // Baseline nodes recorded in the snapshot that are absent from the current baseline.
+ snpBltNodes.removeAll(bltNodes);
+
+ if (!snpBltNodes.isEmpty()) {
+ finishProcess(fut0.rqId, new IgniteIllegalStateException(OP_REJECT_MSG + "Some nodes required to " +
+ "restore a cache group are missing [nodeId(s)=" + snpBltNodes + ", snapshot=" + snpName + ']'));
+
+ return;
+ }
+
+ ctx.cache().context().snapshotMgr().runSnapshotVerification(metas).listen(
+ f0 -> {
+ if (f0.error() != null) {
+ finishProcess(fut0.rqId, f0.error());
+
+ return;
+ }
+
+ IdleVerifyResultV2 res = f0.result();
+
+ if (!F.isEmpty(res.exceptions()) || res.hasConflicts()) {
+ StringBuilder sb = new StringBuilder();
+
+ res.print(sb::append, true);
+
+ finishProcess(fut0.rqId, new IgniteException(sb.toString()));
+
+ return;
+ }
+
+ SnapshotOperationRequest req = new SnapshotOperationRequest(
+ fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, dataNodes);
+
+ prepareRestoreProc.start(req.requestId(), req);
+ }
+ );
+ }
+ );
+
+ return new IgniteFutureImpl<>(fut0);
+ }
+
+ /**
+ * Check if snapshot restore process is currently running.
+ * <p>
+ * Delegates to {@link #isRestoring(String, String)} with both names set to {@code null}.
+ *
+ * @return {@code True} if the snapshot restore operation is in progress.
+ */
+ public boolean isRestoring() {
+ return isRestoring(null, null);
+ }
+
+ /**
+ * Tells whether the given cache (or cache group) is affected by the restore operation that is currently
+ * registered on this node.
+ *
+ * @param cacheName Cache name, {@code null} to check only that some restore operation is in progress.
+ * @param grpName Cache group name.
+ * @return {@code True} if the cache or group with the specified name is currently being restored.
+ */
+ public boolean isRestoring(@Nullable String cacheName, @Nullable String grpName) {
+ SnapshotRestoreContext restoreCtx = opCtx;
+
+ // No restore operation is registered at all.
+ if (restoreCtx == null)
+ return false;
+
+ // Without a cache name any running operation counts.
+ if (cacheName == null)
+ return true;
+
+ Map<Integer, StoredCacheData> restoredCfgs = restoreCtx.cfgs;
+
+ int reqCacheId = CU.cacheId(cacheName);
+
+ if (restoredCfgs.containsKey(reqCacheId))
+ return true;
+
+ // Fall back to the names of the cache directories being restored.
+ for (File dir : restoreCtx.dirs) {
+ String dirGrpName = FilePageStoreManager.cacheGroupName(dir);
+
+ boolean matches = grpName == null ?
+ CU.cacheId(dirGrpName) == reqCacheId :
+ cacheName.equals(dirGrpName) || CU.cacheId(dirGrpName) == CU.cacheId(grpName);
+
+ if (matches)
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Provides the nodes that must stay alive while the restored cache(s) are being started.
+ *
+ * @param reqId Request ID.
+ * @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when
+ * starting the cache(s), the whole procedure is rolled back. An empty set is returned if no restore operation
+ * with the given request ID is registered on this node.
+ */
+ public Set<UUID> cacheStartRequiredAliveNodes(IgniteUuid reqId) {
+ SnapshotRestoreContext restoreCtx = opCtx;
+
+ boolean sameReq = restoreCtx != null && reqId.globalId().equals(restoreCtx.reqId);
+
+ return sameReq ? Collections.unmodifiableSet(restoreCtx.nodes) : Collections.emptySet();
+ }
+
+ /**
+ * Finish local cache group restore process without an error.
+ * <p>
+ * Delegates to {@link #finishProcess(UUID, Throwable)} with a {@code null} error.
+ *
+ * @param reqId Request ID.
+ */
+ private void finishProcess(UUID reqId) {
+ finishProcess(reqId, null);
+ }
+
+ /**
+ * Finish local cache group restore process.
+ * <p>
+ * Logs the outcome, resets the operation context and completes the user future, each of them only if it
+ * belongs to the request with the given ID.
+ *
+ * @param reqId Request ID.
+ * @param err Error, if any.
+ */
+ private void finishProcess(UUID reqId, @Nullable Throwable err) {
+ if (err != null)
+ log.error("Failed to restore snapshot cache group [reqId=" + reqId + ']', err);
+ else if (log.isInfoEnabled())
+ log.info("Successfully restored cache group(s) from the snapshot [reqId=" + reqId + ']');
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ // Reset the context only if it was created for this request.
+ if (opCtx0 != null && reqId.equals(opCtx0.reqId))
+ opCtx = null;
+
+ synchronized (this) {
+ ClusterSnapshotFuture fut0 = fut;
+
+ if (fut0 != null && reqId.equals(fut0.rqId)) {
+ fut = null;
+
+ // The future is completed by a task submitted to the system executor, not by the calling thread.
+ ctx.getSystemExecutorService().submit(() -> fut0.onDone(null, err));
+ }
+ }
+ }
+
+ /**
+ * Node left callback. Registers an operation error if the node that left is one of the nodes required by
+ * the running restore operation, an already registered error is not overwritten.
+ *
+ * @param leftNodeId Left node ID.
+ */
+ public void onNodeLeft(UUID leftNodeId) {
+ SnapshotRestoreContext restoreCtx = opCtx;
+
+ // Nothing to do if there is no running operation or the node is not required by it.
+ if (restoreCtx == null || !restoreCtx.nodes.contains(leftNodeId))
+ return;
+
+ ClusterTopologyCheckedException topErr = new ClusterTopologyCheckedException(OP_REJECT_MSG +
+ "Required node has left the cluster [nodeId=" + leftNodeId + ']');
+
+ restoreCtx.err.compareAndSet(null, topErr);
+ }
+
+ /**
+ * Abort the currently running restore procedure (if any).
+ * <p>
+ * Registers the reason as the operation error, unless an error is already registered, and then waits for
+ * the stop future of the operation context, if one is set.
+ *
+ * @param reason Interruption reason.
+ */
+ public void interrupt(Exception reason) {
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ if (opCtx0 == null)
+ return;
+
+ opCtx0.err.compareAndSet(null, reason);
+
+ IgniteFuture<?> stopFut;
+
+ // The stop future is read under the same monitor under which the rollback phase assigns it.
+ synchronized (this) {
+ stopFut = opCtx0.stopFut;
+ }
+
+ // The wait is performed outside of the synchronized block.
+ if (stopFut != null)
+ stopFut.get();
+ }
+
+ /**
+ * Verifies that neither a cache group nor a cache is registered under the ID derived from the given name.
+ *
+ * @param name Cache name.
+ * @throws IgniteIllegalStateException If a cache group or a cache with such ID exists.
+ */
+ private void ensureCacheAbsent(String name) {
+ int cacheId = CU.cacheId(name);
+
+ boolean absent = !ctx.cache().cacheGroupDescriptors().containsKey(cacheId) &&
+ ctx.cache().cacheDescriptor(cacheId) == null;
+
+ if (absent)
+ return;
+
+ throw new IgniteIllegalStateException("Cache \"" + name +
+ "\" should be destroyed manually before perform restore operation.");
+ }
+
+ /**
+ * Local action of the prepare phase: validates the cluster state, creates the operation context and
+ * asynchronously copies the snapshot files into temporary directories, which are moved to the cache
+ * directories once all the copy tasks are finished.
+ *
+ * @param req Request to prepare cache group restore from the snapshot.
+ * @return Result future.
+ */
+ private IgniteInternalFuture<ArrayList<StoredCacheData>> prepare(SnapshotOperationRequest req) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ try {
+ DiscoveryDataClusterState state = ctx.state().clusterState();
+
+ if (state.state() != ClusterState.ACTIVE || state.transition())
+ throw new IgniteCheckedException(OP_REJECT_MSG + "The cluster should be active.");
+
+ if (ctx.cache().context().snapshotMgr().isSnapshotCreating())
+ throw new IgniteCheckedException(OP_REJECT_MSG + "A cluster snapshot operation is in progress.");
+
+ for (UUID nodeId : req.nodes()) {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null || !CU.baselineNode(node, state) || !ctx.discovery().alive(node)) {
+ throw new IgniteCheckedException(
+ OP_REJECT_MSG + "Required node has left the cluster [nodeId=" + nodeId + ']');
+ }
+ }
+
+ // Keep a local reference: the volatile field may be reset concurrently when the process is finished,
+ // so it must not be re-read here or in the asynchronous callbacks below.
+ SnapshotRestoreContext opCtx0 = prepareContext(req);
+
+ opCtx = opCtx0;
+
+ if (opCtx0.dirs.isEmpty())
+ return new GridFinishedFuture<>();
+
+ // Ensure that shared cache groups has no conflicts.
+ for (StoredCacheData cfg : opCtx0.cfgs.values()) {
+ ensureCacheAbsent(cfg.config().getName());
+
+ if (!F.isEmpty(cfg.config().getGroupName()))
+ ensureCacheAbsent(cfg.config().getGroupName());
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting local snapshot restore operation" +
+ " [reqId=" + req.requestId() +
+ ", snapshot=" + req.snapshotName() +
+ ", cache(s)=" + F.viewReadOnly(opCtx0.cfgs.values(), data -> data.config().getName()) + ']');
+ }
+
+ // Only the first error is kept, any registered error makes the pending copy tasks exit early.
+ Consumer<Throwable> errHnd = (ex) -> opCtx0.err.compareAndSet(null, ex);
+ BooleanSupplier stopChecker = () -> opCtx0.err.get() != null;
+ GridFutureAdapter<ArrayList<StoredCacheData>> retFut = new GridFutureAdapter<>();
+
+ if (ctx.isStopping())
+ throw new NodeStoppingException("Node is stopping.");
+
+ opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+
+ restoreAsync(opCtx0.snpName, opCtx0.dirs, ctx.localNodeId().equals(req.operationalNodeId()), stopChecker, errHnd)
+ .thenAccept(res -> {
+ try {
+ Throwable err = opCtx0.err.get();
+
+ if (err != null)
+ throw err;
+
+ // All files are copied, move the temporary directories to the cache directories.
+ for (File src : opCtx0.dirs)
+ Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE);
+ }
+ catch (Throwable t) {
+ log.error("Unable to restore cache group(s) from the snapshot " +
+ "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName + ']', t);
+
+ retFut.onDone(t);
+
+ return;
+ }
+
+ retFut.onDone(new ArrayList<>(opCtx0.cfgs.values()));
+ });
+
+ return retFut;
+ }
+ catch (IgniteIllegalStateException | IgniteCheckedException | RejectedExecutionException e) {
+ log.error("Unable to restore cache group(s) from the snapshot " +
+ "[reqId=" + req.requestId() + ", snapshot=" + req.snapshotName() + ']', e);
+
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /**
+ * Builds the path of the temporary directory for the given cache directory: a sibling directory whose
+ * name is the cache directory name prefixed with {@link #TMP_CACHE_DIR_PREFIX}.
+ *
+ * @param cacheDir Cache directory.
+ * @return Temporary directory.
+ */
+ private File formatTmpDirName(File cacheDir) {
+ String tmpDirName = TMP_CACHE_DIR_PREFIX + cacheDir.getName();
+
+ return new File(cacheDir.getParent(), tmpDirName);
+ }
+
+ /**
+ * Copy partition files and update binary metadata.
+ * <p>
+ * Every file is copied by a separate task of the snapshot executor into the temporary directory of its
+ * cache directory. Task failures are not propagated through the returned future, they are reported to
+ * the error handler instead.
+ *
+ * @param snpName Snapshot name.
+ * @param dirs Cache directories to restore from the snapshot.
+ * @param updateMeta Update binary metadata flag.
+ * @param stopChecker Process interrupt checker.
+ * @param errHnd Error handler.
+ * @return Future that is completed when all the scheduled tasks are finished.
+ * @throws IgniteCheckedException If failed.
+ */
+ private CompletableFuture<Void> restoreAsync(
+ String snpName,
+ Collection<File> dirs,
+ boolean updateMeta,
+ BooleanSupplier stopChecker,
+ Consumer<Throwable> errHnd
+ ) throws IgniteCheckedException {
+ IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr();
+ String pdsFolderName = ctx.pdsFolderResolver().resolveFolders().folderName();
+
+ List<CompletableFuture<Void>> futs = new ArrayList<>();
+
+ if (updateMeta) {
+ File binDir = binaryWorkDir(snapshotMgr.snapshotLocalDir(snpName).getAbsolutePath(), pdsFolderName);
+
+ futs.add(CompletableFuture.runAsync(() -> {
+ try {
+ ctx.cacheObjects().updateMetadata(binDir, stopChecker);
+ }
+ catch (Throwable t) {
+ errHnd.accept(t);
+ }
+ }, snapshotMgr.snapshotExecutorService()));
+ }
+
+ for (File cacheDir : dirs) {
+ File tmpCacheDir = formatTmpDirName(cacheDir);
+ File snpCacheDir = new File(snapshotMgr.snapshotLocalDir(snpName),
+ Paths.get(databaseRelativePath(pdsFolderName), cacheDir.getName()).toString());
+
+ assert snpCacheDir.exists() : "node=" + ctx.localNodeId() + ", dir=" + snpCacheDir;
+
+ File[] snpFiles = snpCacheDir.listFiles();
+
+ // File#listFiles returns null if the path is not a directory or an I/O error occurs. Report it through
+ // the error handler, so the already scheduled tasks are stopped by the stop checker.
+ if (snpFiles == null) {
+ errHnd.accept(new IgniteCheckedException("Unable to list snapshot cache directory " +
+ "[snapshot=" + snpName + ", dir=" + snpCacheDir + ']'));
+
+ break;
+ }
+
+ for (File snpFile : snpFiles) {
+ futs.add(CompletableFuture.runAsync(() -> {
+ // Skip the copying if the operation has already failed or has been interrupted.
+ if (stopChecker.getAsBoolean())
+ return;
+
+ try {
+ if (Thread.interrupted())
+ throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+
+ File target = new File(tmpCacheDir, snpFile.getName());
+
+ if (log.isDebugEnabled()) {
+ log.debug("Copying file from the snapshot " +
+ "[snapshot=" + snpName +
+ ", src=" + snpFile +
+ ", target=" + target + "]");
+ }
+
+ IgniteSnapshotManager.copy(snapshotMgr.ioFactory(), snpFile, target, snpFile.length());
+ }
+ catch (Throwable t) {
+ errHnd.accept(t);
+ }
+ }, snapshotMgr.snapshotExecutorService()));
+ }
+ }
+
+ int futsSize = futs.size();
+
+ return CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]));
+ }
+
+ /**
+ * Creates the local context of the restore operation: checks that the snapshot part belongs to the local
+ * node and has a compatible page size, creates a temporary directory for each cache directory to be
+ * restored and reads the stored cache configurations.
+ * <p>
+ * If the preparation fails, the temporary directories created by this call are removed.
+ *
+ * @param req Request to prepare cache group restore from the snapshot.
+ * @return Snapshot restore operation context.
+ * @throws IgniteCheckedException If failed.
+ */
+ private SnapshotRestoreContext prepareContext(SnapshotOperationRequest req) throws IgniteCheckedException {
+ if (opCtx != null) {
+ throw new IgniteCheckedException(OP_REJECT_MSG +
+ "The previous snapshot restore operation was not completed.");
+ }
+
+ GridCacheSharedContext<?, ?> cctx = ctx.cache().context();
+
+ SnapshotMetadata meta = F.first(cctx.snapshotMgr().readSnapshotMetadatas(req.snapshotName()));
+
+ // There is nothing to restore locally if the snapshot part does not belong to the local node.
+ if (meta == null || !meta.consistentId().equals(cctx.localNode().consistentId().toString()))
+ return new SnapshotRestoreContext(req, Collections.emptyList(), Collections.emptyMap());
+
+ if (meta.pageSize() != cctx.database().pageSize()) {
+ throw new IgniteCheckedException("Incompatible memory page size " +
+ "[snapshotPageSize=" + meta.pageSize() +
+ ", local=" + cctx.database().pageSize() +
+ ", snapshot=" + req.snapshotName() +
+ ", nodeId=" + cctx.localNodeId() + ']');
+ }
+
+ List<File> cacheDirs = new ArrayList<>();
+ List<File> createdTmpDirs = new ArrayList<>();
+ Map<String, StoredCacheData> cfgsByName = new HashMap<>();
+ FilePageStoreManager pageStore = (FilePageStoreManager)cctx.pageStore();
+
+ try {
+ // Collect the cache configurations and prepare a temporary directory for copying files.
+ for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName())) {
+ String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir);
+
+ if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName))
+ continue;
+
+ File cacheDir = pageStore.cacheWorkDir(snpCacheDir.getName().startsWith(CACHE_GRP_DIR_PREFIX), grpName);
+
+ if (cacheDir.exists()) {
+ if (!cacheDir.isDirectory()) {
+ throw new IgniteCheckedException("Unable to restore cache group, file with required directory " +
+ "name already exists [group=" + grpName + ", file=" + cacheDir + ']');
+ }
+
+ String[] files = cacheDir.list();
+
+ // File#list returns null if an I/O error occurs.
+ if (files == null) {
+ throw new IgniteCheckedException("Unable to restore cache group, cannot list directory " +
+ "[group=" + grpName + ", dir=" + cacheDir + ']');
+ }
+
+ if (files.length > 0) {
+ throw new IgniteCheckedException("Unable to restore cache group, directory is not empty " +
+ "[group=" + grpName + ", dir=" + cacheDir + ']');
+ }
+
+ if (!cacheDir.delete()) {
+ throw new IgniteCheckedException("Unable to remove empty cache directory " +
+ "[group=" + grpName + ", dir=" + cacheDir + ']');
+ }
+ }
+
+ File tmpCacheDir = formatTmpDirName(cacheDir);
+
+ if (tmpCacheDir.exists()) {
+ throw new IgniteCheckedException("Unable to restore cache group, temp directory already exists " +
+ "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
+ }
+
+ if (!tmpCacheDir.mkdir()) {
+ throw new IgniteCheckedException("Unable to restore cache group, cannot create temp directory " +
+ "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
+ }
+
+ createdTmpDirs.add(tmpCacheDir);
+
+ cacheDirs.add(cacheDir);
+
+ pageStore.readCacheConfigurations(snpCacheDir, cfgsByName);
+ }
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ // The context is not created on failure, so nothing else knows about the directories created above:
+ // remove them here, otherwise the next attempt is rejected because the temp directory already exists.
+ for (File tmpDir : createdTmpDirs)
+ U.delete(tmpDir);
+
+ throw e;
+ }
+
+ Map<Integer, StoredCacheData> cfgsById =
+ cfgsByName.values().stream().collect(Collectors.toMap(v -> CU.cacheId(v.config().getName()), v -> v));
+
+ return new SnapshotRestoreContext(req, cacheDirs, cfgsById);
+ }
+
+ /**
+ * Handles the results of the prepare phase collected from the nodes: finishes the process if no local
+ * context was created for the request, starts the rollback phase on failure, or merges the received
+ * cache configurations and starts the cache start phase.
+ *
+ * @param reqId Request ID.
+ * @param res Results.
+ * @param errs Errors.
+ */
+ private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res, Map<UUID, Exception> errs) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ // Only the first reported error is used.
+ Exception failure = F.first(errs.values());
+
+ assert opCtx0 != null || failure != null : "Context has not been created on the node " + ctx.localNodeId();
+
+ // There is no local context for this request, so there is nothing to roll back locally.
+ if (opCtx0 == null || !reqId.equals(opCtx0.reqId)) {
+ finishProcess(reqId, failure);
+
+ return;
+ }
+
+ // A required node that did not respond is treated as a failure as well.
+ if (failure == null)
+ failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+
+ // Context has been created - should rollback changes cluster-wide.
+ if (failure != null) {
+ opCtx0.err.compareAndSet(null, failure);
+
+ // The next phase is started by the coordinator only.
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ rollbackRestoreProc.start(reqId, reqId);
+
+ return;
+ }
+
+ Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
+
+ // Merge the configurations received from all nodes, a node may respond with a null result.
+ for (List<StoredCacheData> storedCfgs : res.values()) {
+ if (storedCfgs == null)
+ continue;
+
+ for (StoredCacheData cacheData : storedCfgs)
+ globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
+ }
+
+ opCtx0.cfgs = globalCfgs;
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ cacheStartProc.start(reqId, reqId);
+ }
+
+ /**
+ * Local action of the cache start phase: the coordinator initiates the dynamic start of the restored
+ * caches, the other nodes respond with an already completed future.
+ *
+ * @param reqId Request ID.
+ * @return Result future.
+ */
+ private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ // NOTE(review): opCtx0 is dereferenced without a null check - confirm that the context always exists on
+ // a server node when this phase is executed.
+ Throwable err = opCtx0.err.get();
+
+ if (err != null)
+ return new GridFinishedFuture<>(err);
+
+ if (!U.isLocalNodeCoordinator(ctx.discovery()))
+ return new GridFinishedFuture<>();
+
+ Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting restored caches " +
+ "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
+ ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName()) + ']');
+ }
+
+ // We set the topology node IDs required to successfully start the cache, if any of the required nodes leave
+ // the cluster during the cache startup, the whole procedure will be rolled back.
+ return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false, IgniteUuid.fromUuid(reqId));
+ }
+
+    /**
+     * Completes the cache start phase: finishes the operation if every required node responded without an error,
+     * otherwise records the failure and (on the coordinator) starts the rollback phase.
+     *
+     * @param reqId Request ID.
+     * @param res Results.
+     * @param errs Errors.
+     */
+    private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        // Prefer an error reported by a node. The node-left check is performed only when no error was reported
+        // (Optional.orElse(...) would evaluate it unconditionally), same as in the prepare phase.
+        Exception failure = F.first(errs.values());
+
+        if (failure == null)
+            failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+
+        if (failure == null) {
+            finishProcess(reqId);
+
+            return;
+        }
+
+        // Keep the first recorded error, the rollback phase reports it when the process is finished.
+        opCtx0.err.compareAndSet(null, failure);
+
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            rollbackRestoreProc.start(reqId, reqId);
+    }
+
+    /**
+     * Checks that a response was received from every required topology node.
+     *
+     * @param reqNodes Set of required topology nodes.
+     * @param respNodes Set of responding topology nodes.
+     * @return Error listing the required nodes that did not respond, or {@code null} if all of them responded.
+     */
+    private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
+        if (respNodes.containsAll(reqNodes))
+            return null;
+
+        // Required nodes without a response.
+        Set<UUID> missing = new HashSet<>(reqNodes);
+
+        missing.removeAll(respNodes);
+
+        String msg = OP_REJECT_MSG + "Required node has left the cluster [nodeId=" + missing + ']';
+
+        return new ClusterTopologyCheckedException(msg);
+    }
+
+    /**
+     * Rollback phase: removes the cache directories (and their temporary counterparts) listed in the operation
+     * context. The removal is executed by the snapshot executor service.
+     *
+     * @param reqId Request ID.
+     * @return Result future. It completes with {@code true} when all directories were removed and with an error
+     *      if a directory could not be removed or the cleanup task was rejected. On client nodes and when the
+     *      context holds no directories an already completed future is returned.
+     */
+    private IgniteInternalFuture<Boolean> rollback(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (F.isEmpty(opCtx0.dirs))
+            return new GridFinishedFuture<>();
+
+        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+        synchronized (this) {
+            opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+
+            try {
+                ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
+                    if (log.isInfoEnabled()) {
+                        log.info("Removing restored cache directories [reqId=" + reqId +
+                            ", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
+                    }
+
+                    // First removal failure; subsequent failures are attached to it as suppressed exceptions
+                    // so that none of them is lost.
+                    IgniteCheckedException ex = null;
+
+                    for (File cacheDir : opCtx0.dirs) {
+                        File tmpCacheDir = formatTmpDirName(cacheDir);
+
+                        if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
+                            log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
+                                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
+
+                            // Report the directory that actually failed to be removed (the temporary one).
+                            IgniteCheckedException e0 =
+                                new IgniteCheckedException("Unable to remove temporary cache directory " + tmpCacheDir);
+
+                            if (ex == null)
+                                ex = e0;
+                            else
+                                ex.addSuppressed(e0);
+                        }
+
+                        if (cacheDir.exists() && !U.delete(cacheDir)) {
+                            log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
+                                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
+
+                            IgniteCheckedException e0 =
+                                new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
+
+                            if (ex == null)
+                                ex = e0;
+                            else
+                                ex.addSuppressed(e0);
+                        }
+                    }
+
+                    if (ex != null)
+                        retFut.onDone(ex);
+                    else
+                        retFut.onDone(true);
+                });
+            }
+            catch (RejectedExecutionException e) {
+                log.error("Unable to perform rollback routine, task has been rejected " +
+                    "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']', e);
+
+                retFut.onDone(e);
+            }
+        }
+
+        return retFut;
+    }
+
+    /**
+     * Completes the rollback phase: logs the nodes that failed or did not respond and finishes the process
+     * with the error stored in the operation context.
+     *
+     * @param reqId Request ID.
+     * @param res Results.
+     * @param errs Errors.
+     */
+    private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+        if (ctx.clientNode())
+            return;
+
+        boolean rollbackFailed = !errs.isEmpty();
+
+        if (rollbackFailed) {
+            log.warning("Some nodes were unable to complete the rollback routine completely, check the local log " +
+                "files for more information [nodeIds=" + errs.keySet() + ']');
+        }
+
+        SnapshotRestoreContext locCtx = opCtx;
+
+        // Required nodes that did not respond to the rollback request.
+        Set<UUID> silentNodes = new HashSet<>(locCtx.nodes);
+
+        silentNodes.removeAll(res.keySet());
+
+        if (!silentNodes.isEmpty()) {
+            log.warning("Some of the nodes left the cluster and were unable to complete the rollback" +
+                " operation [reqId=" + reqId + ", snapshot=" + locCtx.snpName + ", node(s)=" + silentNodes + ']');
+        }
+
+        Throwable opErr = locCtx.err.get();
+
+        finishProcess(reqId, opErr);
+    }
+
+ /**
+ * Cache group restore from snapshot operation context.
+ */
+ private static class SnapshotRestoreContext {
+ /** Request ID. */
+ private final UUID reqId;
+
+ /** Snapshot name. */
+ private final String snpName;
+
+ /** Baseline node IDs that must be alive to complete the operation. */
+ private final Set<UUID> nodes;
+
+ /** List of restored cache group directories. */
+ private final Collection<File> dirs;
+
+ /** The exception that led to the interruption of the process. */
+ private final AtomicReference<Throwable> err = new AtomicReference<>();
+
+ /** Cache ID to configuration mapping. */
+ private volatile Map<Integer, StoredCacheData> cfgs;
+
+ /** Graceful shutdown future. */
+ private volatile IgniteFuture<?> stopFut;
+
+ /**
+ * @param req Request to prepare cache group restore from the snapshot.
+ * @param dirs List of cache group names to restore from the snapshot.
+ * @param cfgs Cache ID to configuration mapping.
+ */
+ protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection<File> dirs,
+ Map<Integer, StoredCacheData> cfgs) {
+ reqId = req.requestId();
+ snpName = req.snapshotName();
+ nodes = new HashSet<>(req.nodes());
+
+ this.dirs = dirs;
+ this.cfgs = cfgs;
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 7ccfee0..170a3c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
+import java.util.function.BooleanSupplier;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -307,6 +308,15 @@
public void saveMetadata(Collection<BinaryType> types, File dir);
/**
+ * Merge the binary metadata files stored in the specified directory.
+ *
+ * @param metadataDir Directory containing binary metadata files.
+ * @param stopChecker Process interrupt checker.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void updateMetadata(File metadataDir, BooleanSupplier stopChecker) throws IgniteCheckedException;
+
+ /**
* @param typeName Type name.
* @param ord ordinal.
* @return Enum object.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
index 7d9e6ae..730b9e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
@@ -445,6 +445,21 @@
/**
* Rotate performance statistics.
*/
- PERFORMANCE_STATISTICS_ROTATE
+ PERFORMANCE_STATISTICS_ROTATE,
+
+ /**
+ * Cache group restore prepare phase.
+ */
+ RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
+
+ /**
+ * Cache group restore cache start phase.
+ */
+ RESTORE_CACHE_GROUP_SNAPSHOT_START,
+
+ /**
+ * Cache group restore rollback phase.
+ */
+ RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 74976b1..b873f7d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -111,7 +111,7 @@
protected final List<Integer> locEvts = new CopyOnWriteArrayList<>();
/** Configuration for the 'default' cache. */
- protected volatile CacheConfiguration<Integer, Integer> dfltCacheCfg;
+ protected volatile CacheConfiguration<Integer, Object> dfltCacheCfg;
/** Enable default data region persistence. */
protected boolean persistence = true;
@@ -124,6 +124,9 @@
discoSpi.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder());
+ if (dfltCacheCfg != null)
+ cfg.setCacheConfiguration(dfltCacheCfg);
+
return cfg.setConsistentId(igniteInstanceName)
.setCommunicationSpi(new TestRecordingCommunicationSpi())
.setDataStorageConfiguration(new DataStorageConfiguration()
@@ -132,7 +135,6 @@
.setPersistenceEnabled(persistence))
.setCheckpointFrequency(3000)
.setPageSize(DFLT_PAGE_SIZE))
- .setCacheConfiguration(dfltCacheCfg)
.setClusterStateOnStart(INACTIVE)
.setIncludeEventTypes(EVTS_CLUSTER_SNAPSHOT)
.setDiscoverySpi(discoSpi);
@@ -185,7 +187,7 @@
* @param ccfg Default cache configuration.
* @return Cache configuration.
*/
- protected static <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
+ protected <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
return ccfg.setCacheMode(CacheMode.PARTITIONED)
.setBackups(2)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
@@ -239,7 +241,7 @@
* @return Ignite instance.
* @throws Exception If fails.
*/
- protected IgniteEx startGridWithCache(CacheConfiguration<Integer, Integer> ccfg, int keys) throws Exception {
+ protected IgniteEx startGridWithCache(CacheConfiguration<Integer, Object> ccfg, int keys) throws Exception {
return startGridsWithCache(1, ccfg, keys);
}
@@ -250,7 +252,7 @@
* @return Ignite instance.
* @throws Exception If fails.
*/
- protected IgniteEx startGridsWithCache(int grids, CacheConfiguration<Integer, Integer> ccfg, int keys) throws Exception {
+ protected IgniteEx startGridsWithCache(int grids, CacheConfiguration<Integer, Object> ccfg, int keys) throws Exception {
dfltCacheCfg = ccfg;
return startGridsWithCache(grids, keys, Integer::new, ccfg);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
new file mode 100644
index 0000000..62f4619
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.function.Function;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.internal.IgniteEx;
+
+/**
+ * Snapshot restore test base.
+ */
+public abstract class IgniteClusterSnapshotRestoreBaseTest extends AbstractSnapshotSelfTest {
+ /** Timeout. */
+ protected static final long TIMEOUT = 15_000;
+
+ /** Cache value builder. */
+ protected abstract Function<Integer, Object> valueBuilder();
+
+ /**
+ * @param nodesCnt Nodes count.
+ * @param keysCnt Number of keys to create.
+ * @return Ignite coordinator instance.
+ * @throws Exception if failed.
+ */
+ protected IgniteEx startGridsWithSnapshot(int nodesCnt, int keysCnt) throws Exception {
+ return startGridsWithSnapshot(nodesCnt, keysCnt, false);
+ }
+
+ /**
+ * @param nodesCnt Nodes count.
+ * @param keysCnt Number of keys to create.
+ * @param startClient {@code True} to start an additional client node.
+ * @return Ignite coordinator instance.
+ * @throws Exception if failed.
+ */
+ protected IgniteEx startGridsWithSnapshot(int nodesCnt, int keysCnt, boolean startClient) throws Exception {
+ IgniteEx ignite = startGridsWithCache(nodesCnt, keysCnt, valueBuilder(), dfltCacheCfg);
+
+ if (startClient)
+ ignite = startClientGrid("client");
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ ignite.cache(dfltCacheCfg.getName()).destroy();
+
+ awaitPartitionMapExchange();
+
+ return ignite;
+ }
+
+ /**
+ * @param cache Cache.
+ * @param keysCnt Expected number of keys.
+ */
+ protected void assertCacheKeys(IgniteCache<Object, Object> cache, int keysCnt) {
+ assertEquals(keysCnt, cache.size());
+
+ for (int i = 0; i < keysCnt; i++)
+ assertEquals(valueBuilder().apply(i), cache.get(i));
+ }
+
+ /** */
+ protected class BinaryValueBuilder implements Function<Integer, Object> {
+ /** Binary type name. */
+ private final String typeName;
+
+ /**
+ * @param typeName Binary type name.
+ */
+ BinaryValueBuilder(String typeName) {
+ this.typeName = typeName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object apply(Integer key) {
+ BinaryObjectBuilder builder = grid(0).binary().builder(typeName);
+
+ builder.setField("id", key);
+ builder.setField("name", String.valueOf(key));
+
+ return builder.build();
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
new file mode 100644
index 0000000..f3359f1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.OpenOption;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.IgniteSnapshot;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cache.CacheExistsException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
+import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.TMP_CACHE_DIR_PREFIX;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ * Snapshot restore tests.
+ */
+public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotRestoreBaseTest {
+ /** Type name used for binary and SQL. */
+ private static final String TYPE_NAME = "CustomType";
+
+ /** Cache 1 name. */
+ private static final String CACHE1 = "cache1";
+
+ /** Cache 2 name. */
+ private static final String CACHE2 = "cache2";
+
+ /** Default shared cache group name. */
+ private static final String SHARED_GRP = "shared";
+
+ /**
+ * Cache value builder. Defaults to the string representation of the key; the field is not final, a test
+ * may replace it before starting the grids (see {@link #testIncompatibleMetasUpdate()}).
+ */
+ private Function<Integer, Object> valBuilder = String::valueOf;
+
+ /** {@inheritDoc} */
+ @Override protected Function<Integer, Object> valueBuilder() {
+ return valBuilder;
+ }
+
+    /**
+     * Restores all cache groups of the snapshot (the group names argument is {@code null}) and checks the data
+     * of the default cache and of two caches that share a cache group.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRestoreAllGroups() throws Exception {
+        CacheConfiguration<Integer, Object> sharedCfg1 =
+            txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
+
+        CacheConfiguration<Integer, Object> sharedCfg2 =
+            txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE2)).setGroupName(SHARED_GRP);
+
+        IgniteEx crd = startGridsWithCache(2, CACHE_KEYS_RANGE, valBuilder,
+            dfltCacheCfg.setBackups(0), sharedCfg1, sharedCfg2);
+
+        crd.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        // All caches must be destroyed before their groups can be restored.
+        for (String cacheName : Arrays.asList(CACHE1, CACHE2, DEFAULT_CACHE_NAME))
+            crd.cache(cacheName).destroy();
+
+        awaitPartitionMapExchange();
+
+        // Restore all cache groups.
+        grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
+
+        for (String cacheName : Arrays.asList(DEFAULT_CACHE_NAME, CACHE1, CACHE2))
+            assertCacheKeys(crd.cache(cacheName), CACHE_KEYS_RANGE);
+    }
+
+ /**
+ * Runs the concurrent restore check with every request issued from the node with index {@code 0}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStartClusterSnapshotRestoreMultipleThreadsSameNode() throws Exception {
+ checkStartClusterSnapshotRestoreMultithreaded(() -> 0);
+ }
+
+ /**
+ * Runs the concurrent restore check with node indexes taken from a shared incrementing counter.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStartClusterSnapshotRestoreMultipleThreadsDiffNode() throws Exception {
+ // Shared between the runner threads, starts from 0.
+ AtomicInteger nodeIdx = new AtomicInteger();
+
+ checkStartClusterSnapshotRestoreMultithreaded(nodeIdx::getAndIncrement);
+ }
+
+    /**
+     * Starts the same restore operation from two threads at once and checks that exactly one attempt succeeds,
+     * exactly one fails and the restored cache contains the expected data.
+     *
+     * @param nodeIdxSupplier Ignite node index supplier, queried exactly once per thread.
+     * @throws Exception If failed.
+     */
+    private void checkStartClusterSnapshotRestoreMultithreaded(IntSupplier nodeIdxSupplier) throws Exception {
+        Ignite ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
+
+        AtomicInteger successCnt = new AtomicInteger();
+        AtomicInteger failCnt = new AtomicInteger();
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                // The supplier may be stateful (e.g. an incrementing counter), so it must be queried only once
+                // per thread: an extra call would shift the index to a grid that has not been started.
+                int nodeIdx = nodeIdxSupplier.getAsInt();
+
+                grid(nodeIdx).snapshot().restoreSnapshot(
+                    SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
+
+                successCnt.incrementAndGet();
+            }
+            catch (Exception e) {
+                // A failure of one of the concurrent attempts is expected, it is only counted here.
+                failCnt.incrementAndGet();
+            }
+        }, 2, "runner");
+
+        fut.get(TIMEOUT);
+
+        assertEquals(1, successCnt.get());
+        assertEquals(1, failCnt.get());
+
+        assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+    }
+
+ /**
+ * Checks that a new snapshot cannot be created while a restore operation is in progress.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCreateSnapshotDuringRestore() throws Exception {
+ Ignite ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
+
+ BlockingCustomMessageDiscoverySpi discoSpi = discoSpi(grid(0));
+
+ // Hold the restore operation at the point where the cache change batch is sent through discovery.
+ discoSpi.block((msg) -> msg instanceof DynamicCacheChangeBatch);
+
+ IgniteFuture<Void> fut =
+ ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+ discoSpi.waitBlocked(TIMEOUT);
+
+ // The restore has not finished yet, so snapshot creation requested on another node must be rejected.
+ GridTestUtils.assertThrowsAnyCause(
+ log,
+ () -> grid(1).snapshot().createSnapshot("NEW_SNAPSHOT").get(TIMEOUT),
+ IgniteException.class,
+ "Cache group restore operation is currently in progress."
+ );
+
+ discoSpi.unblock();
+
+ // After unblocking the restore must complete successfully.
+ fut.get(TIMEOUT);
+
+ assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+ }
+
+ /**
+ * Ensures that the cache doesn't start if one of the baseline nodes fails.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNodeLeftDuringCacheStartOnExchangeInit() throws Exception {
+ // Three server nodes plus a client node.
+ startGridsWithSnapshot(3, CACHE_KEYS_RANGE, true);
+
+ BlockingCustomMessageDiscoverySpi discoSpi = discoSpi(grid(0));
+
+ // Hold the cache change batch so that the node can be stopped before it is processed.
+ discoSpi.block((msg) -> msg instanceof DynamicCacheChangeBatch);
+
+ IgniteFuture<Void> fut =
+ grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+ discoSpi.waitBlocked(TIMEOUT);
+
+ // Stop a server node while the message is still blocked.
+ stopGrid(2, true);
+
+ discoSpi.unblock();
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class, null);
+
+ ensureCacheAbsent(dfltCacheCfg);
+ }
+
+ /**
+ * Ensures that the cache is not started if non-coordinator node left during the exchange.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNodeLeftDuringCacheStartOnExchangeFinish() throws Exception {
+ checkNodeLeftOnExchangeFinish(
+ false, ClusterTopologyCheckedException.class, "Required node has left the cluster");
+ }
+
+ /**
+ * Ensures that the cache is not started if the coordinator left during the exchange.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCrdLeftDuringCacheStartOnExchangeFinish() throws Exception {
+ checkNodeLeftOnExchangeFinish(
+ true, IgniteCheckedException.class, "Operation has been cancelled (node is stopping)");
+ }
+
+ /**
+ * Stops a node while the partition single messages of nodes 1 and 2 are blocked and checks that the restore
+ * fails with the expected error and the cache is absent afterwards.
+ *
+ * @param crdStop {@code True} to stop coordinator node.
+ * @param expCls Expected exception class.
+ * @param expMsg Expected exception message.
+ * @throws Exception If failed.
+ */
+ private void checkNodeLeftOnExchangeFinish(
+ boolean crdStop,
+ Class<? extends Throwable> expCls,
+ String expMsg
+ ) throws Exception {
+ // Three server nodes plus a client node.
+ startGridsWithSnapshot(3, CACHE_KEYS_RANGE, true);
+
+ TestRecordingCommunicationSpi node1spi = TestRecordingCommunicationSpi.spi(grid(1));
+ TestRecordingCommunicationSpi node2spi = TestRecordingCommunicationSpi.spi(grid(2));
+
+ // Hold the partition single messages sent by nodes 1 and 2.
+ node1spi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+ node2spi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+
+ IgniteFuture<Void> fut =
+ grid(1).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+ node1spi.waitForBlocked();
+ node2spi.waitForBlocked();
+
+ // Stop either node 0 or node 2 while the messages are still blocked.
+ stopGrid(crdStop ? 0 : 2, true);
+
+ node1spi.stopBlock();
+
+ // Node 2 is still running only when node 0 was the one stopped; otherwise there is nothing to unblock.
+ if (crdStop)
+ node2spi.stopBlock();
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), expCls, expMsg);
+
+ awaitPartitionMapExchange();
+
+ ensureCacheAbsent(dfltCacheCfg);
+ }
+
+ /**
+ * Checks that the restore operation is rejected when the cluster is inactive.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testClusterSnapshotRestoreRejectOnInActiveCluster() throws Exception {
+ IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valBuilder, dfltCacheCfg);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ // Deactivate the cluster after the snapshot has been taken.
+ ignite.cluster().state(ClusterState.INACTIVE);
+
+ // The future is obtained outside of the asserting closure: the error is expected from the future itself.
+ IgniteFuture<Void> fut =
+ ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+ GridTestUtils.assertThrowsAnyCause(
+ log, () -> fut.get(TIMEOUT), IgniteException.class, "The cluster should be active");
+ }
+
+ /**
+ * Checks that the restore operation is rejected after a server node has been stopped and the baseline
+ * topology has been reset.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testClusterSnapshotRestoreOnSmallerTopology() throws Exception {
+ // Two server nodes plus a client node.
+ startGridsWithSnapshot(2, CACHE_KEYS_RANGE, true);
+
+ // Shrink the topology: one of the two server nodes is stopped.
+ stopGrid(1);
+
+ resetBaselineTopology();
+
+ IgniteFuture<Void> fut =
+ grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), IgniteIllegalStateException.class, null);
+
+ ensureCacheAbsent(dfltCacheCfg);
+ }
+
+ /**
+ * Checks the restore of a shared cache group: passing the names of its caches instead of the group name is
+ * rejected, passing the group name restores both caches.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRestoreSharedCacheGroup() throws Exception {
+ // Both caches belong to the same cache group.
+ CacheConfiguration<Integer, Object> cacheCfg1 =
+ txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
+
+ CacheConfiguration<Integer, Object> cacheCfg2 =
+ txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE2)).setGroupName(SHARED_GRP);
+
+ IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valBuilder, cacheCfg1, cacheCfg2);
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ ignite.cache(CACHE1).destroy();
+
+ awaitPartitionMapExchange();
+
+ IgniteSnapshot snp = ignite.snapshot();
+
+ // Cache names are passed where cache group names are expected.
+ GridTestUtils.assertThrowsAnyCause(
+ log,
+ () -> snp.restoreSnapshot(SNAPSHOT_NAME, Arrays.asList(CACHE1, CACHE2)).get(TIMEOUT),
+ IllegalArgumentException.class,
+ "Cache group(s) was not found in the snapshot"
+ );
+
+ // Destroy the second cache of the group as well before restoring the group by its name.
+ ignite.cache(CACHE2).destroy();
+
+ awaitPartitionMapExchange();
+
+ snp.restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(SHARED_GRP)).get(TIMEOUT);
+
+ assertCacheKeys(ignite.cache(CACHE1), CACHE_KEYS_RANGE);
+ assertCacheKeys(ignite.cache(CACHE2), CACHE_KEYS_RANGE);
+ }
+
+ /**
+ * Checks how the restore operation deals with binary metadata that is already registered in the cluster:
+ * first with a type that lacks one of the snapshot fields, then with a type whose field has another type.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testIncompatibleMetasUpdate() throws Exception {
+ // Snapshot values are binary objects with the fields "id" and "name".
+ valBuilder = new BinaryValueBuilder(TYPE_NAME);
+
+ IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
+
+ int typeId = ignite.context().cacheObjects().typeId(TYPE_NAME);
+
+ // Drop the type registered while the snapshot data was loaded.
+ ignite.context().cacheObjects().removeType(typeId);
+
+ BinaryObject[] objs = new BinaryObject[CACHE_KEYS_RANGE];
+
+ // Register the same type name again, this time with the "id" field only.
+ IgniteCache<Integer, Object> cache1 = createCacheWithBinaryType(ignite, "cache1", n -> {
+ BinaryObjectBuilder builder = ignite.binary().builder(TYPE_NAME);
+
+ builder.setField("id", n);
+
+ objs[n] = builder.build();
+
+ return objs[n];
+ });
+
+ ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
+
+ // Ensure that existing type has been updated.
+ BinaryType type = ignite.context().cacheObjects().metadata(typeId);
+
+ assertTrue(type.fieldNames().contains("name"));
+
+ // Objects stored before the restore must still be readable.
+ for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+ assertEquals(objs[i], cache1.get(i));
+
+ // Clean up both caches and the type before the second scenario.
+ cache1.destroy();
+
+ grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+ ignite.context().cacheObjects().removeType(typeId);
+
+ // Create cache with incompatible binary type.
+ cache1 = createCacheWithBinaryType(ignite, "cache1", n -> {
+ BinaryObjectBuilder builder = ignite.binary().builder(TYPE_NAME);
+
+ // The "id" field is a UUID here while the snapshot objects were built with an Integer "id".
+ builder.setField("id", UUID.randomUUID());
+
+ objs[n] = builder.build();
+
+ return objs[n];
+ });
+
+ IgniteFuture<Void> fut0 =
+ ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> fut0.get(TIMEOUT), BinaryObjectException.class, null);
+
+ ensureCacheAbsent(dfltCacheCfg);
+
+ // The failed restore must not affect the data of the existing cache.
+ for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+ assertEquals(objs[i], cache1.get(i));
+ }
+
+    /**
+     * Creates a cache (obtained in keep-binary mode) and puts a value for every key in
+     * {@code [0, CACHE_KEYS_RANGE)}.
+     *
+     * @param ignite Ignite.
+     * @param cacheName Cache name.
+     * @param valBuilder Binary value builder.
+     * @return Created cache.
+     */
+    private IgniteCache<Integer, Object> createCacheWithBinaryType(
+        Ignite ignite,
+        String cacheName,
+        Function<Integer, BinaryObject> valBuilder
+    ) {
+        IgniteCache<Integer, Object> binCache = ignite.createCache(new CacheConfiguration<>(cacheName)).withKeepBinary();
+
+        int key = 0;
+
+        while (key < CACHE_KEYS_RANGE) {
+            BinaryObject val = valBuilder.apply(key);
+
+            binCache.put(key, val);
+
+            key++;
+        }
+
+        return binCache;
+    }
+
+ /**
+ * Checks that a cache with the name of a cache being restored cannot be created while the response of the
+ * prepare phase is blocked.
+ *
+ * @throws Exception if failed
+ */
+ @Test
+ public void testParallelCacheStartWithTheSameNameOnPrepare() throws Exception {
+ checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, IgniteCheckedException.class,
+ "Cache start failed. A cache or group with the same name is currently being restored from a snapshot");
+ }
+
+ /**
+ * Checks that a cache with the name of a cache being restored cannot be created while the response of the
+ * cache start phase is blocked.
+ *
+ * @throws Exception if failed
+ */
+ @Test
+ public void testParallelCacheStartWithTheSameNameOnStart() throws Exception {
+ checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_START, CacheExistsException.class,
+ "Failed to start cache (a cache with the same name is already started):");
+ }
+
+ /**
+ * Blocks the restore operation on the given phase, then checks that caches named after the restored group
+ * and after the restored cache cannot be created, and that the restore completes once unblocked.
+ *
+ * @param procType The type of distributed process on which communication is blocked.
+ * @param expCls Exception class expected when a cache with the name of the restored cache is created.
+ * @param expMsg Exception message expected when a cache with the name of the restored cache is created.
+ * @throws Exception if failed.
+ */
+ private void checkCacheStartWithTheSameName(
+ DistributedProcessType procType,
+ Class<? extends Throwable> expCls,
+ String expMsg
+ ) throws Exception {
+ // The snapshot contains a single cache that belongs to the shared group.
+ dfltCacheCfg = txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
+
+ IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
+
+ TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(1));
+
+ IgniteFuture<Void> fut = waitForBlockOnRestore(spi, procType, SHARED_GRP);
+
+ // A cache named after the restored group is rejected regardless of the blocked phase.
+ GridTestUtils.assertThrowsAnyCause(log, () -> ignite.createCache(SHARED_GRP), IgniteCheckedException.class, null);
+
+ // A cache named after the restored cache is rejected with a phase-specific error.
+ GridTestUtils.assertThrowsAnyCause(log, () -> ignite.createCache(CACHE1), expCls, expMsg);
+
+ spi.stopBlock();
+
+ fut.get(TIMEOUT);
+
+ assertCacheKeys(grid(0).cache(CACHE1), CACHE_KEYS_RANGE);
+ }
+
+ /**
+ * Checks that the restore operation fails when a node is stopped during the prepare phase, and that the
+ * stopped node cannot rejoin the cluster afterwards.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNodeFailDuringRestore() throws Exception {
+ startGridsWithSnapshot(4, CACHE_KEYS_RANGE);
+
+ TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(3));
+
+ // NOTE(review): the helper is defined outside of this block; presumably it starts the restore and
+ // returns once node 3 is blocked on the prepare phase - confirm.
+ IgniteFuture<Void> fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, DEFAULT_CACHE_NAME);
+
+ // Stop the node asynchronously, the outcome of the stop is checked below.
+ IgniteInternalFuture<?> fut0 = runAsync(() -> stopGrid(3, true));
+
+ GridTestUtils.assertThrowsAnyCause(
+ log,
+ () -> fut.get(TIMEOUT),
+ ClusterTopologyCheckedException.class,
+ "Required node has left the cluster"
+ );
+
+ fut0.get(TIMEOUT);
+
+ awaitPartitionMapExchange();
+
+ ensureCacheAbsent(dfltCacheCfg);
+
+ // The stopped node is expected to be refused with a request to remove its cache directories.
+ GridTestUtils.assertThrowsAnyCause(
+ log,
+ () -> startGrid(3),
+ IgniteSpiException.class,
+ "to add the node to cluster - remove directories with the caches"
+ );
+ }
+
+ /**
+ * Makes the opening of one partition file fail on node 2 during the restore, stops that node and checks
+ * that the restore fails, a temporary cache directory is left in the node's work directory and that this
+ * directory is removed when the node is started again.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNodeFailDuringFilesCopy() throws Exception {
+ dfltCacheCfg.setCacheMode(CacheMode.REPLICATED);
+
+ startGridsWithSnapshot(3, CACHE_KEYS_RANGE);
+
+ TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(2));
+ CountDownLatch stopLatch = new CountDownLatch(1);
+
+ // Block prepare phase single-node messages on the SPI of node 2; the block is never released here
+ // because the node is stopped instead.
+ spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage &&
+ ((SingleNodeMessage<?>)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal());
+
+ // Relative path of the partition file with the middle partition number.
+ String failingFilePath = Paths.get(CACHE_DIR_PREFIX + DEFAULT_CACHE_NAME,
+ PART_FILE_PREFIX + (dfltCacheCfg.getAffinity().partitions() / 2) + FILE_SUFFIX).toString();
+
+ // Opening the chosen file through the snapshot manager of node 2 signals the latch and fails.
+ grid(2).context().cache().context().snapshotMgr().ioFactory(
+ new CustomFileIOFactory(new RandomAccessFileIOFactory(),
+ file -> {
+ if (file.getPath().endsWith(failingFilePath)) {
+ stopLatch.countDown();
+
+ throw new RuntimeException("Test exception");
+ }
+ }));
+
+ File node2dbDir = ((FilePageStoreManager)grid(2).context().cache().context().pageStore()).
+ cacheWorkDir(dfltCacheCfg).getParentFile();
+
+ IgniteInternalFuture<Object> stopFut = runAsync(() -> {
+ // Fail instead of silently stopping the node when the failing file was never opened.
+ assertTrue("I/O factory was not invoked for the failing partition file: " + failingFilePath,
+ U.await(stopLatch, TIMEOUT, TimeUnit.MILLISECONDS));
+
+ stopGrid(2, true);
+
+ return null;
+ });
+
+ IgniteFuture<Void> fut =
+ grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+
+ stopFut.get(TIMEOUT);
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class, null);
+
+ File[] files = node2dbDir.listFiles(file -> file.getName().startsWith(TMP_CACHE_DIR_PREFIX));
+ assertEquals("A temp directory with potentially corrupted files must exist.", 1, files.length);
+
+ ensureCacheAbsent(dfltCacheCfg);
+
+ dfltCacheCfg = null;
+
+ startGrid(2);
+
+ files = node2dbDir.listFiles(file -> file.getName().startsWith(TMP_CACHE_DIR_PREFIX));
+ assertEquals("A temp directory should be removed at node startup", 0, files.length);
+ }
+
+ /**
+ * Checks that a new node cannot be started while the restore is blocked on the prepare phase and that the
+ * restore completes once the block is released.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNodeJoinDuringRestore() throws Exception {
+ Ignite ig = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
+
+ TestRecordingCommunicationSpi blockedSpi = TestRecordingCommunicationSpi.spi(grid(1));
+
+ IgniteFuture<Void> restoreFut =
+ waitForBlockOnRestore(blockedSpi, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, DEFAULT_CACHE_NAME);
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> startGrid(2),
+ IgniteSpiException.class, "Joining node during caches restore is not allowed");
+
+ blockedSpi.stopBlock();
+
+ restoreFut.get(TIMEOUT);
+
+ IgniteCache<Object, Object> restored = ig.cache(DEFAULT_CACHE_NAME);
+
+ assertTrue(restored.indexReadyFuture().isDone());
+
+ assertCacheKeys(restored, CACHE_KEYS_RANGE);
+ }
+
+ /**
+ * Switches the cluster to the read-only state while the restore is blocked on the prepare phase and
+ * expects the restore to fail.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testClusterStateChangeActiveReadonlyOnPrepare() throws Exception {
+ checkClusterStateChange(
+ ClusterState.ACTIVE_READ_ONLY,
+ RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
+ IgniteException.class,
+ "Failed to perform start cache operation (cluster is in read-only mode)"
+ );
+ }
+
+ /**
+ * Switches the cluster to the read-only state while the restore is blocked on the
+ * {@code RESTORE_CACHE_GROUP_SNAPSHOT_START} process and expects the restore to complete.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testClusterStateChangeActiveReadonlyOnCacheStart() throws Exception {
+ checkClusterStateChange(
+ ClusterState.ACTIVE_READ_ONLY,
+ RESTORE_CACHE_GROUP_SNAPSHOT_START,
+ null,
+ null
+ );
+ }
+
+ /**
+ * Deactivates the cluster while the restore is blocked on the prepare phase and expects the restore to
+ * fail.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testClusterDeactivateOnPrepare() throws Exception {
+ checkClusterStateChange(
+ ClusterState.INACTIVE,
+ RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
+ IgniteException.class,
+ "The cluster has been deactivated."
+ );
+ }
+
+ /**
+ * Deactivates the cluster while the restore is blocked on the {@code RESTORE_CACHE_GROUP_SNAPSHOT_START}
+ * process and expects the restore to complete.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testClusterDeactivateOnCacheStart() throws Exception {
+ checkClusterStateChange(
+ ClusterState.INACTIVE,
+ RESTORE_CACHE_GROUP_SNAPSHOT_START,
+ null,
+ null
+ );
+ }
+
+ /**
+ * Changes the cluster state while the restore is blocked on the given distributed process and checks the
+ * outcome: either the restore completes, or it fails with the expected error, the cache stays absent and
+ * a repeated restore on the re-activated cluster succeeds. In both cases the cache keys are verified at
+ * the end.
+ *
+ * @param state Cluster state.
+ * @param procType The type of distributed process on which communication is blocked.
+ * @param exCls Expected exception class, {@code null} if the restore is expected to complete.
+ * @param expMsg Expected exception message.
+ * @throws Exception if failed.
+ */
+ private void checkClusterStateChange(
+ ClusterState state,
+ DistributedProcessType procType,
+ @Nullable Class<? extends Throwable> exCls,
+ @Nullable String expMsg
+ ) throws Exception {
+ int nodesCnt = 2;
+
+ Ignite ig = startGridsWithSnapshot(nodesCnt, CACHE_KEYS_RANGE, true);
+
+ TestRecordingCommunicationSpi blockedSpi = TestRecordingCommunicationSpi.spi(grid(nodesCnt - 1));
+
+ IgniteFuture<Void> restoreFut = waitForBlockOnRestore(blockedSpi, procType, DEFAULT_CACHE_NAME);
+
+ ig.cluster().state(state);
+
+ blockedSpi.stopBlock();
+
+ if (exCls == null)
+ restoreFut.get(TIMEOUT);
+ else
+ GridTestUtils.assertThrowsAnyCause(log, () -> restoreFut.get(TIMEOUT), exCls, expMsg);
+
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ if (exCls != null) {
+ ensureCacheAbsent(dfltCacheCfg);
+
+ // The failed restore must not prevent a new one.
+ grid(nodesCnt - 1).snapshot()
+ .restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME))
+ .get(TIMEOUT);
+ }
+
+ assertCacheKeys(ig.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+ }
+
+ /**
+ * Checks on every started non-client node that there is no cache group descriptor for the cache, that the
+ * node no longer reports a restore in progress and that the cache work directory is missing or empty.
+ *
+ * @param ccfg Cache configuration.
+ * @throws IgniteCheckedException if failed.
+ */
+ private void ensureCacheAbsent(CacheConfiguration<?, ?> ccfg) throws IgniteCheckedException {
+ String cacheName = ccfg.getName();
+
+ for (Ignite ignite : G.allGrids()) {
+ GridKernalContext kctx = ((IgniteEx)ignite).context();
+
+ if (kctx.clientNode())
+ continue;
+
+ CacheGroupDescriptor desc = kctx.cache().cacheGroupDescriptors().get(CU.cacheId(cacheName));
+
+ assertNull("nodeId=" + kctx.localNodeId() + ", cache=" + cacheName, desc);
+
+ // Fail right here if the node still reports a restore after the timeout instead of inspecting
+ // a directory that may still be changing.
+ assertTrue("Snapshot restore is still in progress [node=" + ignite.name() + ']',
+ GridTestUtils.waitForCondition(
+ () -> !kctx.cache().context().snapshotMgr().isRestoring(),
+ TIMEOUT));
+
+ File dir = ((FilePageStoreManager)kctx.cache().context().pageStore()).cacheWorkDir(ccfg);
+
+ // A single listing is used for both the message and the check. File.list() returns null when the
+ // path is not a directory (e.g. it has just been removed), which must not end up in an NPE.
+ String[] files = dir.list();
+
+ String errMsg = String.format("%s, dir=%s, exists=%b, files=%s",
+ ignite.name(), dir, dir.exists(), Arrays.toString(files));
+
+ assertTrue(errMsg, !dir.exists() || (files != null && files.length == 0));
+ }
+ }
+
+ /**
+ * Configures the given communication SPI to block single-node messages of the given distributed process,
+ * starts restoring the cache group from the snapshot on node 0 and waits until a message gets blocked.
+ *
+ * @param spi Test communication spi.
+ * @param restorePhase The type of distributed process on which communication is blocked.
+ * @param grpName Cache group name.
+ * @return Snapshot restore future.
+ * @throws InterruptedException if interrupted.
+ */
+ private IgniteFuture<Void> waitForBlockOnRestore(
+ TestRecordingCommunicationSpi spi,
+ DistributedProcessType restorePhase,
+ String grpName
+ ) throws InterruptedException {
+ spi.blockMessages((rcvNode, message) -> {
+ if (!(message instanceof SingleNodeMessage))
+ return false;
+
+ return ((SingleNodeMessage<?>)message).type() == restorePhase.ordinal();
+ });
+
+ IgniteFuture<Void> restoreFut =
+ grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(grpName));
+
+ // Return only after the restore has reached the blocked message.
+ spi.waitForBlocked();
+
+ return restoreFut;
+ }
+
+ /**
+ * Custom I/O factory to preprocessing created files.
+ */
+ private static class CustomFileIOFactory implements FileIOFactory {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Delegate factory. */
+ private final FileIOFactory delegate;
+
+ /** Preprocessor for created files. */
+ private final Consumer<File> hnd;
+
+ /**
+ * @param delegate Delegate factory.
+ * @param hnd Preprocessor for created files.
+ */
+ public CustomFileIOFactory(FileIOFactory delegate, Consumer<File> hnd) {
+ this.delegate = delegate;
+ this.hnd = hnd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ FileIO delegate = this.delegate.create(file, modes);
+
+ hnd.accept(file);
+
+ return delegate;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
index 14cd9d7..fbbe62e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
@@ -41,6 +41,7 @@
import org.apache.ignite.internal.processors.cache.persistence.CommonPoolStarvationCheckpointTest;
import org.apache.ignite.internal.processors.cache.persistence.SingleNodePersistenceSslTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
@@ -99,6 +100,7 @@
IgniteClusterSnapshotSelfTest.class,
IgniteClusterSnapshotCheckTest.class,
IgniteSnapshotMXBeanTest.class,
+ IgniteClusterSnapshotRestoreSelfTest.class,
IgniteClusterIdTagTest.class,
FullyConnectedComponentSearcherTest.class,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
index e0aee42e..7f4165d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
@@ -119,7 +119,7 @@
* @param cacheName Cache name.
* @return Cache configuration.
*/
- private static CacheConfiguration<Integer, Account> txFilteredCache(String cacheName) {
+ private CacheConfiguration<Integer, Account> txFilteredCache(String cacheName) {
return txCacheConfig(new CacheConfiguration<Integer, Account>(cacheName))
.setCacheMode(CacheMode.REPLICATED)
.setQueryEntities(singletonList(new QueryEntity(Integer.class.getName(), Account.class.getName())));
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
new file mode 100644
index 0000000..17f495c
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Objects;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryBasicNameMapper;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.junit.Test;
+
+/**
+ * Cluster snapshot restore tests verifying SQL and indexing.
+ */
+public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterSnapshotRestoreBaseTest {
+ /** Type name used for binary and SQL. */
+ private static final String TYPE_NAME = IndexedObject.class.getName();
+
+ /** Number of cache keys to pre-create at node start. */
+ private static final int CACHE_KEYS_RANGE = 10_000;
+
+ /** Cache value builder. */
+ private Function<Integer, Object> valBuilder = new BinaryValueBuilder(TYPE_NAME);
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Additionally sets the maximum inline index size to {@code 255}, the SQL schema to {@code PUBLIC} and a
+ * single query entity with an {@link Integer} key, {@link #TYPE_NAME} values, the {@code id} and
+ * {@code name} fields and an index on {@code id}.
+ */
+ @Override protected <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
+ CacheConfiguration<K, V> cfg = super.txCacheConfig(ccfg);
+
+ QueryEntity entity = new QueryEntity()
+ .setKeyType(Integer.class.getName())
+ .setValueType(TYPE_NAME)
+ .setFields(new LinkedHashMap<>(F.asMap("id", Integer.class.getName(), "name", String.class.getName())))
+ .setIndexes(Collections.singletonList(new QueryIndex("id")));
+
+ return cfg.setSqlIndexMaxInlineSize(255)
+ .setSqlSchema("PUBLIC")
+ .setQueryEntities(Collections.singletonList(entity));
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Value builder currently assigned to this test instance.
+ */
+ @Override protected Function<Integer, Object> valueBuilder() {
+ return this.valBuilder;
+ }
+
+ /**
+ * Restores the default cache from the snapshot using values produced by {@link IndexedValueBuilder} and
+ * verifies the restored data through the instance returned by the cluster start.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBasicClusterSnapshotRestore() throws Exception {
+ valBuilder = new IndexedValueBuilder();
+
+ IgniteEx client = startGridsWithSnapshot(2, CACHE_KEYS_RANGE, true);
+
+ grid(0).snapshot()
+ .restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME))
+ .get(TIMEOUT);
+
+ IgniteCache<Object, Object> restored = client.cache(DEFAULT_CACHE_NAME);
+
+ assertCacheKeys(restored, CACHE_KEYS_RANGE);
+ }
+
+ /**
+ * Removes the binary type of the cache values before the restore and checks that after the restore the
+ * cache data is readable in binary form and the type metadata is available on every started node.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBasicClusterSnapshotRestoreWithMetadata() throws Exception {
+ IgniteEx ig = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
+
+ // Drop the type metadata so that it can only come back with the restore.
+ int typeId = ig.context().cacheObjects().typeId(TYPE_NAME);
+
+ ig.context().cacheObjects().removeType(typeId);
+
+ forceCheckpoint();
+
+ ig.snapshot()
+ .restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME))
+ .get(TIMEOUT);
+
+ assertCacheKeys(ig.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
+
+ for (Ignite node : G.allGrids()) {
+ IgniteEx nodeEx = (IgniteEx)node;
+
+ assertNotNull(nodeEx.context().cacheObjects().metadata(typeId));
+ }
+ }
+
+ /**
+ * Takes a snapshot on two nodes, extends the cluster with two more nodes, destroys the cache, removes the
+ * binary type metadata and restores the cache from the snapshot on the node started last.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testClusterSnapshotRestoreOnBiggerTopology() throws Exception {
+ // Number of nodes the snapshot is taken on; two more nodes are started afterwards.
+ int snpNodesCnt = 2;
+
+ startGridsWithCache(snpNodesCnt, CACHE_KEYS_RANGE, valBuilder, dfltCacheCfg);
+
+ grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ startGrid(snpNodesCnt);
+
+ IgniteEx lastNode = startGrid(snpNodesCnt + 1);
+
+ resetBaselineTopology();
+
+ awaitPartitionMapExchange();
+
+ lastNode.cache(DEFAULT_CACHE_NAME).destroy();
+
+ awaitPartitionMapExchange();
+
+ // Remove metadata.
+ int typeId = lastNode.context().cacheObjects().typeId(TYPE_NAME);
+
+ lastNode.context().cacheObjects().removeType(typeId);
+
+ forceCheckpoint();
+
+ // Restore from an empty node.
+ lastNode.snapshot()
+ .restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME))
+ .get(TIMEOUT);
+
+ awaitPartitionMapExchange();
+
+ assertCacheKeys(lastNode.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void assertCacheKeys(IgniteCache<Object, Object> cache, int keysCnt) {
+ super.assertCacheKeys(cache, keysCnt);
+
+ String tblName = new BinaryBasicNameMapper(true).typeName(TYPE_NAME);
+
+ for (Ignite grid : G.allGrids()) {
+ GridKernalContext ctx = ((IgniteEx)grid).context();
+
+ String nodeId = ctx.localNodeId().toString();
+
+ assertTrue("nodeId=" + nodeId, grid.cache(cache.getName()).indexReadyFuture().isDone());
+
+ // Make sure no index rebuild happened.
+ assertEquals("nodeId=" + nodeId,
+ 0, ctx.cache().cache(cache.getName()).context().cache().metrics0().getIndexRebuildKeysProcessed());
+
+ GridQueryProcessor qry = ((IgniteEx)grid).context().query();
+
+ // Make sure SQL works fine.
+ assertEquals("nodeId=" + nodeId, (long)keysCnt, qry.querySqlFields(new SqlFieldsQuery(
+ "SELECT count(*) FROM " + tblName), true).getAll().get(0).get(0));
+
+ // Make sure the index is in use.
+ String explainPlan = (String)qry.querySqlFields(new SqlFieldsQuery(
+ "explain SELECT * FROM " + tblName + " WHERE id < 10"), true).getAll().get(0).get(0);
+
+ assertTrue("nodeId=" + nodeId + "\n" + explainPlan, explainPlan.contains("ID_ASC_IDX"));
+ }
+ }
+
+ /** */
+ private static class IndexedValueBuilder implements Function<Integer, Object> {
+ /** {@inheritDoc} */
+ @Override public Object apply(Integer key) {
+ return new IndexedObject(key, "Person number #" + key);
+ }
+ }
+
+ /** Cache value with two SQL fields, one of which is indexed. */
+ private static class IndexedObject {
+ /** Id. */
+ @QuerySqlField(index = true)
+ private final int id;
+
+ /** Name. */
+ @QuerySqlField
+ private final String name;
+
+ /**
+ * @param id Id.
+ * @param name Name.
+ */
+ public IndexedObject(int id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (o == this)
+ return true;
+
+ if (o != null && o.getClass() == getClass()) {
+ IndexedObject other = (IndexedObject)o;
+
+ return other.id == id && Objects.equals(name, other.name);
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(name, id);
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 593f5df..69cbb71 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -29,6 +29,7 @@
import org.apache.ignite.internal.processors.cache.persistence.db.LongDestroyDurableBackgroundTaskTest;
import org.apache.ignite.internal.processors.cache.persistence.db.MultipleParallelCacheDeleteDeadlockTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckWithIndexesTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreWithIndexingTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWithIndexesTest;
import org.apache.ignite.internal.processors.database.IgniteDbMultiNodeWithIndexingPutGetTest;
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingPutGetTest;
@@ -66,7 +67,8 @@
CacheGroupReencryptionTest.class,
IgnitePdsIndexingDefragmentationTest.class,
StopRebuildIndexTest.class,
- ForceRebuildIndexTest.class
+ ForceRebuildIndexTest.class,
+ IgniteClusterSnapshotRestoreWithIndexingTest.class
})
public class IgnitePdsWithIndexingTestSuite {
}