/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.OpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.ObjectGauge;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.distributed.FullMessage;
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.junit.Before;
import org.junit.Test;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_FAILED;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_FINISHED;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_STARTED;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest.checkPartitionMapExchangeFinished;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_IN_PROGRESS_ERR_MSG;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_NODE_STOPPING_ERR_MSG;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.isSnapshotOperation;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
/**
* Cluster-wide snapshot test.
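 * <p>
 * The public snapshot API exercised throughout these tests boils down to the calls below
 * (a minimal sketch given an active {@link Ignite} instance; the snapshot name is illustrative only):
 * <pre>{@code
 * IgniteFuture<Void> fut = ignite.snapshot().createSnapshot("backupSnapshot");
 * fut.get(); // Blocks until the cluster-wide snapshot completes or fails.
 * }</pre>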
*/
public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
/** Time to wait while rebalance may happen. */
private static final long REBALANCE_AWAIT_TIME = GridTestUtils.SF.applyLB(10_000, 3_000);
/** Cache configuration for test. */
private static final CacheConfiguration<Integer, Integer> atomicCcfg = new CacheConfiguration<Integer, Integer>("atomicCacheName")
.setAtomicityMode(CacheAtomicityMode.ATOMIC)
.setBackups(2)
.setAffinity(new RendezvousAffinityFunction(false, CACHE_PARTS_COUNT));
    /** {@code true} if the node should be started in a separate JVM. */
protected volatile boolean jvm;
/** @throws Exception If fails. */
@Before
@Override public void beforeTestSnapshot() throws Exception {
super.beforeTestSnapshot();
jvm = false;
}
/**
     * Take a snapshot of the whole cluster and check snapshot consistency when the
     * cluster tx load starts on a new topology version.
* Note: Client nodes and server nodes not in baseline topology must not be affected.
*
* @throws Exception If fails.
*/
@Test
public void testConsistentClusterSnapshotLoadNewTopology() throws Exception {
int grids = 3;
String snpName = "backup23012020";
AtomicInteger atKey = new AtomicInteger(CACHE_KEYS_RANGE);
AtomicInteger txKey = new AtomicInteger(CACHE_KEYS_RANGE);
IgniteEx ignite = startGrids(grids);
startClientGrid();
ignite.cluster().baselineAutoAdjustEnabled(false);
ignite.cluster().state(ACTIVE);
ignite.events().localListen(e -> locEvts.add(e.type()), EVTS_CLUSTER_SNAPSHOT);
        // Start a node that is not in the baseline topology.
IgniteEx notBltIgnite = startGrid(grids);
File locSnpDir = snp(notBltIgnite).snapshotLocalDir(SNAPSHOT_NAME);
String notBltDirName = folderName(notBltIgnite);
IgniteCache<Integer, Integer> atCache = ignite.createCache(atomicCcfg);
for (int idx = 0; idx < CACHE_KEYS_RANGE; idx++) {
atCache.put(atKey.incrementAndGet(), -1);
ignite.cache(DEFAULT_CACHE_NAME).put(txKey.incrementAndGet(), -1);
}
forceCheckpoint();
CountDownLatch loadLatch = new CountDownLatch(1);
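        // The exchange hooks below verify that the snapshot operation triggers a PME-free exchange
        // and release the load latch once the snapshot discovery message has been observed.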
ignite.context().cache().context().exchange().registerExchangeAwareComponent(new PartitionsExchangeAware() {
/** {@inheritDoc} */
@Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
if (fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT)
return;
// First discovery custom event will be a snapshot operation.
assertTrue(isSnapshotOperation(fut.firstEvent()));
assertTrue("Snapshot must use pme-free exchange", fut.context().exchangeFreeSwitch());
}
/** {@inheritDoc} */
@Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
if (fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT)
return;
DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)fut.firstEvent()).customMessage();
assertNotNull(msg);
if (msg instanceof SnapshotDiscoveryMessage)
loadLatch.countDown();
}
});
// Start cache load.
IgniteInternalFuture<Long> loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
try {
U.await(loadLatch);
while (!Thread.currentThread().isInterrupted()) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int atIdx = rnd.nextInt(grids);
// Zero out the sign bit.
                    grid(atIdx).cache(atomicCcfg.getName()).put(atKey.incrementAndGet(), rnd.nextInt() & Integer.MAX_VALUE);
int txIdx = rnd.nextInt(grids);
                    grid(txIdx).cache(DEFAULT_CACHE_NAME).put(txKey.incrementAndGet(), rnd.nextInt() & Integer.MAX_VALUE);
}
}
catch (IgniteInterruptedCheckedException e) {
throw new RuntimeException(e);
}
}, 3, "cache-put-");
try {
IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(snpName);
U.await(loadLatch, 10, TimeUnit.SECONDS);
fut.get();
waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FINISHED));
}
finally {
loadFut.cancel();
}
        // The cluster could simply be deactivated here, but we also want to verify snapshot restore after binary recovery has occurred.
stopAllGrids();
assertTrue("Snapshot directory must be empty for node not in baseline topology: " + notBltDirName,
!searchDirectoryRecursively(locSnpDir.toPath(), notBltDirName).isPresent());
IgniteEx snpIg0 = startGridsFromSnapshot(grids, snpName);
assertEquals("The number of all (primary + backup) cache keys mismatch for cache: " + DEFAULT_CACHE_NAME,
CACHE_KEYS_RANGE, snpIg0.cache(DEFAULT_CACHE_NAME).size());
assertEquals("The number of all (primary + backup) cache keys mismatch for cache: " + atomicCcfg.getName(),
CACHE_KEYS_RANGE, snpIg0.cache(atomicCcfg.getName()).size());
snpIg0.cache(DEFAULT_CACHE_NAME).query(new ScanQuery<>(null))
.forEach(e -> assertTrue("Snapshot must contains only negative values " +
"[cache=" + DEFAULT_CACHE_NAME + ", entry=" + e + ']', (Integer)e.getValue() < 0));
snpIg0.cache(atomicCcfg.getName()).query(new ScanQuery<>(null))
.forEach(e -> assertTrue("Snapshot must contains only negative values " +
"[cache=" + atomicCcfg.getName() + ", entry=" + e + ']', (Integer)e.getValue() < 0));
}
/** @throws Exception If fails. */
@Test
public void testSnapshotPrimaryBackupsTheSame() throws Exception {
int grids = 3;
AtomicInteger cacheKey = new AtomicInteger();
IgniteEx ignite = startGridsWithCache(grids, dfltCacheCfg, CACHE_KEYS_RANGE);
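        // Transactional load: each iteration commits a pair of puts on a randomly chosen node;
        // the snapshot must capture identical update counters on primaries and backups.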
        IgniteInternalFuture<Long> txLoadFut = GridTestUtils.runMultiThreadedAsync(() -> {
while (!Thread.currentThread().isInterrupted()) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int gId = rnd.nextInt(grids);
IgniteCache<Integer, Integer> txCache = grid(gId).getOrCreateCache(dfltCacheCfg.getName());
try (Transaction tx = grid(gId).transactions().txStart()) {
txCache.put(cacheKey.incrementAndGet(), 0);
txCache.put(cacheKey.incrementAndGet(), 1);
tx.commit();
}
}
}, 5, "tx-cache-put-");
        IgniteInternalFuture<Long> atLoadFut = GridTestUtils.runMultiThreadedAsync(() -> {
while (!Thread.currentThread().isInterrupted()) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
IgniteCache<Integer, Integer> atomicCache = grid(rnd.nextInt(grids))
.getOrCreateCache(atomicCcfg);
atomicCache.put(cacheKey.incrementAndGet(), 0);
}
}, 5, "atomic-cache-put-");
try {
IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
fut.get();
}
finally {
txLoadFut.cancel();
atLoadFut.cancel();
}
stopAllGrids();
IgniteEx snpIg0 = startGridsFromSnapshot(grids, cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
        // Block rebalancing entirely.
for (Ignite g : G.allGrids())
TestRecordingCommunicationSpi.spi(g).blockMessages((node, msg) -> msg instanceof GridDhtPartitionDemandMessage);
snpIg0.cluster().state(ACTIVE);
assertFalse("Primary and backup in snapshot must have the same counters. Rebalance must not happen.",
GridTestUtils.waitForCondition(() -> {
boolean hasMsgs = false;
for (Ignite g : G.allGrids())
hasMsgs |= TestRecordingCommunicationSpi.spi(g).hasBlockedMessages();
return hasMsgs;
}, REBALANCE_AWAIT_TIME));
TestRecordingCommunicationSpi.stopBlockAll();
assertPartitionsSame(idleVerify(snpIg0, dfltCacheCfg.getName(), atomicCcfg.getName()));
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotConsistencyUnderLoad() throws Exception {
int clientsCnt = 50;
int balance = 10_000;
int transferLimit = 1000;
int total = clientsCnt * balance * 2;
int grids = 3;
int transferThreadCnt = 4;
AtomicBoolean stop = new AtomicBoolean(false);
CountDownLatch txStarted = new CountDownLatch(1);
CacheConfiguration<Integer, Account> eastCcfg = txCacheConfig(new CacheConfiguration<>("east"));
CacheConfiguration<Integer, Account> westCcfg = txCacheConfig(new CacheConfiguration<>("west"));
startGridsWithCache(grids, clientsCnt, key -> new Account(key, balance), eastCcfg, westCcfg);
Ignite client = startClientGrid(grids);
assertEquals("The initial summary value in all caches is not correct.",
total, sumAllCacheValues(client, clientsCnt, eastCcfg.getName(), westCcfg.getName()));
forceCheckpoint();
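        // Transfer load: each transaction moves a random amount between the matching 'east' and 'west'
        // accounts, so the total balance across both caches is invariant in any consistent snapshot.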
IgniteInternalFuture<?> txLoadFut = GridTestUtils.runMultiThreadedAsync(
() -> {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int amount;
try {
while (!stop.get()) {
IgniteEx ignite = grid(rnd.nextInt(grids));
IgniteCache<Integer, Account> east = ignite.cache("east");
IgniteCache<Integer, Account> west = ignite.cache("west");
amount = rnd.nextInt(transferLimit);
txStarted.countDown();
try (Transaction tx = ignite.transactions().txStart()) {
Integer id = rnd.nextInt(clientsCnt);
Account acc0 = east.get(id);
Account acc1 = west.get(id);
acc0.balance -= amount;
acc1.balance += amount;
east.put(id, acc0);
west.put(id, acc1);
tx.commit();
}
}
}
catch (Throwable e) {
U.error(log, e);
fail("Tx must not be failed.");
}
}, transferThreadCnt, "transfer-account-thread-");
try {
U.await(txStarted);
grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
}
finally {
stop.set(true);
}
txLoadFut.get();
assertEquals("The summary value should not changed during tx transfers.",
total, sumAllCacheValues(client, clientsCnt, eastCcfg.getName(), westCcfg.getName()));
stopAllGrids();
IgniteEx snpIg0 = startGridsFromSnapshot(grids, SNAPSHOT_NAME);
assertEquals("The total amount of all cache values must not changed in snapshot.",
total, sumAllCacheValues(snpIg0, clientsCnt, eastCcfg.getName(), westCcfg.getName()));
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotWithCacheNodeFilter() throws Exception {
int grids = 4;
CacheConfiguration<Integer, Integer> ccfg = txCacheConfig(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME))
.setNodeFilter(node -> node.consistentId().toString().endsWith("1"));
IgniteEx ig0 = startGridsWithoutCache(grids);
for (int i = 0; i < CACHE_KEYS_RANGE; i++)
ig0.getOrCreateCache(ccfg).put(i, i);
ig0.snapshot().createSnapshot(SNAPSHOT_NAME).get();
stopAllGrids();
IgniteEx snp = startGridsFromSnapshot(grids,
cfg -> resolveSnapshotWorkDirectory(cfg.setCacheConfiguration()).getAbsolutePath(),
SNAPSHOT_NAME,
true);
awaitPartitionMapExchange();
checkCacheDiscoveryDataConsistent();
CacheGroupDescriptor descr = snp.context().cache().cacheGroupDescriptors()
.get(CU.cacheId(ccfg.getName()));
assertNotNull(descr);
assertNotNull(descr.config().getNodeFilter());
assertEquals(ccfg.getNodeFilter().apply(grid(1).localNode()),
descr.config().getNodeFilter().apply(grid(1).localNode()));
assertSnapshotCacheKeys(snp.cache(ccfg.getName()));
}
/** @throws Exception If fails. */
@Test
public void testRejectCacheStopDuringClusterSnapshot() throws Exception {
        // Block the full message, so the cluster-wide snapshot operation cannot be fully completed.
IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
spi.block((msg) -> {
if (msg instanceof FullMessage) {
FullMessage<?> msg0 = (FullMessage<?>)msg;
assertEquals("Snapshot distributed process must be used",
DistributedProcess.DistributedProcessType.START_SNAPSHOT.ordinal(), msg0.type());
assertTrue("Snapshot has to be finished successfully on all nodes", msg0.error().isEmpty());
return true;
}
return false;
});
IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
spi.waitBlocked(10_000L);
        // Creation of new caches should not be blocked.
ignite.getOrCreateCache(dfltCacheCfg.setName("default2"))
.put(1, 1);
forceCheckpoint();
assertThrowsAnyCause(log,
() -> {
ignite.destroyCache(DEFAULT_CACHE_NAME);
return 0;
},
IgniteCheckedException.class,
SNP_IN_PROGRESS_ERR_MSG);
spi.unblock();
fut.get();
}
/** @throws Exception If fails. */
@Test
public void testBltChangeDuringClusterSnapshot() throws Exception {
IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
startGrid(3);
long topVer = ignite.cluster().topologyVersion();
BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
spi.block((msg) -> msg instanceof FullMessage);
IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
spi.waitBlocked(10_000L);
        // A node outside the baseline joins successfully.
String grid4Dir = folderName(startGrid(4));
        // The non-baseline node leaves the cluster and the snapshot is not affected.
stopGrid(4);
// Client node must connect successfully.
startClientGrid(4);
        // Changing the baseline completes successfully.
ignite.cluster().setBaselineTopology(topVer);
spi.unblock();
fut.get();
assertTrue("Snapshot directory must be empty for node 0 due to snapshot future fail: " + grid4Dir,
!searchDirectoryRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(), grid4Dir).isPresent());
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotExOnInitiatorLeft() throws Exception {
IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
spi.block((msg) -> msg instanceof FullMessage);
IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
spi.waitBlocked(10_000L);
ignite.close();
assertThrowsAnyCause(log,
fut::get,
NodeStoppingException.class,
SNP_NODE_STOPPING_ERR_MSG);
}
/** @throws Exception If fails. */
@Test
public void testSnapshotExistsException() throws Exception {
IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
assertThrowsAnyCause(log,
() -> ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(),
IgniteException.class,
"Snapshot with given name already exists on local node.");
stopAllGrids();
// Check that snapshot has not been accidentally deleted.
IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FAILED));
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotCleanedOnLeft() throws Exception {
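        // 'partProcessed' is released when grid(1) starts processing a delta partition file;
        // 'block' keeps that processing suspended until the node has been stopped.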
CountDownLatch block = new CountDownLatch(1);
CountDownLatch partProcessed = new CountDownLatch(1);
IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
File locSnpDir = snp(ignite).snapshotLocalDir(SNAPSHOT_NAME);
String dirNameIgnite0 = folderName(ignite);
String dirNameIgnite1 = folderName(grid(1));
snp(grid(1)).localSnapshotSenderFactory(
blockingLocalSnapshotSender(grid(1), partProcessed, block));
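        // Block the single-node result message from grid(1) so the distributed snapshot process
        // cannot complete before the node is stopped.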
TestRecordingCommunicationSpi commSpi1 = TestRecordingCommunicationSpi.spi(grid(1));
commSpi1.blockMessages((node, msg) -> msg instanceof SingleNodeMessage);
IgniteFuture<?> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
U.await(partProcessed);
stopGrid(1);
block.countDown();
assertThrowsAnyCause(log,
fut::get,
IgniteCheckedException.class,
"Execution of local snapshot tasks fails");
waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FAILED));
assertTrue("Snapshot directory must be empty for node 0 due to snapshot future fail: " + dirNameIgnite0,
!searchDirectoryRecursively(locSnpDir.toPath(), dirNameIgnite0).isPresent());
startGrid(1);
awaitPartitionMapExchange();
// Snapshot directory must be cleaned.
assertTrue("Snapshot directory must be empty for node 1 due to snapshot future fail: " + dirNameIgnite1,
!searchDirectoryRecursively(locSnpDir.toPath(), dirNameIgnite1).isPresent());
List<String> allSnapshots = snp(ignite).localSnapshotNames();
assertTrue("Snapshot directory must be empty due to snapshot fail: " + allSnapshots,
allSnapshots.isEmpty());
}
/** @throws Exception If fails. */
@Test
public void testRecoveryClusterSnapshotJvmHalted() throws Exception {
IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
String grid0Dir = folderName(ignite);
String grid1Dir = folderName(grid(1));
File locSnpDir = snp(ignite).snapshotLocalDir(SNAPSHOT_NAME);
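        // Start the next node in a separate JVM so that halting it does not kill the test JVM.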
jvm = true;
IgniteConfiguration cfg2 = optimize(getConfiguration(getTestIgniteInstanceName(2)));
cfg2.getDataStorageConfiguration()
.setFileIOFactory(new HaltJvmFileIOFactory(new RandomAccessFileIOFactory(),
(Predicate<File> & Serializable) file -> {
                    // Match files created under the snapshot directory; the factory halts the JVM for them.
return file.getAbsolutePath().contains(SNAPSHOT_NAME);
}));
startGrid(cfg2);
String grid2Dir = U.maskForFileName(cfg2.getConsistentId().toString());
jvm = false;
ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
awaitPartitionMapExchange();
assertThrowsAnyCause(log,
() -> ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(),
IgniteCheckedException.class,
"Execution of local snapshot tasks fails");
assertTrue("Snapshot directory must be empty: " + grid0Dir,
!searchDirectoryRecursively(locSnpDir.toPath(), grid0Dir).isPresent());
assertTrue("Snapshot directory must be empty: " + grid1Dir,
!searchDirectoryRecursively(locSnpDir.toPath(), grid1Dir).isPresent());
assertTrue("Snapshot directory must exist due to grid2 has been halted and cleanup not fully performed: " + grid2Dir,
searchDirectoryRecursively(locSnpDir.toPath(), grid2Dir).isPresent());
IgniteEx grid2 = startGrid(2);
assertTrue("Snapshot directory must be empty after recovery: " + grid2Dir,
!searchDirectoryRecursively(locSnpDir.toPath(), grid2Dir).isPresent());
awaitPartitionMapExchange();
assertTrue("Snapshot directory must be empty", grid2.context().cache().context().snapshotMgr().localSnapshotNames().isEmpty());
ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
.get();
stopAllGrids();
IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotWithRebalancing() throws Exception {
IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
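        // Block supply messages so rebalancing to the new node is paused while the snapshot is started, then resumed afterwards.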
TestRecordingCommunicationSpi commSpi = TestRecordingCommunicationSpi.spi(ignite);
commSpi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
startGrid(2);
ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
commSpi.waitForBlocked();
IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
commSpi.stopBlock(true);
fut.get();
waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FINISHED));
stopAllGrids();
IgniteEx snp = startGridsFromSnapshot(3, SNAPSHOT_NAME);
awaitPartitionMapExchange();
checkPartitionMapExchangeFinished();
assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotWithExplicitPath() throws Exception {
File exSnpDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "ex_snapshots", true);
try {
IgniteEx ignite = null;
for (int i = 0; i < 2; i++) {
IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(i)));
cfg.setSnapshotPath(exSnpDir.getAbsolutePath());
ignite = startGrid(cfg);
}
ignite.cluster().baselineAutoAdjustEnabled(false);
ignite.cluster().state(ACTIVE);
for (int i = 0; i < CACHE_KEYS_RANGE; i++)
ignite.cache(DEFAULT_CACHE_NAME).put(i, i);
ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
.get();
stopAllGrids();
IgniteEx snp = startGridsFromSnapshot(2, cfg -> exSnpDir.getAbsolutePath(), SNAPSHOT_NAME, true);
assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
}
finally {
stopAllGrids();
U.delete(exSnpDir);
}
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotMetrics() throws Exception {
String newSnapshotName = SNAPSHOT_NAME + "_new";
CountDownLatch deltaApply = new CountDownLatch(1);
CountDownLatch deltaBlock = new CountDownLatch(1);
IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
MetricRegistry mreg0 = ignite.context().metric().registry(SNAPSHOT_METRICS);
LongMetric startTime = mreg0.findMetric("LastSnapshotStartTime");
LongMetric endTime = mreg0.findMetric("LastSnapshotEndTime");
ObjectGauge<String> snpName = mreg0.findMetric("LastSnapshotName");
ObjectGauge<String> errMsg = mreg0.findMetric("LastSnapshotErrorMessage");
ObjectGauge<List<String>> snpList = mreg0.findMetric("LocalSnapshotNames");
        // The snapshot process will be blocked when delta partition file processing starts.
snp(ignite).localSnapshotSenderFactory(
blockingLocalSnapshotSender(ignite, deltaApply, deltaBlock));
assertEquals("Snapshot start time must be undefined prior to snapshot operation started.",
0, startTime.value());
assertEquals("Snapshot end time must be undefined to snapshot operation started.",
0, endTime.value());
assertTrue("Snapshot name must not exist prior to snapshot operation started.", snpName.value().isEmpty());
assertTrue("Snapshot error message must null prior to snapshot operation started.", errMsg.value().isEmpty());
assertTrue("Snapshots on local node must not exist", snpList.value().isEmpty());
long cutoffStartTime = U.currentTimeMillis();
IgniteFuture<Void> fut0 = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
U.await(deltaApply);
assertTrue("Snapshot start time must be set prior to snapshot operation started " +
"[startTime=" + startTime.value() + ", cutoffTime=" + cutoffStartTime + ']',
startTime.value() >= cutoffStartTime);
assertEquals("Snapshot end time must be zero prior to snapshot operation started.",
0, endTime.value());
assertEquals("Snapshot name must be set prior to snapshot operation started.",
SNAPSHOT_NAME, snpName.value());
assertTrue("Snapshot error message must null prior to snapshot operation started.",
errMsg.value().isEmpty());
IgniteFuture<Void> fut1 = grid(1).snapshot().createSnapshot(newSnapshotName);
assertThrowsWithCause((Callable<Object>)fut1::get, IgniteException.class);
MetricRegistry mreg1 = grid(1).context().metric().registry(SNAPSHOT_METRICS);
LongMetric startTime1 = mreg1.findMetric("LastSnapshotStartTime");
LongMetric endTime1 = mreg1.findMetric("LastSnapshotEndTime");
ObjectGauge<String> snpName1 = mreg1.findMetric("LastSnapshotName");
ObjectGauge<String> errMsg1 = mreg1.findMetric("LastSnapshotErrorMessage");
assertTrue("Snapshot start time must be greater than zero for finished snapshot.",
startTime1.value() > 0);
assertEquals("Snapshot end time must zero for failed on start snapshots.",
0, endTime1.value());
assertEquals("Snapshot name must be set when snapshot operation already finished.",
newSnapshotName, snpName1.value());
assertNotNull("Concurrent snapshot operation must failed.",
errMsg1.value());
deltaBlock.countDown();
fut0.get();
assertTrue("Snapshot start time must be greater than zero for finished snapshot.",
startTime.value() > 0);
assertTrue("Snapshot end time must be greater than zero for finished snapshot.",
endTime.value() > 0);
assertEquals("Snapshot name must be set when snapshot operation already finished.",
SNAPSHOT_NAME, snpName.value());
assertTrue("Concurrent snapshot operation must finished successfully.",
errMsg.value().isEmpty());
assertEquals("Only the first snapshot must be created and stored on disk.",
Collections.singletonList(SNAPSHOT_NAME), snpList.value());
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotIncorrectNameFails() throws Exception {
IgniteEx ignite = startGridsWithCache(1, dfltCacheCfg, CACHE_KEYS_RANGE);
assertThrowsAnyCause(log,
() -> ignite.snapshot().createSnapshot("--â„–=+.:(snapshot)").get(),
IllegalArgumentException.class,
"Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotWithOfflineBlt() throws Exception {
IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
stopGrid(2);
ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
.get();
stopAllGrids();
IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
awaitPartitionMapExchange();
assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
assertPartitionsSame(idleVerify(snp, dfltCacheCfg.getName()));
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotWithSharedCacheGroup() throws Exception {
CacheConfiguration<Integer, Integer> ccfg1 = txCacheConfig(new CacheConfiguration<>("tx1"));
CacheConfiguration<Integer, Integer> ccfg2 = txCacheConfig(new CacheConfiguration<>("tx2"));
ccfg1.setGroupName("group");
ccfg2.setGroupName("group");
IgniteEx ignite = startGridsWithCache(3, CACHE_KEYS_RANGE, Integer::new, ccfg1, ccfg2);
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FINISHED));
stopAllGrids();
IgniteEx snp = startGridsFromSnapshot(3, SNAPSHOT_NAME);
awaitPartitionMapExchange();
assertSnapshotCacheKeys(snp.cache(ccfg1.getName()));
assertSnapshotCacheKeys(snp.cache(ccfg2.getName()));
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotCoordinatorStopped() throws Exception {
CountDownLatch block = new CountDownLatch(1);
startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
startClientGrid(3);
awaitPartitionMapExchange();
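        // Hold exchange processing on the remaining server nodes until the coordinator (grid 0) has been stopped.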
for (IgniteEx grid : Arrays.asList(grid(1), grid(2))) {
grid.context().cache().context().exchange()
.registerExchangeAwareComponent(new PartitionsExchangeAware() {
/** {@inheritDoc} */
@Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
try {
block.await(TIMEOUT, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
fail("Must not catch exception here: " + e.getMessage());
}
}
});
}
for (Ignite grid : G.allGrids()) {
TestRecordingCommunicationSpi.spi(grid)
.blockMessages((node, msg) -> {
if (msg instanceof GridDhtPartitionsSingleMessage)
return ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null;
return false;
});
}
IgniteFuture<Void> fut = grid(1).snapshot().createSnapshot(SNAPSHOT_NAME);
stopGrid(0);
block.countDown();
        // Two exchanges happen here: the snapshot exchange and the node-left exchange (both PME-free).
        // Neither of them requires sending partition messages between nodes.
        assertFalse("PME-free switch must not require message exchange between nodes",
GridTestUtils.waitForCondition(() -> {
boolean hasMsgs = false;
for (Ignite g : G.allGrids())
hasMsgs |= TestRecordingCommunicationSpi.spi(g).hasBlockedMessages();
return hasMsgs;
}, 5_000));
assertThrowsWithCause((Callable<Object>)fut::get, IgniteException.class);
List<GridDhtPartitionsExchangeFuture> exchFuts =
grid(1).context().cache().context().exchange().exchangeFutures();
assertFalse("Exchanges cannot be empty due to snapshot and node left happened",
exchFuts.isEmpty());
for (GridDhtPartitionsExchangeFuture exch : exchFuts) {
assertTrue("Snapshot and node left events must keep `rebalanced` state" + exch,
exch.rebalanced());
}
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotOnMovingPartitionsCoordinatorLeft() throws Exception {
startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
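        // Block rebalancing so some partitions are still in the MOVING state when the snapshot operation starts.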
for (Ignite grid : G.allGrids()) {
TestRecordingCommunicationSpi.spi(grid)
.blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
}
Ignite ignite = startGrid(2);
ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
TestRecordingCommunicationSpi.spi(grid(0))
.waitForBlocked();
CountDownLatch latch = new CountDownLatch(G.allGrids().size());
IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(() -> {
try {
U.await(latch);
stopGrid(0);
}
catch (IgniteInterruptedCheckedException e) {
fail("Must not fail here: " + e.getMessage());
}
});
Queue<T2<GridDhtPartitionExchangeId, Boolean>> exchFuts = new ConcurrentLinkedQueue<>();
for (Ignite ig : G.allGrids()) {
((IgniteEx)ig).context().cache().context().exchange()
.registerExchangeAwareComponent(new PartitionsExchangeAware() {
/** {@inheritDoc} */
@Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
if (!(fut.firstEvent() instanceof DiscoveryCustomEvent))
return;
try {
exchFuts.add(new T2<>(fut.exchangeId(), fut.rebalanced()));
latch.countDown();
stopFut.get();
}
catch (IgniteCheckedException e) {
U.log(log, "Interrupted on coordinator: " + e.getMessage());
}
}
});
}
IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
stopFut.get();
assertThrowsAnyCause(log,
fut::get,
IgniteException.class,
"Snapshot creation has been finished with an error");
assertEquals("Snapshot futures expected: " + exchFuts, 3, exchFuts.size());
for (T2<GridDhtPartitionExchangeId, Boolean> exch : exchFuts)
assertFalse("Snapshot `rebalanced` must be false with moving partitions: " + exch.get1(), exch.get2());
}
/** @throws Exception If fails. */
@Test
public void testSnapshotPartitionExchangeAwareOrder() throws Exception {
IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
Map<UUID, PartitionsExchangeAware> comps = new HashMap<>();
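        // Register a hook on every node that checks the exchange callbacks are invoked in their documented order.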
for (Ignite ig : G.allGrids()) {
PartitionsExchangeAware comp;
((IgniteEx)ig).context().cache().context().exchange()
.registerExchangeAwareComponent(comp = new PartitionsExchangeAware() {
private final AtomicInteger order = new AtomicInteger();
@Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
assertEquals("Exchange order violated: " + fut.firstEvent(), 0, order.getAndIncrement());
}
@Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
assertEquals("Exchange order violated: " + fut.firstEvent(), 1, order.getAndIncrement());
}
@Override public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
assertEquals("Exchange order violated: " + fut.firstEvent(), 2, order.getAndIncrement());
}
@Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
assertEquals("Exchange order violated: " + fut.firstEvent(), 3, order.getAndSet(0));
}
});
comps.put(((IgniteEx)ig).localNode().id(), comp);
}
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
for (Ignite ig : G.allGrids()) {
((IgniteEx)ig).context().cache().context().exchange()
.unregisterExchangeAwareComponent(comps.get(((IgniteEx)ig).localNode().id()));
}
awaitPartitionMapExchange();
assertEquals("Some of ignite instances failed during snapshot", 3, G.allGrids().size());
stopAllGrids();
IgniteEx snp = startGridsFromSnapshot(3, SNAPSHOT_NAME);
assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotFromClient() throws Exception {
startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
IgniteEx clnt = startClientGrid(2);
clnt.snapshot().createSnapshot(SNAPSHOT_NAME).get();
waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FINISHED));
stopAllGrids();
IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
awaitPartitionMapExchange();
assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
}
/** @throws Exception If fails. */
@Test
public void testConcurrentClusterSnapshotFromClient() throws Exception {
IgniteEx grid = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
IgniteEx clnt = startClientGrid(2);
IgniteSnapshotManager mgr = snp(grid);
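        // Wrap the local snapshot sender with a blocking executor so the server-initiated snapshot
        // stays in progress while the client requests a concurrent one.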
Function<String, SnapshotSender> old = mgr.localSnapshotSenderFactory();
BlockingExecutor block = new BlockingExecutor(mgr.snapshotExecutorService());
mgr.localSnapshotSenderFactory((snpName) ->
new DelegateSnapshotSender(log, block, old.apply(snpName)));
IgniteFuture<Void> fut = grid.snapshot().createSnapshot(SNAPSHOT_NAME);
assertThrowsAnyCause(log,
() -> clnt.snapshot().createSnapshot(SNAPSHOT_NAME).get(),
IgniteException.class,
"Snapshot has not been created");
block.unblock();
fut.get();
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotFromClientDisconnected() throws Exception {
startGridsWithCache(1, dfltCacheCfg, CACHE_KEYS_RANGE);
IgniteEx clnt = startClientGrid(1);
stopGrid(0);
assertThrowsAnyCause(log,
() -> clnt.snapshot().createSnapshot(SNAPSHOT_NAME).get(),
IgniteException.class,
"Client disconnected. Snapshot result is unknown");
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotInProgressCancelled() throws Exception {
IgniteEx srv = startGridsWithCache(1, dfltCacheCfg, CACHE_KEYS_RANGE);
IgniteEx startCli = startClientGrid(1);
IgniteEx killCli = startClientGrid(2);
doSnapshotCancellationTest(startCli, Collections.singletonList(srv), srv.cache(dfltCacheCfg.getName()),
snpName -> killCli.snapshot().cancelSnapshot(snpName).get());
waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FAILED));
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotFinishedTryCancel() throws Exception {
IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
ignite.snapshot().cancelSnapshot(SNAPSHOT_NAME).get();
stopAllGrids();
IgniteEx snpIg = startGridsFromSnapshot(2, SNAPSHOT_NAME);
assertSnapshotCacheKeys(snpIg.cache(dfltCacheCfg.getName()));
}
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotInMemoryFail() throws Exception {
persistence = false;
IgniteEx srv = startGrid(0);
srv.cluster().state(ACTIVE);
IgniteEx clnt = startClientGrid(1);
IgniteFuture<?> fut = clnt.snapshot().createSnapshot(SNAPSHOT_NAME);
assertThrowsAnyCause(log,
fut::get,
IgniteException.class,
"Snapshots on an in-memory clusters are not allowed.");
}
/**
* @param ignite Ignite instance.
     * @param started Latch that is released when delta partition processing starts.
     * @param blocked Latch that blocks delta partition processing until released.
* @return Factory which produces local snapshot senders.
*/
private Function<String, SnapshotSender> blockingLocalSnapshotSender(IgniteEx ignite,
CountDownLatch started,
CountDownLatch blocked
) {
Function<String, SnapshotSender> old = snp(ignite).localSnapshotSenderFactory();
return (snpName) -> new DelegateSnapshotSender(log, snp(ignite).snapshotExecutorService(), old.apply(snpName)) {
@Override public void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) {
if (log.isInfoEnabled())
log.info("Processing delta file has been blocked: " + delta.getName());
started.countDown();
try {
U.await(blocked);
if (log.isInfoEnabled())
log.info("Latch released. Processing delta file continued: " + delta.getName());
super.sendDelta0(delta, cacheDirName, pair);
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteException("Interrupted by node stop", e);
}
}
};
}
/** {@inheritDoc} */
@Override protected boolean isMultiJvm() {
return jvm;
}
/**
     * @param ignite Ignite instance.
     * @param keys Number of keys to read from each cache.
     * @param caches Cache names to read values from.
* @return Summary value.
*/
private static int sumAllCacheValues(Ignite ignite, int keys, String... caches) {
AtomicInteger total = new AtomicInteger();
for (String name : caches) {
IgniteCache<Integer, Account> cache = ignite.cache(name);
for (int key = 0; key < keys; key++)
total.addAndGet(cache.get(key).balance);
}
return total.get();
}
/**
     * I/O factory which halts the JVM when the given condition is met.
*/
private static class HaltJvmFileIOFactory implements FileIOFactory {
/** Serial version UID. */
private static final long serialVersionUID = 0L;
/** Delegate factory. */
private final FileIOFactory delegate;
/** Condition to halt. */
private final Predicate<File> pred;
/**
         * @param delegate Delegate factory.
         * @param pred Condition on which the JVM is halted.
         */
public HaltJvmFileIOFactory(FileIOFactory delegate, Predicate<File> pred) {
this.delegate = delegate;
this.pred = pred;
}
/** {@inheritDoc} */
@Override public FileIO create(File file, OpenOption... modes) throws IOException {
FileIO delegate = this.delegate.create(file, modes);
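            // Halt the JVM immediately (bypassing shutdown hooks) when the predicate matches the file being opened.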
if (pred.test(file))
Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE);
return delegate;
}
}
}