/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.io.File;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;

import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_THREAD_POOL_SIZE;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.groupIdFromTmpDir;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;

/** */
public class IgniteSnapshotRemoteRequestTest extends IgniteClusterSnapshotRestoreBaseTest {
    /** */
    private int snapshotThreadPoolSize = DFLT_SNAPSHOT_THREAD_POOL_SIZE;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        cfg.setSnapshotThreadPoolSize(snapshotThreadPoolSize);

        return cfg;
    }

    /** @throws Exception If fails. */
    @Test
    public void testSnapshotRemoteRequestFromSingleNode() throws Exception {
        int rqCnt = 10;

        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);

        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);

        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), grid(1).localNode().id());

        awaitPartitionMapExchange();
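
        // One latch count for every expected partition file, summed over all concurrent requests.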
        CountDownLatch latch = new CountDownLatch(parts.values().stream().mapToInt(Set::size).sum() * rqCnt);
        GridCompoundFuture<Void, Void> compFut = new GridCompoundFuture<>();

        IgniteInternalFuture<?> runFut = GridTestUtils.runMultiThreadedAsync(() -> {
            try {
                Map<Integer, Set<Integer>> parts0 = new HashMap<>();

                for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
                    parts0.computeIfAbsent(e.getKey(), k -> new HashSet<>()).addAll(e.getValue());

                IgniteInternalFuture<Void> locFut;

                compFut.add(locFut = snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
                    SNAPSHOT_NAME,
                    null,
                    parts,
                    () -> false,
                    defaultPartitionConsumer(parts0, latch)));

                locFut.listen(f -> assertEquals("All partitions must be handled: " + parts0,
                    F.size(parts0.values(), Set::isEmpty), parts0.size()));
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        }, rqCnt, "rq-creator-");

        runFut.get(TIMEOUT);

        U.await(latch, TIMEOUT, TimeUnit.MILLISECONDS);

        compFut.markInitialized().get(TIMEOUT);
    }

    /** @throws Exception If fails. */
    @Test
    public void testSnapshotRemoteRequestEachOther() throws Exception {
        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);

        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);

        IgniteSnapshotManager mgr0 = snp(ignite);
        IgniteSnapshotManager mgr1 = snp(grid(1));

        UUID node0 = grid(0).localNode().id();
        UUID node1 = grid(1).localNode().id();

        Map<Integer, Set<Integer>> fromNode1 = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), node1);
        Map<Integer, Set<Integer>> fromNode0 = owningParts(grid(1), CU.cacheId(DEFAULT_CACHE_NAME), node0);
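
        // Block snapshot file requests on all nodes so that both cross-requests are issued before either transfer starts.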
        G.allGrids().forEach(g -> TestRecordingCommunicationSpi.spi(g)
            .blockMessages((n, msg) -> msg instanceof SnapshotFilesRequestMessage));

        CountDownLatch latch = new CountDownLatch(fromNode1.values().stream().mapToInt(Set::size).sum() +
            fromNode0.values().stream().mapToInt(Set::size).sum());

        // Snapshot must be taken on node1 and transmitted to node0.
        IgniteInternalFuture<?> futFrom1To0 = mgr0.requestRemoteSnapshotFiles(node1, SNAPSHOT_NAME, null, fromNode1, () -> false,
            defaultPartitionConsumer(fromNode1, latch));
        IgniteInternalFuture<?> futFrom0To1 = mgr1.requestRemoteSnapshotFiles(node0, SNAPSHOT_NAME, null, fromNode0, () -> false,
            defaultPartitionConsumer(fromNode0, latch));

        G.allGrids().forEach(g -> TestRecordingCommunicationSpi.spi(g).stopBlock());

        latch.await(TIMEOUT, TimeUnit.MILLISECONDS);

        futFrom0To1.get(TIMEOUT);
        futFrom1To0.get(TIMEOUT);
    }

    /** @throws Exception If fails. */
    @Test
    public void testRemoteRequestedInitiatorNodeLeft() throws Exception {
        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);

        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);

        awaitPartitionMapExchange();

        IgniteSnapshotManager mgr1 = snp(grid(1));

        UUID rmtNodeId = grid(1).localNode().id();
        UUID locNodeId = grid(0).localNode().id();

        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), rmtNodeId);

        CountDownLatch sndLatch = new CountDownLatch(1);
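
        // Delay transmission of every partition except partition 0 until the latch is released after the initiator is stopped.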
        mgr1.remoteSnapshotSenderFactory(new BiFunction<String, UUID, SnapshotSender>() {
            @Override public SnapshotSender apply(String s, UUID uuid) {
                return new DelegateSnapshotSender(log, mgr1.snapshotExecutorService(), mgr1.remoteSnapshotSenderFactory(s, uuid)) {
                    @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
                        if (partId(part.getName()) > 0) {
                            try {
                                sndLatch.await(TIMEOUT, TimeUnit.MILLISECONDS);
                            }
                            catch (Exception e) {
                                throw new IgniteException(e);
                            }
                        }

                        super.sendPart0(part, cacheDirName, pair, length);
                    }
                };
            }
        });
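
        // Request snapshot files from the remote node with a no-op handler; the initiator will be stopped while the transfer is blocked.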
        snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
            SNAPSHOT_NAME,
            null,
            parts,
            () -> false,
            (part, t) -> {
            });

        IgniteInternalFuture<?>[] futs = new IgniteInternalFuture[1];

        assertTrue(waitForCondition(new GridAbsPredicate() {
            @Override public boolean apply() {
                IgniteInternalFuture<?> snpFut = snp(grid(1)).lastScheduledSnapshotResponseRemoteTask(locNodeId);

                if (snpFut == null)
                    return false;
                else {
                    futs[0] = snpFut;

                    return true;
                }
            }
        }, 5_000L));
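
        // Stop the initiator and release the delayed transmission: the remote send task must fail with ClusterTopologyCheckedException.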
        stopGrid(0);
        sndLatch.countDown();

        GridTestUtils.assertThrowsAnyCause(log, () -> futs[0].get(TIMEOUT), ClusterTopologyCheckedException.class, null);
    }

    /** @throws Exception If fails. */
    @Test
    public void testSnapshotRequestRemoteSourceNodeLeft() throws Exception {
        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);

        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);

        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME),
            grid(1).localNode().id());

        CountDownLatch latch = new CountDownLatch(1);

        IgniteInternalFuture<?> fut = snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
            SNAPSHOT_NAME,
            null,
            parts,
            () -> false,
            (part, t) -> {
                if (t == null) {
                    int grpId = groupIdFromTmpDir(part.getParentFile());

                    assertTrue("Received cache group has not been requested", parts.containsKey(grpId));
                    assertTrue("Received partition has not been requested",
                        parts.get(grpId).contains(partId(part.getName())));

                    try {
                        U.await(latch, TIMEOUT, TimeUnit.MILLISECONDS);
                    }
                    catch (IgniteCheckedException e) {
                        throw new IgniteException(e);
                    }
                }
                else {
                    assertTrue(t instanceof ClusterTopologyCheckedException);
                    assertNull(part);
                }
            });
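
        // Stop the source node and release the consumer: the request future must fail with ClusterTopologyCheckedException.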
        stopGrid(1);
        latch.countDown();

        assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class,
            "he node from which a snapshot has been requested left the grid");
    }

    /** @throws Exception If fails. */
    @Test
    public void testSnapshotRequestRemoteCancel() throws Exception {
        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);

        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);

        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME),
            grid(1).localNode().id());

        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean stopChecker = new AtomicBoolean();

        IgniteInternalFuture<Void> fut = snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
            SNAPSHOT_NAME,
            null,
            parts,
            stopChecker::get,
            (part, t) -> {
                try {
                    U.await(latch, TIMEOUT, TimeUnit.MILLISECONDS);
                }
                catch (IgniteInterruptedCheckedException e) {
                    throw new IgniteException(e);
                }
            });

        IgniteInternalFuture<?>[] futs = new IgniteInternalFuture[1];

        assertTrue(waitForCondition(new GridAbsPredicate() {
            @Override public boolean apply() {
                IgniteInternalFuture<?> snpFut = snp(grid(1))
                    .lastScheduledSnapshotResponseRemoteTask(grid(0).localNode().id());

                if (snpFut == null)
                    return false;
                else {
                    futs[0] = snpFut;

                    return true;
                }
            }
        }, 5_000L));
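
        // Request cancellation through the stop checker and release the consumer: the future must fail with TransmissionCancelledException.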
        stopChecker.set(true);
        latch.countDown();

        assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), TransmissionCancelledException.class,
            "Future cancelled prior to the all requested partitions processed");
    }

    /** @throws Exception If fails. */
    @Test
    public void testSendPartitionsSequentially() throws Exception {
        snapshotThreadPoolSize = 1;

        IgniteEx sndr = startGridsWithCache(3, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);

        UUID sndNode = sndr.localNode().id();

        sndr.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);

        IgniteSnapshotManager mgr0 = snp(grid(0));
        List<UUID> nodes = new CopyOnWriteArrayList<>();

        mgr0.remoteSnapshotSenderFactory((rqId, nodeId) -> new DelegateSnapshotSender(log,
            snp(sndr).snapshotExecutorService(), mgr0.remoteSnapshotSenderFactory(rqId, nodeId)) {
            @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
                nodes.add(nodeId);

                // Single thread must send partitions sequentially node by node.
                checkDuplicates(nodes);

                super.sendPart0(part, cacheDirName, pair, length);
            }
        });
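
        // Every node except the sender requests the sender-owned partitions; with a single snapshot thread the per-node transfers must not interleave.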
        Collection<IgniteEx> rcvrs = F.viewReadOnly(G.allGrids(), srv -> (IgniteEx)srv,
            srv -> !F.eq(sndr.localNode(), srv.cluster().localNode()));

        GridCompoundFuture<Void, Void> futs = new GridCompoundFuture<>();

        for (IgniteEx rcv : rcvrs) {
            Map<Integer, Set<Integer>> expParts = owningParts(rcv, CU.cacheId(DEFAULT_CACHE_NAME), sndNode);

            IgniteInternalFuture<Void> fut = snp(rcv)
                .requestRemoteSnapshotFiles(sndNode, SNAPSHOT_NAME, null, expParts, () -> false,
                    defaultPartitionConsumer(expParts, null));

            fut.listen(f -> expParts.values().forEach(integers -> assertTrue(integers.isEmpty())));

            futs.add(fut);
        }

        futs.markInitialized().get(getTestTimeout());
    }

    /**
     * @param parts Expected partitions, keyed by cache group id; each received partition is removed from its set.
     * @param latch Latch counted down for each received partition, or {@code null} if no counting is needed.
     * @return Consumer of received partition files.
     */
    private static BiConsumer<File, Throwable> defaultPartitionConsumer(Map<Integer, Set<Integer>> parts, CountDownLatch latch) {
        return (part, t) -> {
            assertNull(t);

            int grpId = groupIdFromTmpDir(part.getParentFile());

            assertTrue("Received cache group has not been requested", parts.containsKey(grpId));
            assertTrue("Received partition has not been requested",
                parts.get(grpId).remove(partId(part.getName())));

            if (latch != null)
                latch.countDown();
        };
    }

    /**
     * @param src Node to collect the partition map from.
     * @param grpId Cache group id to collect OWNING partitions for.
     * @param rmtNodeId Id of the remote node that owns the partitions.
     * @return Map of the group id to the set of partitions owned (OWNING state) by the remote node.
     */
    private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, int grpId, UUID rmtNodeId) {
        return Collections.singletonMap(grpId, src.context()
            .cache()
            .cacheGroup(grpId)
            .topology()
            .partitions(rmtNodeId)
            .entrySet()
            .stream()
            .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet()));
    }

    /**
     * Checks that any duplicates in the list occur only as neighboring runs:
     * 1, 1, 1, 3, 2, 2 -> ok
     * 1, 1, 3, 1, 2, 2 -> fail
     *
     * @param list List to check.
     */
    private <T> void checkDuplicates(List<T> list) {
        LinkedList<T> grouped = new LinkedList<>();
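
        // Collapse each run of equal neighboring elements into a single occurrence.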
        for (T item : list) {
            if (!F.eq(grouped.peekLast(), item))
                grouped.add(item);
        }
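
        // If each element forms exactly one contiguous run, the collapsed list equals the list of distinct elements.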
        if (list.stream().distinct().collect(Collectors.toList()).equals(grouped))
            return;

        fail("List contains non-neighboring duplicates: " + list);
    }
}