/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
import java.io.IOException;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.IgniteSnapshot;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.CacheExistsException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.TMP_CACHE_DIR_PREFIX;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/**
* Snapshot restore tests.
*/
public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotRestoreBaseTest {
/** Type name used for binary and SQL. */
private static final String TYPE_NAME = "CustomType";
/** Cache 1 name. */
private static final String CACHE1 = "cache1";
/** Cache 2 name. */
private static final String CACHE2 = "cache2";
/** Default shared cache group name. */
private static final String SHARED_GRP = "shared";
/** Cache value builder. */
private Function<Integer, Object> valBuilder = String::valueOf;
/** Reset consistent ID flag. */
private boolean resetConsistentId;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
if (resetConsistentId)
cfg.setConsistentId(null);
return cfg;
}
/** {@inheritDoc} */
@Override protected Function<Integer, Object> valueBuilder() {
return valBuilder;
}
/** @throws Exception If failed. */
@Test
public void testRestoreAllGroups() throws Exception {
doRestoreAllGroups();
}
/** @throws Exception If failed. */
@Test
public void testRestoreAllGroupsWithoutConsistentId() throws Exception {
resetConsistentId = true;
doRestoreAllGroups();
}
/** @throws Exception If failed. */
private void doRestoreAllGroups() throws Exception {
CacheConfiguration<Integer, Object> cacheCfg1 =
txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
CacheConfiguration<Integer, Object> cacheCfg2 =
txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE2)).setGroupName(SHARED_GRP);
IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valBuilder,
dfltCacheCfg.setBackups(0), cacheCfg1, cacheCfg2);
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
ignite.cache(CACHE1).destroy();
ignite.cache(CACHE2).destroy();
ignite.cache(DEFAULT_CACHE_NAME).destroy();
awaitPartitionMapExchange();
// Restore all cache groups.
grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
assertCacheKeys(ignite.cache(CACHE1), CACHE_KEYS_RANGE);
assertCacheKeys(ignite.cache(CACHE2), CACHE_KEYS_RANGE);
}
/** @throws Exception If failed. */
@Test
public void testStartClusterSnapshotRestoreMultipleThreadsSameNode() throws Exception {
checkStartClusterSnapshotRestoreMultithreaded(() -> 0);
}
/** @throws Exception If failed. */
@Test
public void testStartClusterSnapshotRestoreMultipleThreadsDiffNode() throws Exception {
AtomicInteger nodeIdx = new AtomicInteger();
checkStartClusterSnapshotRestoreMultithreaded(nodeIdx::getAndIncrement);
}
/**
 * @param nodeIdxSupplier Ignite node index supplier.
 * @throws Exception If failed.
 */
private void checkStartClusterSnapshotRestoreMultithreaded(IntSupplier nodeIdxSupplier) throws Exception {
Ignite ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
AtomicInteger successCnt = new AtomicInteger();
AtomicInteger failCnt = new AtomicInteger();
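// Trigger the same restore operation from two threads concurrently; only one request is expected to be accepted.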
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> {
try {
grid(nodeIdxSupplier.getAsInt()).snapshot().restoreSnapshot(
SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
successCnt.incrementAndGet();
}
catch (Exception e) {
failCnt.incrementAndGet();
}
}, 2, "runner");
fut.get(TIMEOUT);
assertEquals(1, successCnt.get());
assertEquals(1, failCnt.get());
assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
}
/** @throws Exception If failed. */
@Test
public void testCreateSnapshotDuringRestore() throws Exception {
Ignite ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
BlockingCustomMessageDiscoverySpi discoSpi = discoSpi(grid(0));
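// Block the dynamic cache start message so the restore operation remains in progress.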
discoSpi.block((msg) -> msg instanceof DynamicCacheChangeBatch);
IgniteFuture<Void> fut =
ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
discoSpi.waitBlocked(TIMEOUT);
GridTestUtils.assertThrowsAnyCause(
log,
() -> grid(1).snapshot().createSnapshot("NEW_SNAPSHOT").get(TIMEOUT),
IgniteException.class,
"Cache group restore operation is currently in progress."
);
discoSpi.unblock();
fut.get(TIMEOUT);
assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
}
/**
* Ensures that the cache doesn't start if one of the baseline nodes fails.
*
* @throws Exception If failed.
*/
@Test
public void testNodeLeftDuringCacheStartOnExchangeInit() throws Exception {
startGridsWithSnapshot(3, CACHE_KEYS_RANGE, true);
BlockingCustomMessageDiscoverySpi discoSpi = discoSpi(grid(0));
discoSpi.block((msg) -> msg instanceof DynamicCacheChangeBatch);
IgniteFuture<Void> fut =
grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
discoSpi.waitBlocked(TIMEOUT);
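// Stop a baseline node while the cache start exchange is blocked; the restore must fail.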
stopGrid(2, true);
discoSpi.unblock();
GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class, null);
ensureCacheAbsent(dfltCacheCfg);
}
/**
 * Ensures that the cache is not started if a non-coordinator node leaves during the exchange.
 *
 * @throws Exception If failed.
 */
@Test
public void testNodeLeftDuringCacheStartOnExchangeFinish() throws Exception {
checkNodeLeftOnExchangeFinish(
false, ClusterTopologyCheckedException.class, "Required node has left the cluster");
}
/**
 * Ensures that the cache is not started if the coordinator leaves during the exchange.
 *
 * @throws Exception If failed.
 */
@Test
public void testCrdLeftDuringCacheStartOnExchangeFinish() throws Exception {
checkNodeLeftOnExchangeFinish(
true, IgniteCheckedException.class, "Operation has been cancelled (node is stopping)");
}
/**
* @param crdStop {@code True} to stop coordinator node.
* @param expCls Expected exception class.
* @param expMsg Expected exception message.
* @throws Exception If failed.
*/
private void checkNodeLeftOnExchangeFinish(
boolean crdStop,
Class<? extends Throwable> expCls,
String expMsg
) throws Exception {
startGridsWithSnapshot(3, CACHE_KEYS_RANGE, true);
TestRecordingCommunicationSpi node1spi = TestRecordingCommunicationSpi.spi(grid(1));
TestRecordingCommunicationSpi node2spi = TestRecordingCommunicationSpi.spi(grid(2));
node1spi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
node2spi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
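// Delay the exchange finish by blocking partition map messages from the non-coordinator nodes.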
IgniteFuture<Void> fut =
grid(1).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
node1spi.waitForBlocked();
node2spi.waitForBlocked();
stopGrid(crdStop ? 0 : 2, true);
node1spi.stopBlock();
if (crdStop)
node2spi.stopBlock();
GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), expCls, expMsg);
awaitPartitionMapExchange();
ensureCacheAbsent(dfltCacheCfg);
}
/** @throws Exception If failed. */
@Test
public void testClusterSnapshotRestoreRejectOnInActiveCluster() throws Exception {
IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valBuilder, dfltCacheCfg);
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
ignite.cluster().state(ClusterState.INACTIVE);
IgniteFuture<Void> fut =
ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
GridTestUtils.assertThrowsAnyCause(
log, () -> fut.get(TIMEOUT), IgniteException.class, "The cluster should be active");
}
/** @throws Exception If failed. */
@Test
public void testClusterSnapshotRestoreOnSmallerTopology() throws Exception {
startGridsWithSnapshot(2, CACHE_KEYS_RANGE, true);
stopGrid(1);
resetBaselineTopology();
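// The snapshot was taken on two nodes, so restoring it on the reduced baseline topology must fail.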
IgniteFuture<Void> fut =
grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), IgniteIllegalStateException.class, null);
ensureCacheAbsent(dfltCacheCfg);
}
/** @throws Exception If failed. */
@Test
public void testRestoreSharedCacheGroup() throws Exception {
CacheConfiguration<Integer, Object> cacheCfg1 =
txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
CacheConfiguration<Integer, Object> cacheCfg2 =
txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE2)).setGroupName(SHARED_GRP);
IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valBuilder, cacheCfg1, cacheCfg2);
ignite.cluster().state(ClusterState.ACTIVE);
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
ignite.cache(CACHE1).destroy();
awaitPartitionMapExchange();
IgniteSnapshot snp = ignite.snapshot();
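// Caches belonging to a shared group cannot be restored by their individual names, only by the group name.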
GridTestUtils.assertThrowsAnyCause(
log,
() -> snp.restoreSnapshot(SNAPSHOT_NAME, Arrays.asList(CACHE1, CACHE2)).get(TIMEOUT),
IllegalArgumentException.class,
"Cache group(s) was not found in the snapshot"
);
ignite.cache(CACHE2).destroy();
awaitPartitionMapExchange();
snp.restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(SHARED_GRP)).get(TIMEOUT);
assertCacheKeys(ignite.cache(CACHE1), CACHE_KEYS_RANGE);
assertCacheKeys(ignite.cache(CACHE2), CACHE_KEYS_RANGE);
}
/** @throws Exception If failed. */
@Test
public void testIncompatibleMetasUpdate() throws Exception {
valBuilder = new BinaryValueBuilder(TYPE_NAME);
IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
int typeId = ignite.context().cacheObjects().typeId(TYPE_NAME);
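// Remove the registered binary type so it can be re-created with a different set of fields.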
ignite.context().cacheObjects().removeType(typeId);
BinaryObject[] objs = new BinaryObject[CACHE_KEYS_RANGE];
IgniteCache<Integer, Object> cache1 = createCacheWithBinaryType(ignite, "cache1", n -> {
BinaryObjectBuilder builder = ignite.binary().builder(TYPE_NAME);
builder.setField("id", n);
objs[n] = builder.build();
return objs[n];
});
ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
// Ensure that the existing type has been updated.
BinaryType type = ignite.context().cacheObjects().metadata(typeId);
assertTrue(type.fieldNames().contains("name"));
for (int i = 0; i < CACHE_KEYS_RANGE; i++)
assertEquals(objs[i], cache1.get(i));
cache1.destroy();
grid(0).cache(DEFAULT_CACHE_NAME).destroy();
ignite.context().cacheObjects().removeType(typeId);
// Create cache with incompatible binary type.
cache1 = createCacheWithBinaryType(ignite, "cache1", n -> {
BinaryObjectBuilder builder = ignite.binary().builder(TYPE_NAME);
builder.setField("id", UUID.randomUUID());
objs[n] = builder.build();
return objs[n];
});
IgniteFuture<Void> fut0 =
ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
GridTestUtils.assertThrowsAnyCause(log, () -> fut0.get(TIMEOUT), BinaryObjectException.class, null);
ensureCacheAbsent(dfltCacheCfg);
for (int i = 0; i < CACHE_KEYS_RANGE; i++)
assertEquals(objs[i], cache1.get(i));
}
/**
* @param ignite Ignite.
* @param cacheName Cache name.
* @param valBuilder Binary value builder.
* @return Created cache.
*/
private IgniteCache<Integer, Object> createCacheWithBinaryType(
Ignite ignite,
String cacheName,
Function<Integer, BinaryObject> valBuilder
) {
IgniteCache<Integer, Object> cache = ignite.createCache(new CacheConfiguration<>(cacheName)).withKeepBinary();
for (int i = 0; i < CACHE_KEYS_RANGE; i++)
cache.put(i, valBuilder.apply(i));
return cache;
}
/** @throws Exception If failed. */
@Test
public void testParallelCacheStartWithTheSameNameOnPrepare() throws Exception {
checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, IgniteCheckedException.class,
"Cache start failed. A cache or group with the same name is currently being restored from a snapshot");
}
/** @throws Exception If failed. */
@Test
public void testParallelCacheStartWithTheSameNameOnStart() throws Exception {
checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_START, CacheExistsException.class,
"Failed to start cache (a cache with the same name is already started):");
}
/**
 * @param procType The type of distributed process on which communication is blocked.
 * @param expCls Expected exception class.
 * @param expMsg Expected exception message.
 * @throws Exception If failed.
 */
private void checkCacheStartWithTheSameName(
DistributedProcessType procType,
Class<? extends Throwable> expCls,
String expMsg
) throws Exception {
dfltCacheCfg = txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(1));
IgniteFuture<Void> fut = waitForBlockOnRestore(spi, procType, SHARED_GRP);
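// Starting a cache with the same name as the restored group or cache must be rejected while the restore is in progress.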
GridTestUtils.assertThrowsAnyCause(log, () -> ignite.createCache(SHARED_GRP), IgniteCheckedException.class, null);
GridTestUtils.assertThrowsAnyCause(log, () -> ignite.createCache(CACHE1), expCls, expMsg);
spi.stopBlock();
fut.get(TIMEOUT);
assertCacheKeys(grid(0).cache(CACHE1), CACHE_KEYS_RANGE);
}
/** @throws Exception If failed. */
@Test
public void testNodeFailDuringRestore() throws Exception {
startGridsWithSnapshot(4, CACHE_KEYS_RANGE);
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(3));
IgniteFuture<Void> fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, DEFAULT_CACHE_NAME);
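// Stop the node on which the prepare phase is blocked; the restore operation must fail.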
IgniteInternalFuture<?> fut0 = runAsync(() -> stopGrid(3, true));
GridTestUtils.assertThrowsAnyCause(
log,
() -> fut.get(TIMEOUT),
ClusterTopologyCheckedException.class,
"Required node has left the cluster"
);
fut0.get(TIMEOUT);
awaitPartitionMapExchange();
ensureCacheAbsent(dfltCacheCfg);
GridTestUtils.assertThrowsAnyCause(
log,
() -> startGrid(3),
IgniteSpiException.class,
"to add the node to cluster - remove directories with the caches"
);
}
/** @throws Exception If failed. */
@Test
public void testNodeFailDuringFilesCopy() throws Exception {
dfltCacheCfg.setCacheMode(CacheMode.REPLICATED);
startGridsWithSnapshot(3, CACHE_KEYS_RANGE);
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(2));
CountDownLatch stopLatch = new CountDownLatch(1);
spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage &&
((SingleNodeMessage<?>)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal());
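// Fail the copy of one partition file on node 2 and stop that node, leaving a temporary cache directory behind.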
String failingFilePath = Paths.get(CACHE_DIR_PREFIX + DEFAULT_CACHE_NAME,
PART_FILE_PREFIX + (dfltCacheCfg.getAffinity().partitions() / 2) + FILE_SUFFIX).toString();
grid(2).context().cache().context().snapshotMgr().ioFactory(
new CustomFileIOFactory(new RandomAccessFileIOFactory(),
file -> {
if (file.getPath().endsWith(failingFilePath)) {
stopLatch.countDown();
throw new RuntimeException("Test exception");
}
}));
File node2dbDir = ((FilePageStoreManager)grid(2).context().cache().context().pageStore()).
cacheWorkDir(dfltCacheCfg).getParentFile();
IgniteInternalFuture<Object> stopFut = runAsync(() -> {
U.await(stopLatch, TIMEOUT, TimeUnit.MILLISECONDS);
stopGrid(2, true);
return null;
});
IgniteFuture<Void> fut =
grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
stopFut.get(TIMEOUT);
GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class, null);
File[] files = node2dbDir.listFiles(file -> file.getName().startsWith(TMP_CACHE_DIR_PREFIX));
assertEquals("A temp directory with potentially corrupted files must exist.", 1, files.length);
ensureCacheAbsent(dfltCacheCfg);
dfltCacheCfg = null;
startGrid(2);
files = node2dbDir.listFiles(file -> file.getName().startsWith(TMP_CACHE_DIR_PREFIX));
assertEquals("A temp directory should be removed at node startup", 0, files.length);
}
/** @throws Exception If failed. */
@Test
public void testNodeJoinDuringRestore() throws Exception {
Ignite ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(1));
IgniteFuture<Void> fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, DEFAULT_CACHE_NAME);
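// A new node is not allowed to join the cluster while the restore operation is in progress.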
GridTestUtils.assertThrowsAnyCause(
log,
() -> startGrid(2),
IgniteSpiException.class,
"Joining node during caches restore is not allowed"
);
spi.stopBlock();
fut.get(TIMEOUT);
IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
assertTrue(cache.indexReadyFuture().isDone());
assertCacheKeys(cache, CACHE_KEYS_RANGE);
}
/**
* @throws Exception if failed.
*/
@Test
public void testClusterStateChangeActiveReadonlyOnPrepare() throws Exception {
checkClusterStateChange(ClusterState.ACTIVE_READ_ONLY, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
IgniteException.class, "Failed to perform start cache operation (cluster is in read-only mode)");
}
/**
* @throws Exception if failed.
*/
@Test
public void testClusterStateChangeActiveReadonlyOnCacheStart() throws Exception {
checkClusterStateChange(ClusterState.ACTIVE_READ_ONLY, RESTORE_CACHE_GROUP_SNAPSHOT_START, null, null);
}
/**
* @throws Exception if failed.
*/
@Test
public void testClusterDeactivateOnPrepare() throws Exception {
checkClusterStateChange(ClusterState.INACTIVE, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
IgniteException.class, "The cluster has been deactivated.");
}
/**
* @throws Exception if failed.
*/
@Test
public void testClusterDeactivateOnCacheStart() throws Exception {
checkClusterStateChange(ClusterState.INACTIVE, RESTORE_CACHE_GROUP_SNAPSHOT_START, null, null);
}
/**
* @param state Cluster state.
* @param procType The type of distributed process on which communication is blocked.
* @param exCls Expected exception class.
* @param expMsg Expected exception message.
* @throws Exception if failed.
*/
private void checkClusterStateChange(
ClusterState state,
DistributedProcessType procType,
@Nullable Class<? extends Throwable> exCls,
@Nullable String expMsg
) throws Exception {
int nodesCnt = 2;
Ignite ignite = startGridsWithSnapshot(nodesCnt, CACHE_KEYS_RANGE, true);
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(nodesCnt - 1));
IgniteFuture<Void> fut = waitForBlockOnRestore(spi, procType, DEFAULT_CACHE_NAME);
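// Change the cluster state while the restore operation is blocked on the given phase.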
ignite.cluster().state(state);
spi.stopBlock();
if (exCls == null) {
fut.get(TIMEOUT);
ignite.cluster().state(ClusterState.ACTIVE);
assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
return;
}
GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), exCls, expMsg);
ignite.cluster().state(ClusterState.ACTIVE);
ensureCacheAbsent(dfltCacheCfg);
String cacheName = DEFAULT_CACHE_NAME;
grid(nodesCnt - 1).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(cacheName)).get(TIMEOUT);
assertCacheKeys(ignite.cache(cacheName), CACHE_KEYS_RANGE);
}
/**
* @param ccfg Cache configuration.
* @throws IgniteCheckedException if failed.
*/
private void ensureCacheAbsent(CacheConfiguration<?, ?> ccfg) throws IgniteCheckedException {
String cacheName = ccfg.getName();
for (Ignite ignite : G.allGrids()) {
GridKernalContext kctx = ((IgniteEx)ignite).context();
if (kctx.clientNode())
continue;
CacheGroupDescriptor desc = kctx.cache().cacheGroupDescriptors().get(CU.cacheId(cacheName));
assertNull("nodeId=" + kctx.localNodeId() + ", cache=" + cacheName, desc);
GridTestUtils.waitForCondition(
() -> !kctx.cache().context().snapshotMgr().isRestoring(),
TIMEOUT);
File dir = ((FilePageStoreManager)kctx.cache().context().pageStore()).cacheWorkDir(ccfg);
String errMsg = String.format("%s, dir=%s, exists=%b, files=%s",
ignite.name(), dir, dir.exists(), Arrays.toString(dir.list()));
assertTrue(errMsg, !dir.exists() || dir.list().length == 0);
}
}
/**
* @param spi Test communication spi.
* @param restorePhase The type of distributed process on which communication is blocked.
* @param grpName Cache group name.
* @return Snapshot restore future.
* @throws InterruptedException if interrupted.
*/
private IgniteFuture<Void> waitForBlockOnRestore(
TestRecordingCommunicationSpi spi,
DistributedProcessType restorePhase,
String grpName
) throws InterruptedException {
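// Block the single node response message of the specified distributed process phase.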
spi.blockMessages((node, msg) ->
msg instanceof SingleNodeMessage && ((SingleNodeMessage<?>)msg).type() == restorePhase.ordinal());
IgniteFuture<Void> fut =
grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(grpName));
spi.waitForBlocked();
return fut;
}
/**
* Custom I/O factory that preprocesses created files.
*/
private static class CustomFileIOFactory implements FileIOFactory {
/** Serial version UID. */
private static final long serialVersionUID = 0L;
/** Delegate factory. */
private final FileIOFactory delegate;
/** Preprocessor for created files. */
private final Consumer<File> hnd;
/**
* @param delegate Delegate factory.
* @param hnd Preprocessor for created files.
*/
public CustomFileIOFactory(FileIOFactory delegate, Consumer<File> hnd) {
this.delegate = delegate;
this.hnd = hnd;
}
/** {@inheritDoc} */
@Override public FileIO create(File file, OpenOption... modes) throws IOException {
FileIO fileIo = delegate.create(file, modes);
hnd.accept(file);
return fileIo;
}
}
}