IGNITE-14572 Include metastorage to snapshot (#9047)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
index f97ada7..04cdcdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
@@ -51,12 +51,6 @@
/** Special partition reserved for index space. */
public static final int INDEX_PARTITION = 0xFFFF;
- /** Old special partition reserved for metastore space. */
- public static final int OLD_METASTORE_PARTITION = 0x0;
-
- /** Special partition reserved for metastore space. */
- public static final int METASTORE_PARTITION = 0x1;
-
/** Cache group meta page id. */
public static final long META_PAGE_ID = pageId(INDEX_PARTITION, FLAG_IDX, 0);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 24954a4..05730f3 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -144,9 +144,6 @@
/** */
public static final String DFLT_STORE_DIR = "db";
- /** */
- public static final String META_STORAGE_NAME = "metastorage";
-
/** Matcher for searching of *.tmp files. */
public static final PathMatcher TMP_FILE_MATCHER =
FileSystems.getDefault().getPathMatcher("glob:**" + TMP_SUFFIX);
@@ -298,7 +295,7 @@
storeWorkDir.toPath(), entry -> {
String name = entry.toFile().getName();
- return !name.equals(META_STORAGE_NAME) &&
+ return !name.equals(MetaStorage.METASTORAGE_DIR_NAME) &&
(name.startsWith(CACHE_DIR_PREFIX) || name.startsWith(CACHE_GRP_DIR_PREFIX));
}
)) {
@@ -498,9 +495,9 @@
DataRegion dataRegion = cctx.database().dataRegion(GridCacheDatabaseSharedManager.METASTORE_DATA_REGION_NAME);
CacheStoreHolder holder = initDir(
- new File(storeWorkDir, META_STORAGE_NAME),
+ new File(storeWorkDir, MetaStorage.METASTORAGE_DIR_NAME),
grpId,
- PageIdAllocator.METASTORE_PARTITION + 1,
+ MetaStorage.METASTORAGE_PARTITIONS.size(),
dataRegion.memoryMetrics().totalAllocatedPages()::add,
false);
@@ -1009,6 +1006,7 @@
/**
* @param dir Directory to check.
+ * @param names Cache group names to filter.
* @return Files that match cache or cache group pattern.
*/
public static List<File> cacheDirectories(File dir, Predicate<String> names) {
@@ -1020,7 +1018,8 @@
return Arrays.stream(dir.listFiles())
.sorted()
.filter(File::isDirectory)
- .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+ .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX) ||
+ f.getName().equals(MetaStorage.METASTORAGE_DIR_NAME))
.filter(f -> names.test(cacheGroupName(f)))
.collect(Collectors.toList());
}
@@ -1066,6 +1065,8 @@
return name.substring(CACHE_GRP_DIR_PREFIX.length());
else if (name.startsWith(CACHE_DIR_PREFIX))
return name.substring(CACHE_DIR_PREFIX.length());
+ else if (name.equals(MetaStorage.METASTORAGE_DIR_NAME))
+ return MetaStorage.METASTORAGE_CACHE_NAME;
else
throw new IgniteException("Directory doesn't match the cache or cache group prefix: " + dir);
}
@@ -1185,6 +1186,23 @@
}
/**
+ * @param grpId Group id.
+ * @return Name of cache group directory.
+ * @throws IgniteCheckedException If cache group doesn't exist.
+ */
+ public String cacheDirName(int grpId) throws IgniteCheckedException {
+ if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
+ return MetaStorage.METASTORAGE_DIR_NAME;
+
+ CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+
+ if (gctx == null)
+ throw new IgniteCheckedException("Cache group context has not found due to the cache group is stopped.");
+
+ return cacheDirName(gctx.config());
+ }
+
+ /**
* @param cleanFiles {@code True} if the stores should delete it's files upon close.
*/
private IgniteCheckedException shutdown(Collection<CacheStoreHolder> holders, boolean cleanFiles) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index 2c20a02..3a4eae0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -26,9 +26,12 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Executor;
@@ -73,7 +76,6 @@
import org.jetbrains.annotations.NotNull;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_AUX;
-import static org.apache.ignite.internal.pagemem.PageIdAllocator.OLD_METASTORE_PARTITION;
/**
* General purpose key-value local-only storage.
@@ -85,6 +87,19 @@
/** */
public static final int METASTORAGE_CACHE_ID = CU.cacheId(METASTORAGE_CACHE_NAME);
+ /** Metastorage cache directory to store data. */
+ public static final String METASTORAGE_DIR_NAME = "metastorage";
+
+ /** Old special partition reserved for metastore space. */
+ public static final int OLD_METASTORE_PARTITION = 0x0;
+
+ /** Special partition reserved for metastore space. */
+ public static final int METASTORE_PARTITION = 0x1;
+
+ /** The set of all metastorage partitions. */
+ public static final Set<Integer> METASTORAGE_PARTITIONS =
+ Collections.unmodifiableSet(new HashSet<>(Arrays.asList(OLD_METASTORE_PARTITION, METASTORE_PARTITION)));
+
/** This flag is used ONLY FOR TESTING the migration of a metastorage from Part 0 to Part 1. */
public static boolean PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false;
@@ -245,9 +260,9 @@
*/
private void initInternal(IgniteCacheDatabaseSharedManager db) throws IgniteCheckedException {
if (PRESERVE_LEGACY_METASTORAGE_PARTITION_ID)
- getOrAllocateMetas(partId = PageIdAllocator.OLD_METASTORE_PARTITION);
- else if (!readOnly || getOrAllocateMetas(partId = PageIdAllocator.OLD_METASTORE_PARTITION))
- getOrAllocateMetas(partId = PageIdAllocator.METASTORE_PARTITION);
+ getOrAllocateMetas(partId = OLD_METASTORE_PARTITION);
+ else if (!readOnly || getOrAllocateMetas(partId = OLD_METASTORE_PARTITION))
+ getOrAllocateMetas(partId = METASTORE_PARTITION);
if (!empty) {
CacheDiagnosticManager diagnosticMgr = cctx.diagnostic();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageRowStoreEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageRowStoreEntry.java
index 6de449e8..6da0335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageRowStoreEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageRowStoreEntry.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.persistence.metastorage;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
@@ -27,7 +26,7 @@
/** */
public MetastorageRowStoreEntry(byte[] val) {
super(0L, MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID ?
- PageIdAllocator.OLD_METASTORE_PARTITION : PageIdAllocator.METASTORE_PARTITION, val);
+ MetaStorage.OLD_METASTORE_PARTITION : MetaStorage.METASTORE_PARTITION, val);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 90304bb..01a39fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -55,6 +55,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -106,6 +107,7 @@
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridBusyLock;
@@ -166,6 +168,8 @@
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectories;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.DB_DEFAULT_FOLDER;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.T_DATA;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getPageIO;
@@ -477,7 +481,8 @@
if (!snpDir.exists())
return;
- assert snpDir.isDirectory() : snpDir;
+ if (!snpDir.isDirectory())
+ return;
try {
File binDir = binaryWorkDir(snpDir.getAbsolutePath(), folderName);
@@ -579,6 +584,7 @@
Set<Integer> leftGrps = new HashSet<>(grpIds);
leftGrps.removeAll(cctx.cache().cacheGroupDescriptors().keySet());
+ boolean withMetaStorage = leftGrps.remove(METASTORAGE_CACHE_ID);
if (!leftGrps.isEmpty()) {
return new GridFinishedFuture<>(new IgniteCheckedException("Some of requested cache groups doesn't exist " +
@@ -598,14 +604,20 @@
IgniteInternalFuture<Set<GroupPartitionId>> task0;
- if (parts.isEmpty())
+ if (parts.isEmpty() && !withMetaStorage)
task0 = new GridFinishedFuture<>(Collections.emptySet());
else {
task0 = registerSnapshotTask(req.snapshotName(),
req.operationalNodeId(),
parts,
+ withMetaStorage,
locSndrFactory.apply(req.snapshotName()));
+ if (withMetaStorage) {
+ ((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage())
+ .suspend(((SnapshotFutureTask)task0).started());
+ }
+
clusterSnpReq = req;
}
@@ -945,12 +957,22 @@
* @return The list of cache or cache group names in given snapshot on local node.
*/
public List<File> snapshotCacheDirectories(String snpName, String folderName) {
+ return snapshotCacheDirectories(snpName, folderName, name -> true);
+ }
+
+ /**
+ * @param snpName Snapshot name.
+ * @param folderName Directory name for cache group.
+ * @param names Cache group names to filter.
+ * @return The list of cache or cache group names in given snapshot on local node.
+ */
+ public List<File> snapshotCacheDirectories(String snpName, String folderName, Predicate<String> names) {
File snpDir = snapshotLocalDir(snpName);
if (!snpDir.exists())
return Collections.emptyList();
- return cacheDirectories(new File(snpDir, databaseRelativePath(folderName)), name -> true);
+ return cacheDirectories(new File(snpDir, databaseRelativePath(folderName)), names);
}
/**
@@ -1097,6 +1119,8 @@
.map(CacheGroupDescriptor::cacheOrGroupName)
.collect(Collectors.toList());
+ grps.add(METASTORAGE_CACHE_NAME);
+
List<ClusterNode> srvNodes = cctx.discovery().serverNodes(AffinityTopologyVersion.NONE);
snpFut0.listen(f -> {
@@ -1112,8 +1136,7 @@
grps,
new HashSet<>(F.viewReadOnly(srvNodes,
F.node2id(),
- (node) -> CU.baselineNode(node, clusterState)))
- ));
+ (node) -> CU.baselineNode(node, clusterState)))));
String msg = "Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps + ']';
@@ -1206,7 +1229,7 @@
// Schedule task on a checkpoint and wait when it starts.
try {
- task.awaitStarted();
+ task.started().get();
}
catch (IgniteCheckedException e) {
U.error(log, "Fail to wait while cluster-wide snapshot operation started", e);
@@ -1297,6 +1320,7 @@
* @param snpName Unique snapshot name.
* @param srcNodeId Node id which cause snapshot operation.
* @param parts Collection of pairs group and appropriate cache partition to be snapshot.
+ * @param withMetaStorage {@code true} if all metastorage data must be also included into snapshot.
* @param snpSndr Factory which produces snapshot receiver instance.
* @return Snapshot operation task which should be registered on checkpoint to run.
*/
@@ -1304,6 +1328,7 @@
String snpName,
UUID srcNodeId,
Map<Integer, Set<Integer>> parts,
+ boolean withMetaStorage,
SnapshotSender snpSndr
) {
if (!busyLock.enterBusy())
@@ -1323,6 +1348,7 @@
ioFactory,
snpSndr,
parts,
+ withMetaStorage,
locBuff));
if (prev != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 978d146..96915bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -51,6 +51,7 @@
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.pagemem.store.PageWriteListener;
@@ -63,10 +64,12 @@
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -77,7 +80,6 @@
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheWorkDir;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.copy;
@@ -147,6 +149,9 @@
*/
private final Map<Integer, Set<Integer>> parts;
+ /** {@code true} if all metastorage data must be also included into snapshot. */
+ private final boolean withMetaStorage;
+
/** Cache group and corresponding partitions collected under the checkpoint write lock. */
private final Map<Integer, Set<Integer>> processed = new HashMap<>();
@@ -186,6 +191,7 @@
startedFut.onDone(e);
onDone(e);
parts = null;
+ withMetaStorage = false;
ioFactory = null;
locBuff = null;
}
@@ -204,14 +210,17 @@
FileIOFactory ioFactory,
SnapshotSender snpSndr,
Map<Integer, Set<Integer>> parts,
+ boolean withMetaStorage,
ThreadLocal<ByteBuffer> locBuff
) {
assert snpName != null : "Snapshot name cannot be empty or null.";
assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null.";
assert snpSndr.executor() != null : "Executor service must be not null.";
assert cctx.pageStore() instanceof FilePageStoreManager : "Snapshot task can work only with physical files.";
+ assert !parts.containsKey(MetaStorage.METASTORAGE_CACHE_ID) : "The withMetaStorage must be used instead.";
this.parts = parts;
+ this.withMetaStorage = withMetaStorage;
this.cctx = cctx;
this.pageStore = (FilePageStoreManager)cctx.pageStore();
this.log = cctx.logger(SnapshotFutureTask.class);
@@ -296,10 +305,10 @@
}
/**
- * @throws IgniteCheckedException If fails.
+ * @return Started future.
*/
- public void awaitStarted() throws IgniteCheckedException {
- startedFut.get();
+ public IgniteInternalFuture<?> started() {
+ return startedFut;
}
/**
@@ -339,11 +348,17 @@
throw new IgniteCheckedException("Encrypted cache groups are not allowed to be snapshot: " + grpId);
// Create cache group snapshot directory on start in a single thread.
- U.ensureDirectory(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())),
+ U.ensureDirectory(cacheWorkDir(tmpConsIdDir, FilePageStoreManager.cacheDirName(gctx.config())),
"directory for snapshotting cache group",
log);
}
+ if (withMetaStorage) {
+ U.ensureDirectory(cacheWorkDir(tmpConsIdDir, MetaStorage.METASTORAGE_DIR_NAME),
+ "directory for snapshotting metastorage",
+ log);
+ }
+
startedFut.listen(f ->
((GridCacheDatabaseSharedManager)cctx.database()).removeCheckpointListener(this)
);
@@ -366,7 +381,7 @@
}
/** {@inheritDoc} */
- @Override public void beforeCheckpointBegin(Context ctx) {
+ @Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException {
if (stopping())
return;
@@ -376,6 +391,15 @@
else
cpEndFut.completeExceptionally(f.error());
});
+
+ if (withMetaStorage) {
+ try {
+ U.get(((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage()).flush());
+ }
+ catch (IgniteCheckedException ignore) {
+ // Flushing may be cancelled or interrupted due to the local node stopping.
+ }
+ }
}
/** {@inheritDoc} */
@@ -453,28 +477,23 @@
CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
- if (gctx == null) {
- throw new IgniteCheckedException("Cache group context has not found " +
- "due to the cache group is stopped: " + grpId);
- }
-
- for (int partId : e.getValue()) {
- GroupPartitionId pair = new GroupPartitionId(grpId, partId);
-
- PageStore store = pageStore.getStore(grpId, partId);
-
- partDeltaWriters.put(pair,
- new PageStoreSerialWriter(store,
- partDeltaFile(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())), partId)));
-
- partFileLengths.put(pair, store.size());
- }
+ if (gctx == null)
+ throw new IgniteCheckedException("Cache group is stopped : " + grpId);
ccfgs.add(gctx.config());
+ addPartitionWriters(grpId, e.getValue(), FilePageStoreManager.cacheDirName(gctx.config()));
+ }
+
+ if (withMetaStorage) {
+ processed.put(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.METASTORAGE_PARTITIONS);
+
+ addPartitionWriters(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.METASTORAGE_PARTITIONS,
+ MetaStorage.METASTORAGE_DIR_NAME);
}
pageStore.readConfigurationFiles(ccfgs,
- (ccfg, ccfgFile) -> ccfgSndrs.add(new CacheConfigurationSender(ccfg.getName(), cacheDirName(ccfg), ccfgFile)));
+ (ccfg, ccfgFile) -> ccfgSndrs.add(new CacheConfigurationSender(ccfg.getName(),
+ FilePageStoreManager.cacheDirName(ccfg), ccfgFile)));
}
catch (IgniteCheckedException e) {
acceptException(e);
@@ -525,76 +544,88 @@
for (CacheConfigurationSender ccfgSndr : ccfgSndrs)
futs.add(CompletableFuture.runAsync(wrapExceptionIfStarted(ccfgSndr::sendCacheConfig), snpSndr.executor()));
- for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
- int grpId = e.getKey();
+ try {
+ for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+ int grpId = e.getKey();
+ String cacheDirName = pageStore.cacheDirName(grpId);
- CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+ // Process partitions for a particular cache group.
+ for (int partId : e.getValue()) {
+ GroupPartitionId pair = new GroupPartitionId(grpId, partId);
- if (gctx == null) {
- acceptException(new IgniteCheckedException("Cache group context has not found " +
- "due to the cache group is stopped: " + grpId));
+ Long partLen = partFileLengths.get(pair);
- break;
- }
-
- // Process partitions for a particular cache group.
- for (int partId : e.getValue()) {
- GroupPartitionId pair = new GroupPartitionId(grpId, partId);
-
- CacheConfiguration<?, ?> ccfg = gctx.config();
-
- assert ccfg != null : "Cache configuration cannot be empty on snapshot creation: " + pair;
-
- String cacheDirName = cacheDirName(ccfg);
- Long partLen = partFileLengths.get(pair);
-
- CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
- wrapExceptionIfStarted(() -> {
- snpSndr.sendPart(
- getPartitionFile(pageStore.workDir(), cacheDirName, partId),
- cacheDirName,
- pair,
- partLen);
-
- // Stop partition writer.
- partDeltaWriters.get(pair).markPartitionProcessed();
- }),
- snpSndr.executor())
- // Wait for the completion of both futures - checkpoint end, copy partition.
- .runAfterBothAsync(cpEndFut,
+ CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
wrapExceptionIfStarted(() -> {
- File delta = partDeltaWriters.get(pair).deltaFile;
+ snpSndr.sendPart(
+ getPartitionFile(pageStore.workDir(), cacheDirName, partId),
+ cacheDirName,
+ pair,
+ partLen);
- try {
- // Atomically creates a new, empty delta file if and only if
- // a file with this name does not yet exist.
- delta.createNewFile();
- }
- catch (IOException ex) {
- throw new IgniteCheckedException(ex);
- }
-
- snpSndr.sendDelta(delta, cacheDirName, pair);
-
- boolean deleted = delta.delete();
-
- assert deleted;
+ // Stop partition writer.
+ partDeltaWriters.get(pair).markPartitionProcessed();
}),
- snpSndr.executor());
+ snpSndr.executor())
+ // Wait for the completion of both futures - checkpoint end, copy partition.
+ .runAfterBothAsync(cpEndFut,
+ wrapExceptionIfStarted(() -> {
+ File delta = partDeltaWriters.get(pair).deltaFile;
- futs.add(fut0);
+ try {
+ // Atomically creates a new, empty delta file if and only if
+ // a file with this name does not yet exist.
+ delta.createNewFile();
+ }
+ catch (IOException ex) {
+ throw new IgniteCheckedException(ex);
+ }
+
+ snpSndr.sendDelta(delta, cacheDirName, pair);
+
+ boolean deleted = delta.delete();
+
+ assert deleted;
+ }),
+ snpSndr.executor());
+
+ futs.add(fut0);
+ }
}
+
+ int futsSize = futs.size();
+
+ CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]))
+ .whenComplete((res, t) -> {
+ assert t == null : "Exception must never be thrown since a wrapper is used " +
+ "for each snapshot task: " + t;
+
+ closeAsync();
+ });
}
+ catch (IgniteCheckedException e) {
+ acceptException(e);
+ }
+ }
- int futsSize = futs.size();
+ /**
+ * @param grpId Cache group id.
+ * @param parts Set of partitions to be processed.
+ * @param dirName Directory name to init.
+ * @throws IgniteCheckedException If fails.
+ */
+ private void addPartitionWriters(int grpId, Set<Integer> parts, String dirName) throws IgniteCheckedException {
+ for (int partId : parts) {
+ GroupPartitionId pair = new GroupPartitionId(grpId, partId);
- CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]))
- .whenComplete((res, t) -> {
- assert t == null : "Exception must never be thrown since a wrapper is used " +
- "for each snapshot task: " + t;
+ PageStore store = pageStore.getStore(grpId, partId);
- closeAsync();
- });
+ partDeltaWriters.put(pair,
+ new PageStoreSerialWriter(store,
+ partDeltaFile(cacheWorkDir(tmpConsIdDir, dirName), partId)));
+
+ partFileLengths.put(pair, store.size());
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
index 42ef810..1c993a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
@@ -41,6 +41,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
@@ -57,6 +58,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
@@ -242,6 +244,12 @@
return null;
}
+ if (grpId == MetaStorage.METASTORAGE_CACHE_ID) {
+ checkPartitionsPageCrcSum(() -> pageStore, partId, FLAG_DATA);
+
+ return null;
+ }
+
ByteBuffer pageBuff = buff.get();
pageBuff.clear();
pageStore.read(0, pageBuff, true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 038561b..03a1cdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -70,6 +70,7 @@
import static org.apache.ignite.internal.IgniteFeatures.SNAPSHOT_RESTORE_CACHE_GROUP;
import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK;
@@ -625,7 +626,10 @@
FilePageStoreManager pageStore = (FilePageStoreManager)cctx.pageStore();
// Collect the cache configurations and prepare a temporary directory for copying files.
- for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName())) {
+ // Metastorage can be restored only manually by directly copying files.
+ for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName(),
+ name -> !METASTORAGE_CACHE_NAME.equals(name)))
+ {
String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir);
if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName))
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
index 90f320c..81f6048 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
@@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
@@ -74,7 +75,6 @@
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.apache.ignite.spi.systemview.view.MetastorageView;
-import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -176,7 +176,7 @@
);
/**
- * Map with futures used to wait for async write/remove operations completion.
+ * Map with user futures used to wait for async write/remove operations completion.
*/
private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> updateFuts = new ConcurrentHashMap<>();
@@ -441,11 +441,16 @@
private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
assert isPersistenceEnabled;
- worker.setMetaStorage(metastorage);
+ lock.writeLock().lock();
- IgniteThread workerThread = new IgniteThread(ctx.igniteInstanceName(), "dms-writer-thread", worker);
+ try {
+ worker.setMetaStorage(metastorage);
- workerThread.start();
+ worker.start();
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
}
/** {@inheritDoc} */
@@ -1166,6 +1171,8 @@
return;
}
+ lock.writeLock().lock();
+
try {
if (msg instanceof DistributedMetaStorageCasMessage)
completeCas((DistributedMetaStorageCasMessage)msg);
@@ -1178,6 +1185,9 @@
catch (IgniteCheckedException | Error e) {
throw criticalError(e);
}
+ finally {
+ lock.writeLock().unlock();
+ }
}
/**
@@ -1211,6 +1221,32 @@
}
/**
+ * @return Future which will be completed when all the updates prior to the pause processed.
+ */
+ public Future<?> flush() {
+ assert isPersistenceEnabled;
+
+ return worker.flush();
+ }
+
+ /**
+ * @param compFut Future which should be completed when worker may proceed with updates.
+ */
+ public void suspend(IgniteInternalFuture<?> compFut) {
+ assert isPersistenceEnabled;
+
+ lock.readLock().lock();
+
+ try {
+ // Read lock taken, so no other distributed updated will be added to the queue.
+ worker.suspend(compFut);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
* Invoke failure handler and rethrow passed exception, possibly wrapped into the unchecked one.
*/
private RuntimeException criticalError(Throwable e) {
@@ -1231,39 +1267,40 @@
private void completeWrite(
DistributedMetaStorageHistoryItem histItem
) throws IgniteCheckedException {
- lock.writeLock().lock();
+ assert lock.writeLock().isHeldByCurrentThread();
- try {
- histItem = optimizeHistoryItem(histItem);
+ histItem = optimizeHistoryItem(histItem);
- if (histItem == null)
- return;
+ if (histItem == null)
+ return;
- ver = ver.nextVersion(histItem);
+ ver = ver.nextVersion(histItem);
- for (int i = 0, len = histItem.keys().length; i < len; i++) {
- String key = histItem.keys()[i];
- byte[] valBytes = histItem.valuesBytesArray()[i];
+ for (int i = 0, len = histItem.keys().length; i < len; i++) {
+ String key = histItem.keys()[i];
+ byte[] valBytes = histItem.valuesBytesArray()[i];
- notifyListeners(
- histItem.keys()[i],
- () -> bridge.read(key),
- () -> unmarshal(marshaller, valBytes));
- }
-
- for (int i = 0, len = histItem.keys().length; i < len; i++)
- bridge.write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
-
- addToHistoryCache(ver.id(), histItem);
+ notifyListeners(
+ histItem.keys()[i],
+ () -> bridge.read(key),
+ () -> unmarshal(marshaller, valBytes));
}
- finally {
- lock.writeLock().unlock();
- }
+
+ for (int i = 0, len = histItem.keys().length; i < len; i++)
+ bridge.write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+
+ addToHistoryCache(ver.id(), histItem);
if (isPersistenceEnabled)
worker.update(histItem);
- shrinkHistory();
+ // Shrink history so that its estimating size doesn't exceed {@link #histMaxBytes}.
+ while (histCache.sizeInBytes() > histMaxBytes && histCache.size() > 1) {
+ histCache.removeOldest();
+
+ if (isPersistenceEnabled)
+ worker.removeHistItem(ver.id() - histCache.size());
+ }
}
/**
@@ -1365,25 +1402,6 @@
}
/**
- * Shrikn history so that its estimating size doesn't exceed {@link #histMaxBytes}.
- */
- private void shrinkHistory() {
- lock.writeLock().lock();
-
- try {
- while (histCache.sizeInBytes() > histMaxBytes && histCache.size() > 1) {
- histCache.removeOldest();
-
- if (isPersistenceEnabled)
- worker.removeHistItem(ver.id() - histCache.size());
- }
- }
- finally {
- lock.writeLock().unlock();
- }
- }
-
- /**
* Notify listeners on node start. Even if there was no data restoring.
*
* @param newData Data about which listeners should be notified.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
index 2065784e..ebc5c73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
@@ -20,32 +20,42 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
import java.util.function.Consumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.COMMON_KEY_PREFIX;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.cleanupGuardKey;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemKey;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.localKey;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.versionKey;
-import static org.apache.ignite.internal.processors.metastorage.persistence.DmsWorkerStatus.CANCEL;
-import static org.apache.ignite.internal.processors.metastorage.persistence.DmsWorkerStatus.CONTINUE;
-import static org.apache.ignite.internal.processors.metastorage.persistence.DmsWorkerStatus.HALT;
/** */
-class DmsDataWriterWorker extends GridWorker {
+public class DmsDataWriterWorker extends GridWorker {
/** */
public static final byte[] DUMMY_VALUE = {};
/** */
- private final LinkedBlockingQueue<Object> updateQueue = new LinkedBlockingQueue<>();
+ private static final Object STOP = new Object();
+
+ /** */
+ private static final Object AWAIT = new Object();
+
+ /** */
+ private final LinkedBlockingQueue<RunnableFuture<?>> updateQueue = new LinkedBlockingQueue<>();
/** */
private final DmsLocalMetaStorageLock lock;
@@ -54,22 +64,19 @@
private final Consumer<Throwable> errorHnd;
/** */
- @TestOnly
- public DmsWorkerStatus status() {
- return status;
- }
-
- /** */
- private volatile DmsWorkerStatus status = CONTINUE;
-
- /** */
private DistributedMetaStorageVersion workerDmsVer;
/** */
private volatile ReadWriteMetastorage metastorage;
/** */
- private volatile boolean firstStart = true;
+ private volatile CountDownLatch latch = new CountDownLatch(0);
+
+ /**
+ * This task is used to pause processing of the {@code updateQueue}. If this task completed it means that all the updates
+ * prior to it already flushed to the local metastorage.
+ */
+ private volatile Future<?> suspendFut = CompletableFuture.completedFuture(AWAIT);
/** */
public DmsDataWriterWorker(
@@ -81,6 +88,9 @@
super(igniteInstanceName, "dms-writer", log);
this.lock = lock;
this.errorHnd = errorHnd;
+
+ // Put restore task to the queue, so it will be executed on worker start.
+ updateQueue.offer(newDmsTask(this::restore));
}
/** */
@@ -88,9 +98,47 @@
this.metastorage = metastorage;
}
+ /** Start new distributed metastorage worker thread. */
+ public void start() {
+ isCancelled = false;
+
+ new IgniteThread(igniteInstanceName(), "dms-writer-thread", this).start();
+ }
+
+ /**
+ * @return Future which will be completed when all tasks prior to the pause task are finished.
+ */
+ public Future<?> flush() {
+ return suspendFut;
+ }
+
+ /**
+ * @param compFut Future which should be completed when worker may proceed with updates.
+ */
+ public void suspend(IgniteInternalFuture<?> compFut) {
+ if (isCancelled())
+ suspendFut = CompletableFuture.completedFuture(AWAIT);
+ else {
+ latch = new CountDownLatch(1);
+
+ updateQueue.offer((RunnableFuture<?>)(suspendFut = new FutureTask<>(() -> AWAIT)));
+
+ compFut.listen(f -> latch.countDown());
+ }
+ }
+
/** */
public void update(DistributedMetaStorageHistoryItem histItem) {
- updateQueue.offer(histItem);
+ updateQueue.offer(newDmsTask(() -> {
+ metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
+
+ workerDmsVer = workerDmsVer.nextVersion(histItem);
+
+ metastorage.write(versionKey(), workerDmsVer);
+
+ for (int i = 0, len = histItem.keys().length; i < len; i++)
+ write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+ }));
}
/** */
@@ -98,22 +146,48 @@
assert fullNodeData.fullData != null;
assert fullNodeData.hist != null;
- updateQueue.clear();
+ updateQueue.offer(newDmsTask(() -> {
+ metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
- updateQueue.offer(fullNodeData);
+ doCleanup();
+
+ for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
+ metastorage.writeRaw(localKey(item.key), item.valBytes);
+
+ for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+ DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+
+ long histItemVer = fullNodeData.ver.id() + i - (len - 1);
+
+ metastorage.write(historyItemKey(histItemVer), histItem);
+ }
+
+ metastorage.write(versionKey(), fullNodeData.ver);
+
+ workerDmsVer = fullNodeData.ver;
+
+ metastorage.remove(cleanupGuardKey());
+ }));
}
/** */
public void removeHistItem(long ver) {
- updateQueue.offer(ver);
+ updateQueue.offer(newDmsTask(() -> metastorage.remove(historyItemKey(ver))));
}
/** */
public void cancel(boolean halt) throws InterruptedException {
- if (halt)
+ if (halt) {
updateQueue.clear();
- updateQueue.offer(status = halt ? HALT : CANCEL);
+ if (suspendFut instanceof RunnableFuture)
+ ((Runnable)suspendFut).run();
+ }
+
+ updateQueue.offer(new FutureTask<>(() -> STOP));
+ latch.countDown();
+
+ isCancelled = true;
Thread runner = runner();
@@ -122,93 +196,30 @@
}
/** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- status = CONTINUE;
+ @Override protected void body() {
+ while (true) {
+ try {
+ RunnableFuture<?> curTask = updateQueue.take();
- try {
- if (firstStart) {
- firstStart = false;
+ curTask.run();
- lock.lock();
+ // Result will be null for any runnable executed tasks over metastorage and non-null for system DMS tasks.
+ Object res = U.get(curTask);
- try {
- restore();
- }
- finally {
- lock.unlock();
- }
+ if (res == STOP)
+ break;
+
+ if (res == AWAIT)
+ latch.await();
}
+ catch (InterruptedException ignore) {
+ }
+ catch (Throwable t) {
+ errorHnd.accept(t);
- while (true) {
- Object update = updateQueue.peek();
-
- try {
- update = updateQueue.take();
- }
- catch (InterruptedException ignore) {
- }
-
- lock.lock();
-
- try {
- // process update
- if (update instanceof DistributedMetaStorageHistoryItem)
- applyUpdate((DistributedMetaStorageHistoryItem)update);
- else if (update instanceof DistributedMetaStorageClusterNodeData) {
- DistributedMetaStorageClusterNodeData fullNodeData = (DistributedMetaStorageClusterNodeData)update;
-
- metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
-
- doCleanup();
-
- for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
- metastorage.writeRaw(localKey(item.key), item.valBytes);
-
- for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
- DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
-
- long histItemVer = fullNodeData.ver.id() + i - (len - 1);
-
- metastorage.write(historyItemKey(histItemVer), histItem);
- }
-
- metastorage.write(versionKey(), fullNodeData.ver);
-
- workerDmsVer = fullNodeData.ver;
-
- metastorage.remove(cleanupGuardKey());
- }
- else if (update instanceof Long) {
- long ver = (Long)update;
-
- metastorage.remove(historyItemKey(ver));
- }
- else {
- assert update instanceof DmsWorkerStatus : update;
-
- break;
- }
- }
- finally {
- lock.unlock();
- }
+ break;
}
}
- catch (Throwable t) {
- errorHnd.accept(t);
- }
- }
-
- /** */
- private void applyUpdate(DistributedMetaStorageHistoryItem histItem) throws IgniteCheckedException {
- metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
-
- workerDmsVer = workerDmsVer.nextVersion(histItem);
-
- metastorage.write(versionKey(), workerDmsVer);
-
- for (int i = 0, len = histItem.keys().length; i < len; i++)
- write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
}
/** */
@@ -285,4 +296,24 @@
else
metastorage.writeRaw(localKey(key), valBytes);
}
+
+ /**
+ * @param task Task to execute on local metastorage.
+ * @return Future will be completed when task has been finished.
+ */
+ private RunnableFuture<Void> newDmsTask(IgniteThrowableRunner task) {
+ return new FutureTask<>(() -> {
+ lock.lock();
+
+ try {
+ task.run();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ lock.unlock();
+ }
+ }, null);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsWorkerStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsWorkerStatus.java
deleted file mode 100644
index bb3be51..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsWorkerStatus.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.metastorage.persistence;
-
-/** */
-enum DmsWorkerStatus {
- /** */
- CONTINUE,
-
- /** */
- CANCEL,
-
- /** */
- HALT;
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
index 691c35f3..e675f63 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
@@ -44,7 +44,6 @@
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
@@ -246,7 +245,7 @@
MetaStorage metaStorage = ignite.context().cache().context().database().metaStorage();
corruptTreeRoot(ignite, (PageMemoryEx)metaStorage.pageMemory(), METASTORAGE_CACHE_ID,
- PageIdAllocator.METASTORE_PARTITION);
+ MetaStorage.METASTORE_PARTITION);
stopGrid(0);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
index e70651a..5f65cf6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
@@ -51,10 +51,10 @@
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
import static org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog.TX_LOG_CACHE_NAME;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.META_STORAGE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_DIR_NAME;
/**
*
@@ -336,7 +336,7 @@
boolean metaStore = METASTORAGE_CACHE_NAME.equals(cacheName);
boolean txLog = TX_LOG_CACHE_NAME.equals(cacheName);
- File cacheWorkDir = metaStore ? new File(pageStoreMgr.workDir(), META_STORAGE_NAME) :
+ File cacheWorkDir = metaStore ? new File(pageStoreMgr.workDir(), METASTORAGE_DIR_NAME) :
txLog ? new File(pageStoreMgr.workDir(), TX_LOG_CACHE_NAME) :
pageStoreMgr.cacheWorkDir(node.cachex(cacheName).configuration());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
index 829461b..fe9f8f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
@@ -53,8 +53,8 @@
import static java.nio.file.Files.walkFileTree;
import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage.CP_FILE_NAME_PATTERN;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.META_STORAGE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_DIR_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_TEMP_NAME_PATTERN;
import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
@@ -224,7 +224,7 @@
String parentDirName = path.toFile().getParentFile().getName();
- if (parentDirName.equals(META_STORAGE_NAME) || parentDirName.equals(TxLog.TX_LOG_CACHE_NAME))
+ if (parentDirName.equals(METASTORAGE_DIR_NAME) || parentDirName.equals(TxLog.TX_LOG_CACHE_NAME))
return CONTINUE;
if (WAL_NAME_PATTERN.matcher(name).matches() || WAL_TEMP_NAME_PATTERN.matcher(name).matches())
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index b873f7d..c3827b0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -33,6 +33,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
@@ -332,9 +333,25 @@
String snpName,
boolean activate
) throws Exception {
+ return startGridsFromSnapshot(IntStream.range(0, cnt).boxed().collect(Collectors.toSet()), path, snpName, activate);
+ }
+
+ /**
+ * @param ids Set of ignite instances ids to start.
+ * @param path Snapshot path resolver.
+ * @param snpName Snapshot to start grids from.
+ * @param activate {@code true} to activate after cluster start.
+ * @return Coordinator ignite instance.
+ * @throws Exception If fails.
+ */
+ protected IgniteEx startGridsFromSnapshot(Set<Integer> ids,
+ Function<IgniteConfiguration, String> path,
+ String snpName,
+ boolean activate
+ ) throws Exception {
IgniteEx crd = null;
- for (int i = 0; i < cnt; i++) {
+ for (Integer i : ids) {
IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(i)));
cfg.setWorkDirectory(Paths.get(path.apply(cfg), snpName).toString());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
index 1fa2149..ac5566e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
@@ -58,6 +58,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
@@ -888,13 +889,13 @@
awaitPartitionMapExchange();
- for (Ignite grid : Arrays.asList(grid(1), grid(2))) {
- ((IgniteEx)grid).context().cache().context().exchange()
+ for (IgniteEx grid : Arrays.asList(grid(1), grid(2))) {
+ grid.context().cache().context().exchange()
.registerExchangeAwareComponent(new PartitionsExchangeAware() {
/** {@inheritDoc} */
@Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
try {
- block.await();
+ block.await(SNAPSHOT_AWAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
fail("Must not catch exception here: " + e.getMessage());
@@ -907,7 +908,7 @@
TestRecordingCommunicationSpi.spi(grid)
.blockMessages((node, msg) -> {
if (msg instanceof GridDhtPartitionsSingleMessage)
- return ((GridDhtPartitionsSingleMessage)msg).exchangeId() != null;
+ return ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null;
return false;
});
@@ -981,6 +982,9 @@
.registerExchangeAwareComponent(new PartitionsExchangeAware() {
/** {@inheritDoc} */
@Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ if (!(fut.firstEvent() instanceof DiscoveryCustomEvent))
+ return;
+
try {
exchFuts.add(new T2<>(fut.exchangeId(), fut.rebalanced()));
latch.countDown();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
index d0a943d..2260ada 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
@@ -196,6 +196,7 @@
SnapshotFutureTask snpFutTask = mgr.registerSnapshotTask(SNAPSHOT_NAME,
cctx.localNodeId(),
F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+ false,
new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
@Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
try {
@@ -243,7 +244,7 @@
// right after current will be completed.
cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, SNAPSHOT_NAME));
- snpFutTask.awaitStarted();
+ snpFutTask.started().get();
db.forceCheckpoint("snapshot is ready to be created")
.futureFor(CheckpointState.MARKER_STORED_TO_DISK)
@@ -555,7 +556,7 @@
Map<Integer, Set<Integer>> parts,
SnapshotSender snpSndr
) throws IgniteCheckedException {
- SnapshotFutureTask snpFutTask = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), parts, snpSndr);
+ SnapshotFutureTask snpFutTask = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), parts, false, snpSndr);
snpFutTask.start();
@@ -564,7 +565,7 @@
// right after current will be completed.
cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, snpName));
- snpFutTask.awaitStarted();
+ snpFutTask.started().get();
return snpFutTask;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
new file mode 100644
index 0000000..a376148
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+ /** */
+ private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotWithMetastorage() throws Exception {
+ IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+ startClientGrid();
+
+ ignite.context().distributedMetastorage().write("key", "value");
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ stopAllGrids();
+
+ IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+ assertEquals("value", snp.context().distributedMetastorage().read("key"));
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testMetastorageUpdateDuringSnapshot() throws Exception {
+ AtomicInteger keyCnt = new AtomicInteger();
+ AtomicBoolean stop = new AtomicBoolean();
+ CountDownLatch latch = new CountDownLatch(1);
+
+ IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+ IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+ while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+ try {
+ ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }, 3, "dms-updater");
+
+ DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+ DistributedMetaStorageImpl.class, "worker");
+ LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+ "updateQueue");
+
+ RunnableFuture<?> testTask = new FutureTask<>(() -> {
+ U.await(latch);
+
+ return null;
+ });
+
+ queue.offer(testTask);
+
+ assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+ ignite.context().cache().context().exchange()
+ .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+ /** {@inheritDoc} */
+ @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ latch.countDown();
+ }
+ });
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ stop.set(true);
+ updFut.get();
+
+ stopAllGrids();
+
+ Function<IgniteConfiguration, String> pathProv = cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath();
+ Set<String> keySet0 = new TreeSet<>();
+ Set<String> keySet1 = new TreeSet<>();
+
+ IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0), pathProv, SNAPSHOT_NAME, false);
+ snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> keySet0.add(key));
+
+ stopGrid(0);
+
+ IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1), pathProv, SNAPSHOT_NAME, false);
+ snp1.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> keySet1.add(key));
+
+ assertEquals("Keys must be the same on all nodes", keySet0, keySet1);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testMetastorageUpdateOnSnapshotFail() throws Exception {
+ AtomicInteger keyCnt = new AtomicInteger();
+ AtomicBoolean stop = new AtomicBoolean();
+ Set<String> writtenKeys = new TreeSet<>();
+
+ IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+ IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+ while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+ try {
+ String key = SNAPSHOT_PREFIX + keyCnt.getAndIncrement();
+
+ ignite.context().distributedMetastorage().write(key, "value");
+
+ writtenKeys.add(key);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }, 3, "dms-updater");
+
+ ((GridCacheDatabaseSharedManager)ignite.context().cache().context().database())
+ .addCheckpointListener(new CheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx) {
+ if (ctx.progress().reason().contains(SNAPSHOT_NAME)) {
+ Map<String, SnapshotFutureTask> locMap =
+ GridTestUtils.getFieldValue(snp(ignite), IgniteSnapshotManager.class, "locSnpTasks");
+
+ // Fail the snapshot task with an exception, all metastorage keys must be successfully continued.
+ locMap.get(SNAPSHOT_NAME).acceptException(new IgniteCheckedException("Test exception"));
+ }
+ }
+
+ @Override public void onCheckpointBegin(Context ctx) {
+ }
+
+ @Override public void beforeCheckpointBegin(Context ctx) {
+ }
+ });
+
+ IgniteFuture<?> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+ GridTestUtils.assertThrowsAnyCause(log, fut::get, IgniteCheckedException.class, "Test exception");
+
+ stop.set(true);
+
+ // Load future must complete without exceptions, all metastorage keys must be written.
+ updFut.get();
+
+ Set<String> readedKeys = new TreeSet<>();
+
+ ignite.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> readedKeys.add(key));
+
+ assertEquals("Not all metastorage keys have been written", writtenKeys, readedKeys);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
index 400ef14..cabb7f7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
@@ -483,7 +483,7 @@
/**
* @throws Exception If failed.
*/
- @Test @SuppressWarnings("ThrowableNotThrown")
+ @Test
public void testConflictingData() throws Exception {
IgniteEx igniteEx = startGrid(0);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorkerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorkerTest.java
index 8a4cccf..4e73d6e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorkerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorkerTest.java
@@ -18,12 +18,14 @@
package org.apache.ignite.internal.processors.metastorage.persistence;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.thread.IgniteThread;
import org.junit.After;
import org.junit.Assert;
@@ -37,15 +39,9 @@
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.versionKey;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageVersion.INITIAL_VERSION;
import static org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker.DUMMY_VALUE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
/** */
-public class DmsDataWriterWorkerTest {
- /** */
- private static IgniteLogger log = new GridTestLog4jLogger(true).getLogger(DmsDataWriterWorkerTest.class);
-
+public class DmsDataWriterWorkerTest extends GridCommonAbstractTest {
/** */
private Thread testThread;
@@ -59,6 +55,9 @@
private DmsDataWriterWorker worker;
/** */
+ private final AtomicReference<Throwable> errHnd = new AtomicReference<>();
+
+ /** */
@Before
public void before() {
testThread = Thread.currentThread();
@@ -71,7 +70,7 @@
DmsDataWriterWorkerTest.class.getSimpleName(),
log,
lock,
- throwable -> {}
+ errHnd::set
);
worker.setMetaStorage(metastorage);
@@ -103,7 +102,7 @@
startWorker();
- worker.cancel(true);
+ worker.cancel(false);
assertEquals(1, metastorage.cache.size());
assertEquals(INITIAL_VERSION, metastorage.read(versionKey()));
@@ -290,24 +289,22 @@
@Test
public void testHalt() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- AtomicBoolean await = new AtomicBoolean(true);
+
+ LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+ "updateQueue");
metastorage = new MyReadWriteMetaStorageMock() {
@Override public void writeRaw(String key, byte[] data) {
try {
- if (await.get())
- latch.countDown();
+ assertTrue(GridTestUtils.waitForCondition(() -> queue.size() == 3, getTestTimeout()));
- while (worker.status() != DmsWorkerStatus.HALT) {
- //noinspection BusyWait
- Thread.sleep(0);
- }
+ latch.countDown();
+
+ assertTrue(GridTestUtils.waitForCondition(() -> queue.size() == 1, getTestTimeout()));
}
catch (Exception ignore) {
}
- await.set(false);
-
super.writeRaw(key, data);
}
};
@@ -318,11 +315,15 @@
worker.update(histItem("key1", "val1"));
worker.update(histItem("key2", "val2"));
+ worker.update(histItem("key3", "val3"));
+ worker.update(histItem("key4", "val4"));
latch.await();
worker.cancel(true);
+ assertNull(errHnd.get());
+
// ver, val, hist.
assertEquals(3, metastorage.cache.size());
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
index fbbe62e..925f3f1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
@@ -45,6 +45,7 @@
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotWithMetastorageTest;
import org.apache.ignite.internal.processors.performancestatistics.CacheStartTest;
import org.apache.ignite.internal.processors.performancestatistics.CheckpointTest;
import org.apache.ignite.internal.processors.performancestatistics.ForwardReadTest;
@@ -99,6 +100,7 @@
IgniteSnapshotManagerSelfTest.class,
IgniteClusterSnapshotSelfTest.class,
IgniteClusterSnapshotCheckTest.class,
+ IgniteSnapshotWithMetastorageTest.class,
IgniteSnapshotMXBeanTest.class,
IgniteClusterSnapshotRestoreSelfTest.class,