// blob: 9022676a58f89f82c5ea481fe214fa3ecdde685e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
/** */
public class IgniteSnapshotRestoreFromRemoteTest extends IgniteClusterSnapshotRestoreBaseTest {
/** Work directory prefix of the nodes of the first cluster, on which the snapshot is created. */
private static final String FIRST_CLUSTER_PREFIX = "one_";
/** Work directory prefix of the nodes of the second cluster, on which the snapshot is restored. */
private static final String SECOND_CLUSTER_PREFIX = "two_";
/** Name of the cache which is configured with {@link #ZERO_SUFFIX_NODE_FILTER}. */
private static final String CACHE_WITH_NODE_FILTER = "cacheWithFilter";
/** Node filter to test restoring on some nodes only: accepts nodes whose consistent ID ends with "0". */
private static final IgnitePredicate<ClusterNode> ZERO_SUFFIX_NODE_FILTER = new IgnitePredicate<ClusterNode>() {
@Override public boolean apply(ClusterNode node) {
return node.consistentId().toString().endsWith("0");
}
};
/** {@code true} once the snapshot parts have been prepared by the first {@link #prepareDedicatedSnapshot()} call. */
private static boolean inited;
/** Snapshot parts on dedicated cluster. Each part has its own local directory. */
private static final Set<Path> snpParts = new HashSet<>();
/**
 * Maps a cluster prefix to a function resolving the work directory of a node: the default work directory
 * followed by the prefix concatenated with the masked Ignite instance name. The node index is not used.
 */
private static final Function<String, BiFunction<Integer, IgniteConfiguration, String>> CLUSTER_DIR =
new Function<String, BiFunction<Integer, IgniteConfiguration, String>>() {
@Override public BiFunction<Integer, IgniteConfiguration, String> apply(String prefix) {
return (id, cfg) -> Paths.get(defaultWorkDirectory().toString(),
prefix + U.maskForFileName(cfg.getIgniteInstanceName())).toString();
}
};
/** Cache value builder: the value is the string representation of the key. */
private final Function<Integer, Object> valBuilder = String::valueOf;
/**
 * Prepares the environment for a test. The snapshot of the first cluster is created only once, by the very
 * first invocation; each invocation then runs the common snapshot test setup and removes the work
 * directories of the second cluster.
 *
 * @throws Exception If fails.
 */
@Before
public void prepareDedicatedSnapshot() throws Exception {
    if (!inited)
        initDedicatedSnapshotParts();

    beforeTestSnapshot();

    cleanupDedicatedPersistenceDirs(SECOND_CLUSTER_PREFIX);
}

/**
 * Starts six nodes of the first cluster with the test caches, takes a snapshot, stops the nodes and remembers
 * the locations of the snapshot parts in {@link #snpParts}.
 *
 * @throws Exception If fails.
 */
private void initDedicatedSnapshotParts() throws Exception {
    cleanupDedicatedPersistenceDirs(FIRST_CLUSTER_PREFIX);

    // Two caches sharing the same cache group.
    CacheConfiguration<Integer, Object> sharedGrpCfg1 =
        txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);

    CacheConfiguration<Integer, Object> sharedGrpCfg2 =
        txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE2)).setGroupName(SHARED_GRP);

    // A cache which resides only on the nodes accepted by the node filter.
    CacheConfiguration<Integer, Object> filteredCfg =
        txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE_WITH_NODE_FILTER))
            .setBackups(1)
            .setAffinity(new RendezvousAffinityFunction(false, 16))
            .setNodeFilter(ZERO_SUFFIX_NODE_FILTER);

    IgniteEx crd = startDedicatedGridsWithCache(FIRST_CLUSTER_PREFIX, 6, CACHE_KEYS_RANGE, valBuilder,
        dfltCacheCfg.setBackups(0), sharedGrpCfg1, sharedGrpCfg2, filteredCfg);

    crd.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);

    awaitPartitionMapExchange();
    stopAllGrids();

    snpParts.addAll(findSnapshotParts(FIRST_CLUSTER_PREFIX, SNAPSHOT_NAME));

    inited = true;
}
/**
 * Runs the common snapshot test tear-down and removes the work directories of the second cluster.
 *
 * @throws Exception If fails.
 */
@After
public void afterSwitchSnapshot() throws Exception {
    try {
        afterTestSnapshot();
    }
    finally {
        // Remove the directories even if the common tear-down fails, so nothing is left on disk.
        cleanupDedicatedPersistenceDirs(SECOND_CLUSTER_PREFIX);
    }
}
/**
 * Removes the snapshot parts and the work directories of the first cluster, then resets the static state
 * of the test class.
 */
@AfterClass
public static void cleanupSnapshot() {
    try {
        snpParts.forEach(U::delete);

        cleanupDedicatedPersistenceDirs(FIRST_CLUSTER_PREFIX);
    }
    finally {
        // The state is static: without the reset a repeated run of this class in the same JVM would skip
        // the snapshot creation and use the paths which have just been removed.
        snpParts.clear();

        inited = false;
    }
}
/**
 * Restores all cache groups on a two-node cluster among which the snapshot parts are distributed, and
 * checks that no partition is requested twice by the recorded snapshot files request messages.
 *
 * @throws Exception If failed.
 */
@Test
public void testRestoreAllGroups() throws Exception {
    IgniteEx crd = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);

    crd.cluster().state(ClusterState.ACTIVE);

    copyAndShuffle(snpParts, G.allGrids());

    grid(0).cache(DEFAULT_CACHE_NAME).destroy();

    // Record the requests for snapshot files on every node.
    G.allGrids().forEach(node -> TestRecordingCommunicationSpi.spi(node).record(SnapshotFilesRequestMessage.class));

    // Restore all cache groups.
    grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);

    awaitPartitionMapExchange(true, true, null, true);

    assertCacheKeys(crd.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
    assertCacheKeys(crd.cache(CACHE1), CACHE_KEYS_RANGE);
    assertCacheKeys(crd.cache(CACHE2), CACHE_KEYS_RANGE);

    waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);

    List<Object> recorded = new ArrayList<>();

    G.allGrids().forEach(node -> recorded.addAll(TestRecordingCommunicationSpi.spi(node).recordedMessages(true)));

    assertPartitionsDuplicates(recorded);
}
/**
 * Checks and restores the snapshot from a node which was started after the snapshot parts had been
 * distributed among the other nodes.
 *
 * @throws Exception If failed.
 */
@Test
public void testRestoreFromAnEmptyNode() throws Exception {
    startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);

    copyAndShuffle(snpParts, G.allGrids());

    // Start a new node without snapshot working directory.
    IgniteEx emptyNode = startDedicatedGrid(SECOND_CLUSTER_PREFIX, 2);

    emptyNode.cluster().state(ClusterState.ACTIVE);
    emptyNode.cache(DEFAULT_CACHE_NAME).destroy();

    awaitPartitionMapExchange();

    // Ensure that the snapshot check command succeeds.
    IgniteSnapshotManager snpMgr = emptyNode.context().cache().context().snapshotMgr();

    IdleVerifyResultV2 checkRes = snpMgr.checkSnapshot(SNAPSHOT_NAME).get(TIMEOUT);

    StringBuilder report = new StringBuilder();

    checkRes.print(report::append, true);

    assertTrue(F.isEmpty(checkRes.exceptions()));
    assertPartitionsSame(checkRes);
    assertContains(log, report.toString(), "The check procedure has finished, no conflicts have been found");

    // Restore all cache groups.
    emptyNode.snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);

    awaitPartitionMapExchange(true, true, null, true);

    // The restored data must be available from every node of the cluster.
    for (Ignite node : G.allGrids()) {
        assertCacheKeys(node.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
        assertCacheKeys(node.cache(CACHE1), CACHE_KEYS_RANGE);
        assertCacheKeys(node.cache(CACHE2), CACHE_KEYS_RANGE);
    }
}
/**
 * Restores only the cache with the node filter and checks that no partition demand messages have been
 * recorded on any node.
 *
 * @throws Exception If failed.
 */
@Test
public void testRestoreNoRebalance() throws Exception {
    IgniteEx crd = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);

    crd.cluster().state(ClusterState.ACTIVE);

    copyAndShuffle(snpParts, G.allGrids());

    grid(0).cache(DEFAULT_CACHE_NAME).destroy();

    // Record the rebalance demand messages on every node.
    for (Ignite node : G.allGrids()) {
        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(node);

        spi.record(GridDhtPartitionDemandMessage.class);
    }

    Set<String> grpsToRestore = Collections.singleton(CACHE_WITH_NODE_FILTER);

    grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, grpsToRestore).get(TIMEOUT);

    awaitPartitionMapExchange(true, true, null, true);

    assertCacheKeys(crd.cache(CACHE_WITH_NODE_FILTER), CACHE_KEYS_RANGE);

    waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);

    for (Ignite node : G.allGrids()) {
        List<Object> demandMsgs = TestRecordingCommunicationSpi.spi(node).recordedMessages(true);

        assertTrue(demandMsgs.isEmpty());
    }
}
/**
 * Makes the second node fail on sending any partition file with a positive partition ID and checks that
 * the restore fails with the test exception and that the default cache is absent afterwards.
 *
 * @throws Exception If failed.
 */
@Test
public void testSnapshotCachesStoppedIfLoadingFailOnRemote() throws Exception {
    IgniteEx crd = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);

    crd.cluster().state(ClusterState.ACTIVE);

    copyAndShuffle(snpParts, G.allGrids());

    grid(0).cache(DEFAULT_CACHE_NAME).destroy();

    IgniteSnapshotManager mgr = snp(grid(1));

    // Wraps the regular remote sender of the node with the one failing on partition files.
    BiFunction<String, UUID, SnapshotSender> failingFactory = (rqId, nodeId) ->
        new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(rqId, nodeId)) {
            @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
                if (partId(part.getName()) <= 0) {
                    super.sendPart0(part, cacheDirName, pair, length);

                    return;
                }

                throw new IgniteException("Test exception. Uploading partition file failed: " + pair);
            }
        };

    mgr.remoteSnapshotSenderFactory(failingFactory);

    IgniteFuture<?> restoreFut = grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null);

    GridTestUtils.assertThrowsAnyCause(
        log,
        () -> restoreFut.get(TIMEOUT),
        IgniteException.class,
        "Test exception. Uploading partition file failed"
    );

    assertNull(crd.cache(DEFAULT_CACHE_NAME));

    ensureCacheAbsent(dfltCacheCfg);
}
/**
 * Copies the snapshot parts into the snapshot work directories of the given nodes. The parts are assigned
 * to the nodes one by one, starting again from the first node when the end of the list is reached.
 *
 * @param snpParts Snapshot parts.
 * @param toNodes Nodes to copy the parts to.
 */
private static void copyAndShuffle(Set<Path> snpParts, List<Ignite> toNodes) {
    int idx = 0;

    for (Path part : snpParts) {
        try {
            IgniteEx target = (IgniteEx)toNodes.get(idx++ % toNodes.size());

            File snpWorkDir = resolveSnapshotWorkDirectory(target.configuration());

            // The copy keeps the name of the source directory.
            File dest = Paths.get(snpWorkDir.getAbsolutePath(), part.getFileName().toString()).toFile();

            U.copy(part.toFile(), dest, false);
        }
        catch (IOException e) {
            throw new IgniteException(e);
        }
    }
}
/**
 * Removes the directories located directly in the default work directory whose names start with one of the
 * given prefixes. The prefix is matched ignoring the case.
 *
 * @param clusterPrefix Array of prefixes to clean up directories.
 * @throws IgniteException If listing of the default work directory fails.
 */
private static void cleanupDedicatedPersistenceDirs(String... clusterPrefix) {
    for (String prefix : clusterPrefix) {
        // regionMatches() ignores the case of both the name and the prefix and, in contrast to
        // toLowerCase(), does not depend on the default locale.
        DirectoryStream.Filter<Path> dedicatedDirs = path -> Files.isDirectory(path) &&
            path.getFileName().toString().regionMatches(true, 0, prefix, 0, prefix.length());

        try (DirectoryStream<Path> ds = Files.newDirectoryStream(defaultWorkDirectory(), dedicatedDirs)) {
            for (Path dir : ds)
                U.delete(dir);
        }
        catch (IOException e) {
            throw new IgniteException(e);
        }
    }
}
/**
 * Searches every directory of the default work directory whose name starts with the given prefix for
 * the snapshot with the given name. The prefix is matched ignoring the case.
 *
 * @param prefix Prefix of the node work directories to search in.
 * @param snpName Snapshot name.
 * @return Collection of dedicated snapshot paths located in Ignite working directory.
 * @throws IgniteException If listing of the default work directory fails or one of the matched
 *      directories does not contain the snapshot.
 */
private static Set<Path> findSnapshotParts(String prefix, String snpName) {
    // regionMatches() ignores the case of both the name and the prefix and, in contrast to
    // toLowerCase(), does not depend on the default locale.
    DirectoryStream.Filter<Path> dedicatedDirs = path -> Files.isDirectory(path) &&
        path.getFileName().toString().regionMatches(true, 0, prefix, 0, prefix.length());

    Set<Path> snpPaths = new HashSet<>();

    try (DirectoryStream<Path> ds = Files.newDirectoryStream(defaultWorkDirectory(), dedicatedDirs)) {
        for (Path dir : ds) {
            snpPaths.add(searchDirectoryRecursively(dir, snpName)
                .orElseThrow(() -> new IgniteException("Snapshot not found in the Ignite work directory " +
                    "[dir=" + dir.toString() + ", snpName=" + snpName + ']')));
        }

        return snpPaths;
    }
    catch (IOException e) {
        throw new IgniteException(e);
    }
}
/**
 * Starts the nodes with the given caches; the work directory of each node is resolved by
 * {@link #CLUSTER_DIR} for the given prefix.
 *
 * @param prefix Prefix of the node work directories.
 * @param grids Number of ignite instances to start.
 * @param keys Number of keys to create.
 * @param valMapper Factory which produces values.
 * @param ccfgs Cache configurations.
 * @param <V> Cache value type.
 * @return Ignite coordinator instance.
 * @throws Exception If fails.
 */
private <V> IgniteEx startDedicatedGridsWithCache(
    String prefix,
    int grids,
    int keys,
    Function<Integer, V> valMapper,
    CacheConfiguration<Integer, V>... ccfgs
) throws Exception {
    BiFunction<Integer, IgniteConfiguration, String> workDirRslvr = CLUSTER_DIR.apply(prefix);

    return startGridsWithCache(grids, keys, valMapper, workDirRslvr, ccfgs);
}
/**
 * Starts the given number of nodes with dedicated work directories and makes the first node collect
 * the types of the cluster snapshot events into {@code locEvts}.
 *
 * @param prefix Prefix of the node work directories.
 * @param grids Number of ignite instances to start.
 * @return Ignite coordinator instance.
 * @throws Exception If fails.
 */
private IgniteEx startDedicatedGrids(String prefix, int grids) throws Exception {
    int idx = 0;

    while (idx < grids)
        startDedicatedGrid(prefix, idx++);

    IgniteEx first = grid(0);

    first.events().localListen(evt -> locEvts.add(evt.type()), EVTS_CLUSTER_SNAPSHOT);

    return first;
}
/**
 * Starts a node whose work directory is resolved by {@link #CLUSTER_DIR} for the given prefix.
 *
 * @param prefix Grid work directory prefix.
 * @param id Grid index.
 * @return Grid instance.
 * @throws Exception If fails.
 */
private IgniteEx startDedicatedGrid(String prefix, int id) throws Exception {
    String instanceName = getTestIgniteInstanceName(id);

    IgniteConfiguration cfg = optimize(getConfiguration(instanceName));

    String workDir = CLUSTER_DIR.apply(prefix).apply(id, cfg);

    cfg.setWorkDirectory(workDir);

    return startGrid(cfg);
}
/**
 * Resolves the default work directory, rethrowing the checked exception as an unchecked one.
 *
 * @return Default work directory.
 */
private static Path defaultWorkDirectory() {
    final String dir;

    try {
        dir = U.defaultWorkDirectory();
    }
    catch (IgniteCheckedException e) {
        throw new IgniteException(e);
    }

    return Paths.get(dir);
}
/**
 * Asserts that the partitions requested by the given messages contain no duplicates, i.e. each pair of
 * a group ID and a partition ID occurs only once.
 *
 * @param msgs Recorded messages, each of them is expected to be a {@link SnapshotFilesRequestMessage}.
 */
private static void assertPartitionsDuplicates(List<Object> msgs) {
    int total = 0;

    Set<GroupPartitionId> uniq = new HashSet<>();

    for (Object msg : msgs) {
        Map<Integer, Set<Integer>> grpParts = ((SnapshotFilesRequestMessage)msg).parts();

        for (Map.Entry<Integer, Set<Integer>> grp : grpParts.entrySet()) {
            for (Integer part : grp.getValue()) {
                uniq.add(new GroupPartitionId(grp.getKey(), part));

                total++;
            }
        }
    }

    // The sizes differ if some partition has been requested more than once.
    assertEquals(total, uniq.size());
}
}