// blob: 909325c98c0be4493aa1b6571721bbeea5365a3c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.TestDelayingCommunicationSpi;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.testframework.GridTestUtils.mergeExchangeWaitVersion;
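/**
 * Tests that caches stay operational and that affinity and partition ownership stay consistent
 * across nodes when topology changes happen concurrently and their exchanges are merged.
 */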
public class CacheExchangeMergeTest extends GridCommonAbstractTest {
/** Maximum time, in seconds, to wait on the latches returned by {@code blockExchangeFinish}. */
private static final long WAIT_SECONDS = 45;
/** If {@code true}, nodes are configured with {@link TestRecordingCommunicationSpi}. */
private boolean testSpi;
/** If {@code true} (and {@link #testSpi} is not set), nodes are configured with {@code TestDelayExchangeMessagesSpi}. */
private boolean testDelaySpi;
/** Names of the caches exercised by the {@code checkNodeCaches} helpers. */
private static String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"};
/** If {@code true}, {@link #getConfiguration(String)} adds the static cache configurations to the node. */
private boolean cfgCache = true;
/** Optional closure mapping an Ignite instance name to its client-mode flag; ignored when {@code null}. */
private IgniteClosure<String, Boolean> clientC;
/** Thread pool created once for the whole class and used to check the caches of a node in parallel. */
private static ExecutorService executor;
/**
 * Builds the node configuration according to the flags set by the running test:
 * communication SPI ({@code testSpi} / {@code testDelaySpi}), client mode ({@code clientC})
 * and the static cache configurations ({@code cfgCache}).
 *
 * {@inheritDoc}
 */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    final IgniteConfiguration nodeCfg = super.getConfiguration(igniteInstanceName);

    // The recording SPI takes precedence over the delaying one.
    if (testSpi)
        nodeCfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
    else if (testDelaySpi)
        nodeCfg.setCommunicationSpi(new TestDelayExchangeMessagesSpi());

    if (clientC != null) {
        Boolean clientMode = clientC.apply(igniteInstanceName);

        nodeCfg.setClientMode(clientMode);
    }

    if (!cfgCache)
        return nodeCfg;

    CacheConfiguration[] staticCaches = {
        cacheConfiguration("c1", ATOMIC, PARTITIONED, 0),
        cacheConfiguration("c2", ATOMIC, PARTITIONED, 1),
        cacheConfiguration("c3", ATOMIC, PARTITIONED, 2),
        cacheConfiguration("c4", ATOMIC, PARTITIONED, 10),
        cacheConfiguration("c5", ATOMIC, REPLICATED, 0),
        cacheConfiguration("c6", TRANSACTIONAL, PARTITIONED, 0),
        cacheConfiguration("c7", TRANSACTIONAL, PARTITIONED, 1),
        cacheConfiguration("c8", TRANSACTIONAL, PARTITIONED, 2),
        cacheConfiguration("c9", TRANSACTIONAL, PARTITIONED, 10),
        cacheConfiguration("c10", TRANSACTIONAL, REPLICATED, 0),
        cacheConfiguration("c11", TRANSACTIONAL_SNAPSHOT, PARTITIONED, 0),
        cacheConfiguration("c12", TRANSACTIONAL_SNAPSHOT, PARTITIONED, 1),
        cacheConfiguration("c13", TRANSACTIONAL_SNAPSHOT, PARTITIONED, 2),
        cacheConfiguration("c14", TRANSACTIONAL_SNAPSHOT, PARTITIONED, 10),
        cacheConfiguration("c15", TRANSACTIONAL_SNAPSHOT, REPLICATED, 0)
    };

    nodeCfg.setCacheConfiguration(staticCaches);

    return nodeCfg;
}
/**
 * Creates the shared executor, sized to the number of available processors.
 *
 * {@inheritDoc}
 */
@Override protected void beforeTestsStarted() throws Exception {
    super.beforeTestsStarted();

    int poolSize = Runtime.getRuntime().availableProcessors();

    executor = Executors.newFixedThreadPool(poolSize);
}
/**
 * Shuts down the shared executor (if it was created) and then runs the superclass clean-up,
 * mirroring {@link #beforeTestsStarted()} which also chains to the superclass.
 *
 * {@inheritDoc}
 */
@Override protected void afterTestsStopped() throws Exception {
    try {
        if (executor != null)
            executor.shutdown();
    }
    finally {
        // Always give the base class a chance to clean up, as the other lifecycle hooks do.
        super.afterTestsStopped();
    }
}
/**
 * Stops every grid started by the test before running the superclass clean-up.
 *
 * {@inheritDoc}
 */
@Override protected void afterTest() throws Exception {
stopAllGrids();
super.afterTest();
}
/**
 * Creates a {@code FULL_SYNC} cache configuration with the given name, atomicity and cache mode.
 * The number of backups is applied only to {@code PARTITIONED} caches.
 *
 * @param name Cache name.
 * @param atomicityMode Cache atomicity mode.
 * @param cacheMode Cache mode.
 * @param backups Number of backups (used for partitioned caches only).
 * @return Cache configuration.
 */
private CacheConfiguration cacheConfiguration(String name,
    CacheAtomicityMode atomicityMode,
    CacheMode cacheMode,
    int backups)
{
    final CacheConfiguration cacheCfg = new CacheConfiguration(name);

    cacheCfg.setAtomicityMode(atomicityMode);
    cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
    cacheCfg.setCacheMode(cacheMode);

    boolean partitioned = cacheMode == PARTITIONED;

    if (partitioned)
        cacheCfg.setBackups(backups);

    return cacheCfg;
}
/**
 * Starts 6 servers and 3 clients with the delaying communication SPI and an exchange merge delay
 * of 2000, then concurrently stops nodes 1..3 (each after a random pause of up to 500 ms) while
 * 5 other threads start new nodes (a client with probability 1/3) and randomly stop them again.
 * Finally verifies the caches on the remaining nodes.
 * <p>
 * The JVM-wide merge-delay property is restored to its previous value afterwards, so a value set
 * outside this test is not lost.
 *
 * @throws Exception If failed.
 */
@Test
public void testDelayExchangeMessages() throws Exception {
    testDelaySpi = true;

    // Remember the current value: the property is JVM-wide and must be restored, not just cleared.
    final String oldMergeDelay = System.getProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY);

    System.setProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, "2000");

    try {
        final int srvs = 6;
        final int clients = 3;

        startGridsMultiThreaded(srvs);

        for (int i = 0; i < clients; i++)
            startClientGrid(srvs + i);

        final int initNodes = srvs + clients;

        final AtomicInteger stopIdx = new AtomicInteger();

        IgniteInternalFuture stopFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                Thread.sleep(ThreadLocalRandom.current().nextLong(500) + 1);

                // Indexes 1..3: node 0 is never stopped by this test.
                stopGrid(stopIdx.incrementAndGet());

                return null;
            }
        }, 3, "stop-srv");

        final AtomicInteger startIdx = new AtomicInteger(initNodes);

        IgniteInternalFuture startFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                int nodeIdx = startIdx.incrementAndGet();

                if (rnd.nextInt(3) == 0) {
                    log.info("Start client: " + nodeIdx);

                    startClientGrid(nodeIdx);
                }
                else {
                    log.info("Start server: " + nodeIdx);

                    startGrid(nodeIdx);
                }

                if (rnd.nextBoolean()) {
                    log.info("Stop started node: " + nodeIdx);

                    stopGrid(nodeIdx);
                }

                return null;
            }
        }, 5, "start-node");

        stopFut.get();
        startFut.get();

        checkCaches();
    }
    finally {
        if (oldMergeDelay != null)
            System.setProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, oldMergeDelay);
        else
            System.clearProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY);
    }
}
/**
 * Runs three iterations: starts a random number of servers (1..3) and clients (0..2), then joins
 * 8 more nodes concurrently (each a client with probability 1/3) while the first server is set up
 * via {@code mergeExchangeWaitVersion}, and finally checks the caches and stops all nodes.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeStartRandomClientsServers() throws Exception {
    final int joinThreads = 8;

    for (int iter = 0; iter < 3; iter++) {
        ThreadLocalRandom iterRnd = ThreadLocalRandom.current();

        final int srvCnt = iterRnd.nextInt(3) + 1;
        final int clientCnt = iterRnd.nextInt(3);

        log.info("Iteration [iter=" + iter + ", srvs=" + srvCnt + ", clients=" + clientCnt + ']');

        Ignite firstSrv = startGrids(srvCnt);

        for (int clientIdx = srvCnt; clientIdx < srvCnt + clientCnt; clientIdx++)
            startClientGrid(clientIdx);

        final int startedNodes = srvCnt + clientCnt;

        mergeExchangeWaitVersion(firstSrv, startedNodes + joinThreads);

        final AtomicInteger nextIdx = new AtomicInteger(startedNodes);

        Callable<Void> joinTask = new Callable<Void>() {
            @Override public Void call() throws Exception {
                int nodeIdx = nextIdx.incrementAndGet();

                boolean client = ThreadLocalRandom.current().nextInt(3) == 0;

                if (client) {
                    log.info("Start client: " + nodeIdx);

                    startClientGrid(nodeIdx);
                }
                else {
                    log.info("Start server: " + nodeIdx);

                    startGrid(nodeIdx);
                }

                return null;
            }
        };

        IgniteInternalFuture<?> joinFut = GridTestUtils.runMultiThreadedAsync(joinTask, joinThreads, "test-thread");

        joinFut.get();

        checkCaches();

        stopAllGrids();
    }
}
/**
 * Runs three iterations: starts 5 servers and 5 clients, then lets 8 threads each randomly either
 * stop one of the initial nodes (indexes 1..9, never the same node twice) or start a new node
 * (a client with probability 1/5). Afterwards checks the caches and stops all nodes.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeStartStopRandomClientsServers() throws Exception {
    final int srvCnt = 5;
    final int clientCnt = 5;
    final int workers = 8;
    final int startedNodes = srvCnt + clientCnt;

    for (int iter = 0; iter < 3; iter++) {
        log.info("Iteration: " + iter);

        Ignite firstSrv = startGrids(srvCnt);

        for (int clientIdx = srvCnt; clientIdx < startedNodes; clientIdx++)
            startClientGrid(clientIdx);

        mergeExchangeWaitVersion(firstSrv, startedNodes + workers);

        final AtomicInteger nextIdx = new AtomicInteger(startedNodes);

        final GridConcurrentHashSet<Integer> stopped = new GridConcurrentHashSet<>();

        Callable<Void> task = new Callable<Void>() {
            @Override public Void call() throws Exception {
                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                if (rnd.nextBoolean()) {
                    Integer victim;

                    // Pick an initial node (excluding node 0) that no other thread has claimed yet.
                    do {
                        victim = rnd.nextInt(startedNodes - 1) + 1;
                    }
                    while (!stopped.add(victim));

                    log.info("Stop node: " + victim);

                    stopGrid(getTestIgniteInstanceName(victim), true, false);

                    return null;
                }

                int nodeIdx = nextIdx.incrementAndGet();

                if (rnd.nextInt(5) == 0) {
                    log.info("Start client: " + nodeIdx);

                    startClientGrid(nodeIdx);
                }
                else {
                    log.info("Start server: " + nodeIdx);

                    startGrid(nodeIdx);
                }

                return null;
            }
        };

        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(task, workers, "test-thread");

        fut.get();

        checkCaches();

        stopAllGrids();
    }
}
/**
 * Runs {@link #concurrentStart(boolean)} with server nodes only.
 *
 * @throws Exception If failed.
 */
@Test
public void testConcurrentStartServers() throws Exception {
concurrentStart(false);
}
/**
 * Runs {@link #concurrentStart(boolean)} allowing the concurrently started nodes to be clients.
 *
 * @throws Exception If failed.
 */
@Test
public void testConcurrentStartServersAndClients() throws Exception {
concurrentStart(true);
}
/**
 * Repeatedly starts node 0 and then 10 more nodes (indexes 1..10) from concurrent threads.
 * Each started node immediately performs put/get checks on its own key range; afterwards all
 * caches are checked, one more node (index 1000) is started and the caches are checked again.
 *
 * @param withClients If {@code true} also starts client nodes.
 * @throws Exception If failed.
 */
private void concurrentStart(final boolean withClients) throws Exception {
    final int iterCnt = GridTestUtils.SF.applyLB(5, 1);

    for (int iter = 0; iter < iterCnt; iter++) {
        log.info("Iteration: " + iter);

        startGrid(0);

        final AtomicInteger nextIdx = new AtomicInteger(1);

        Callable<Void> startTask = new Callable<Void>() {
            @Override public Void call() throws Exception {
                int nodeIdx = nextIdx.getAndIncrement();

                boolean client = withClients && ThreadLocalRandom.current().nextBoolean();

                Ignite started = client ? startClientGrid(nodeIdx) : startGrid(nodeIdx);

                // Every node works with its own, non-overlapping range of 1000 keys.
                checkNodeCaches(started, nodeIdx * 1000, 1000);

                return null;
            }
        };

        IgniteInternalFuture<?> startFut = GridTestUtils.runMultiThreadedAsync(startTask, 10, "start-node");

        startFut.get();

        checkCaches();

        startGrid(1000);

        checkCaches();

        stopAllGrids();
    }
}
/**
 * Currently ignored (IGNITE-10186). Starts server 0, then starts server 1 in a background thread,
 * waits via {@code waitForExchangeStart} on server 0, and only then starts client 2 in another
 * thread. After both joins complete, checks the caches and the exchanges seen by every node.
 *
 * @throws Exception If failed.
 */
@Ignore("https://issues.apache.org/jira/browse/IGNITE-10186")
@Test
public void testMergeServerAndClientJoin1() throws Exception {
final IgniteEx srv0 = startGrid(0);
// NOTE(review): presumably makes srv0 wait for topology version 3 before finishing the exchange - confirm in GridTestUtils.
mergeExchangeWaitVersion(srv0, 3);
IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(1);
return null;
}
}, 1, "start-srv");
// The client must join only after the server-join exchange has started on srv0.
waitForExchangeStart(srv0, 2);
IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
startClientGrid(2);
return null;
}
}, 1, "start-client");
fut1.get();
fut2.get();
checkCaches();
// NOTE(review): the trailing numbers look like the expected exchange topology versions per node - verify against checkExchanges.
checkExchanges(srv0, 1, 3);
checkExchanges(ignite(1), 3);
checkExchanges(ignite(2), 3);
}
/**
 * Runs {@link #startCacheOnJoinAndJoinMerge1(int, boolean)} with 2 joining nodes and no clients.
 *
 * @throws Exception If failed.
 */
@Test
public void testStartCacheOnJoinAndJoinMerge_2_nodes() throws Exception {
startCacheOnJoinAndJoinMerge1(2, false);
}
/**
 * Runs {@link #startCacheOnJoinAndJoinMerge1(int, boolean)} with 4 joining nodes and no clients.
 *
 * @throws Exception If failed.
 */
@Test
public void testStartCacheOnJoinAndJoinMerge_4_nodes() throws Exception {
startCacheOnJoinAndJoinMerge1(4, false);
}
/**
 * Runs {@link #startCacheOnJoinAndJoinMerge1(int, boolean)} with 5 joining nodes, some of them clients.
 *
 * @throws Exception If failed.
 */
@Test
public void testStartCacheOnJoinAndJoinMerge_WithClients() throws Exception {
startCacheOnJoinAndJoinMerge1(5, true);
}
/**
 * Starts node 0 without any configured caches, then asynchronously starts {@code nodes} more nodes
 * which do carry the static cache configurations, and finally checks the caches.
 *
 * @param nodes Number of nodes to start.
 * @param withClients If {@code true} starts both servers and clients (nodes with an even index become clients).
 * @throws Exception If failed.
 */
private void startCacheOnJoinAndJoinMerge1(int nodes, boolean withClients) throws Exception {
// The first node is started without caches so that the caches are started by the joining nodes.
cfgCache = false;
final IgniteEx srv0 = startGrid(0);
mergeExchangeWaitVersion(srv0, nodes + 1);
if (withClients) {
clientC = new IgniteClosure<String, Boolean>() {
@Override public Boolean apply(String nodeName) {
return getTestIgniteInstanceIndex(nodeName) % 2 == 0;
}
};
}
// From here on, started nodes are configured with the static caches.
cfgCache = true;
// NOTE(review): arguments appear to be (node, first index, count) - confirm against startGridsAsync.
IgniteInternalFuture fut = startGridsAsync(srv0, 1, nodes);
fut.get();
checkCaches();
}
/**
 * Limits the exchange history size to 5 through a system property, joins three batches of three
 * nodes each, then stops two nodes, verifying after every phase (and again after the partition
 * map exchange has been awaited) that no node keeps more exchange futures than the limit.
 * The previous property value is restored at the end.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeAndHistoryCleanup() throws Exception {
    final int histLimit = 5;
    final int batchSize = 3;

    final String prevHistSize = System.getProperty(IGNITE_EXCHANGE_HISTORY_SIZE);

    System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, String.valueOf(histLimit));

    try {
        final Ignite firstSrv = startGrid(0);

        int curTopVer = 1;

        for (int batch = 0; batch < 3; batch++, curTopVer += batchSize) {
            mergeExchangeWaitVersion(firstSrv, curTopVer + batchSize);

            startGridsAsync(firstSrv, curTopVer, batchSize).get();
        }

        checkHistorySize(histLimit);

        awaitPartitionMapExchange();

        checkHistorySize(histLimit);

        mergeExchangeWaitVersion(firstSrv, curTopVer + 2);

        stopGrid(1);
        stopGrid(2);

        checkHistorySize(histLimit);

        awaitPartitionMapExchange();

        checkHistorySize(histLimit);
    }
    finally {
        if (prevHistSize == null)
            System.clearProperty(IGNITE_EXCHANGE_HISTORY_SIZE);
        else
            System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, prevHistSize);
    }
}
/**
 * Asserts that at least one node is running and that every running node holds a non-empty list of
 * exchange futures whose size does not exceed the given limit.
 *
 * @param histSize History size.
 */
private void checkHistorySize(int histSize) {
    List<Ignite> allNodes = G.allGrids();

    assertTrue(!allNodes.isEmpty());

    for (Ignite ignite : allNodes) {
        IgniteEx igniteEx = (IgniteEx)ignite;

        List<GridDhtPartitionsExchangeFuture> exchFuts =
            igniteEx.context().cache().context().exchange().exchangeFutures();

        boolean withinLimit = !exchFuts.isEmpty() && exchFuts.size() <= histSize;

        assertTrue("Unexpected size: " + exchFuts.size(), withinLimit);
    }
}
/**
 * Starts two servers without caches, then asynchronously starts two more nodes that carry the
 * static cache configurations while node 1 is stopped, and checks the caches and the exchanges
 * seen by the remaining nodes.
 *
 * @throws Exception If failed.
 */
@Test
public void testStartCacheOnJoinAndMergeWithFail() throws Exception {
// Initial nodes are started without caches; the caches arrive with the joining nodes.
cfgCache = false;
final Ignite srv0 = startGrids(2);
mergeExchangeWaitVersion(srv0, 5);
cfgCache = true;
IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2);
// Node failure happens while the joins are still in progress.
stopGrid(1);
fut.get();
checkCaches();
// NOTE(review): the trailing numbers look like the expected exchange topology versions per node - verify against checkExchanges.
checkExchanges(srv0, 1, 2, 3, 5);
checkExchanges(ignite(2), 3, 5);
checkExchanges(ignite(3), 5);
}
/**
 * Starts two servers without caches, then asynchronously starts two more nodes that carry the
 * static cache configurations while node 0 is stopped, and checks the caches.
 *
 * @throws Exception If failed.
 */
@Test
public void testStartCacheOnJoinAndCoordinatorFailed1() throws Exception {
cfgCache = false;
final Ignite srv0 = startGrids(2);
mergeExchangeWaitVersion(srv0, 5);
cfgCache = true;
IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2);
// Node 0 (the first started node) is stopped while the joins are still in progress.
stopGrid(0);
fut.get();
checkCaches();
}
/**
 * Starts a single server without caches, then asynchronously starts two more nodes that carry the
 * static cache configurations while node 0 is stopped, and checks the caches.
 *
 * @throws Exception If failed.
 */
@Test
public void testStartCacheOnJoinAndCoordinatorFailed2() throws Exception {
cfgCache = false;
final Ignite srv0 = startGrid(0);
mergeExchangeWaitVersion(srv0, 3);
cfgCache = true;
IgniteInternalFuture fut = startGridsAsync(srv0, 1, 2);
// The only initially started node is stopped while the joins are still in progress.
stopGrid(0);
fut.get();
checkCaches();
}
/**
 * Starts server 0, then starts servers 1 and 2 from two concurrent threads, and checks the caches
 * and the exchanges seen by every node.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeServersJoin1() throws Exception {
IgniteEx srv0 = startGrid(0);
mergeExchangeWaitVersion(srv0, 3);
// Shared counter hands out node indexes 1 and 2 to the two starter threads.
final AtomicInteger idx = new AtomicInteger(1);
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(idx.getAndIncrement());
return null;
}
}, 2, "start-node");
fut.get();
checkCaches();
// NOTE(review): the trailing numbers look like the expected exchange topology versions per node - verify against checkExchanges.
checkExchanges(srv0, 1, 3);
checkExchanges(ignite(1), 3);
checkExchanges(ignite(2), 3);
}
/**
 * Starts server 0 and clients 1 and 2, then starts servers 3 and 4 from two concurrent threads,
 * and checks the caches and the exchanges seen by every node.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeServerJoin1ClientsInTopology() throws Exception {
IgniteEx srv0 = startGrid(0);
startClientGrid(1);
startClientGrid(2);
mergeExchangeWaitVersion(srv0, 5);
// Shared counter hands out node indexes 3 and 4 to the two starter threads.
final AtomicInteger idx = new AtomicInteger(3);
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(idx.getAndIncrement());
return null;
}
}, 2, "start-node");
fut.get();
checkCaches();
// NOTE(review): the trailing numbers look like the expected exchange topology versions per node - verify against checkExchanges.
checkExchanges(srv0, 1, 2, 3, 5);
checkExchanges(ignite(1), 2, 3, 5);
checkExchanges(ignite(2), 3, 5);
checkExchanges(ignite(3), 5);
checkExchanges(ignite(4), 5);
}
/**
 * Starts three servers, asynchronously joins three more nodes, checks the caches, then stops
 * node 0 and checks the caches again.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeAndNewCoordinator() throws Exception {
final Ignite srv0 = startGrids(3);
mergeExchangeWaitVersion(srv0, 6);
IgniteInternalFuture fut = startGridsAsync(srv0, 3, 3);
fut.get();
checkCaches();
// Stopping the first started node; caches must stay usable on the remaining nodes.
stopGrid(0);
checkCaches();
}
/**
 * Runs {@link #mergeServersFail1(boolean)} without waiting for rebalance first.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeServersFail1_1() throws Exception {
mergeServersFail1(false);
}
/**
 * Runs {@link #mergeServersFail1(boolean)} after waiting for the partition map exchange.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeServersFail1_2() throws Exception {
mergeServersFail1(true);
}
/**
 * Starts five servers, stops nodes 4, 3 and 2 (in that order) and checks the caches. Then verifies
 * that exactly two discovery events were collected as merged and that each of them refers to
 * node 2 or node 3.
 *
 * @param waitRebalance Wait for rebalance end before start tested topology change.
 * @throws Exception If failed.
 */
private void mergeServersFail1(boolean waitRebalance) throws Exception {
    final Ignite firstSrv = startGrids(5);

    if (waitRebalance)
        awaitPartitionMapExchange();

    final List<DiscoveryEvent> merged = new ArrayList<>();

    mergeExchangeWaitVersion(firstSrv, 8, merged);

    // Node IDs have to be captured before the nodes are stopped.
    final UUID node3Id = grid(3).localNode().id();
    final UUID node2Id = grid(2).localNode().id();

    for (int stopIdx = 4; stopIdx >= 2; stopIdx--)
        stopGrid(getTestIgniteInstanceName(stopIdx), true, false);

    checkCaches();

    awaitPartitionMapExchange();

    assertTrue("Unexpected number of merged disco events: " + merged.size(), merged.size() == 2);

    for (DiscoveryEvent evt : merged) {
        ClusterNode failedNode = evt.eventNode();

        assertTrue("eventNode is null for DiscoEvent " + evt, failedNode != null);

        boolean expected = failedNode.id().equals(node2Id) || failedNode.id().equals(node3Id);

        assertTrue("Unexpected eventNode ID: "
            + failedNode.id() + " while expecting " + node2Id + " or " + node3Id,
            expected);
    }
}
/**
 * Runs {@link #mergeServersAndClientsFail(boolean)} without waiting for rebalance first.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeServersAndClientsFail1() throws Exception {
mergeServersAndClientsFail(false);
}
/**
 * Runs {@link #mergeServersAndClientsFail(boolean)} after waiting for the partition map exchange.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeServersAndClientsFail2() throws Exception {
mergeServersAndClientsFail(true);
}
/**
 * Starts six nodes of which nodes 2 and 3 are clients, stops nodes 1..4, checks affinity, then
 * asynchronously starts two more nodes and checks the caches.
 *
 * @param waitRebalance Wait for rebalance end before start tested topology change.
 * @throws Exception If failed.
 */
private void mergeServersAndClientsFail(boolean waitRebalance) throws Exception {
    clientC = new IgniteClosure<String, Boolean>() {
        @Override public Boolean apply(String nodeName) {
            if (nodeName.equals(getTestIgniteInstanceName(2)))
                return true;

            return nodeName.equals(getTestIgniteInstanceName(3));
        }
    };

    final Ignite firstSrv = startGrids(6);

    if (waitRebalance)
        awaitPartitionMapExchange();

    mergeExchangeWaitVersion(firstSrv, 10);

    // Two servers (1, 4) and both clients (2, 3) leave the topology.
    for (int stopIdx = 1; stopIdx <= 4; stopIdx++)
        stopGrid(getTestIgniteInstanceName(stopIdx), true, false);

    checkAffinity();

    mergeExchangeWaitVersion(firstSrv, 12);

    IgniteInternalFuture<?> joinFut = startGridsAsync(firstSrv, 6, 2);

    joinFut.get();

    checkCaches();
}
/**
 * For every {@code CoordinatorChangeMode}, runs the node-join scenario of
 * {@code exchangeCoordinatorChangeNoMerge} with 4 servers, stopping all grids between modes.
 *
 * @throws Exception If failed.
 */
@Test
public void testJoinExchangeCoordinatorChange_NoMerge_1() throws Exception {
for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
exchangeCoordinatorChangeNoMerge(4, true, mode);
stopAllGrids();
}
}
/**
 * For every {@code CoordinatorChangeMode}, runs the node-join scenario of
 * {@code exchangeCoordinatorChangeNoMerge} with 8 servers, stopping all grids between modes.
 *
 * @throws Exception If failed.
 */
@Test
public void testJoinExchangeCoordinatorChange_NoMerge_2() throws Exception {
for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
exchangeCoordinatorChangeNoMerge(8, true, mode);
stopAllGrids();
}
}
/**
 * For every {@code CoordinatorChangeMode}, runs the node-stop scenario of
 * {@code exchangeCoordinatorChangeNoMerge} with 5 servers, stopping all grids between modes.
 *
 * @throws Exception If failed.
 */
@Test
public void testFailExchangeCoordinatorChange_NoMerge_1() throws Exception {
for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
exchangeCoordinatorChangeNoMerge(5, false, mode);
stopAllGrids();
}
}
/**
 * For every {@code CoordinatorChangeMode}, runs the node-stop scenario of
 * {@code exchangeCoordinatorChangeNoMerge} with 8 servers, stopping all grids between modes.
 *
 * @throws Exception If failed.
 */
@Test
public void testFailExchangeCoordinatorChange_NoMerge_2() throws Exception {
for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
exchangeCoordinatorChangeNoMerge(8, false, mode);
stopAllGrids();
}
}
/**
 * For every {@code CoordinatorChangeMode}, runs {@code mergeJoinExchangesCoordinatorChange1}
 * with 4 servers, stopping all grids between modes.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeJoinExchangesCoordinatorChange1_4_servers() throws Exception {
for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
mergeJoinExchangesCoordinatorChange1(4, mode);
stopAllGrids();
}
}
/**
 * For every {@code CoordinatorChangeMode}, runs {@code mergeJoinExchangesCoordinatorChange1}
 * with 8 servers, stopping all grids between modes.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeJoinExchangesCoordinatorChange1_8_servers() throws Exception {
for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
mergeJoinExchangesCoordinatorChange1(8, mode);
stopAllGrids();
}
}
/**
 * Starts the given number of servers with the recording communication SPI, blocks exchange-finish
 * messages on node 0 according to {@code mode}, asynchronously joins two nodes, waits (when a latch
 * is provided) until the awaited messages were sent, then stops node 0 and checks the caches.
 *
 * @param srvs Number of server nodes.
 * @param mode Test mode.
 * @throws Exception If failed.
 */
private void mergeJoinExchangesCoordinatorChange1(final int srvs, CoordinatorChangeMode mode)
    throws Exception
{
    log.info("Test mergeJoinExchangesCoordinatorChange1 [srvs=" + srvs + ", mode=" + mode + ']');

    testSpi = true;

    Ignite firstSrv = startGrids(srvs);

    CountDownLatch sentLatch = blockExchangeFinish(srvs, mode);

    IgniteInternalFuture<?> joinFut = startGridsAsync(firstSrv, srvs, 2);

    // No latch means that no node is expected to receive the message, so there is nothing to wait for.
    boolean msgsSent = sentLatch == null || sentLatch.await(WAIT_SECONDS, TimeUnit.SECONDS);

    if (!msgsSent)
        fail("Failed to wait for expected messages.");

    stopGrid(getTestIgniteInstanceName(0), true, false);

    joinFut.get();

    checkCaches();
}
/**
 * Runs {@code mergeJoinExchangeCoordinatorChange2} twice with 4 servers and 2 joining nodes:
 * first with node 5 as the only node receiving the message, then with node 4 instead.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeJoinExchangesCoordinatorChange2_4_servers() throws Exception {
mergeJoinExchangeCoordinatorChange2(4, 2, F.asList(1, 2, 3, 4), F.asList(5));
// Start the second scenario from a clean topology.
stopAllGrids();
mergeJoinExchangeCoordinatorChange2(4, 2, F.asList(1, 2, 3, 5), F.asList(4));
}
/**
 * Starts the given number of servers with the recording communication SPI, blocks exchange-finish
 * messages from node 0 to {@code blockNodes}, asynchronously joins {@code startNodes} nodes, waits
 * until the messages for {@code waitMsgNodes} were sent, then stops node 0 and checks the caches.
 *
 * @param srvs Number of server nodes.
 * @param startNodes Number of nodes to start.
 * @param blockNodes Nodes which do not receive messages.
 * @param waitMsgNodes Nodes which should receive messages.
 * @throws Exception If failed.
 */
private void mergeJoinExchangeCoordinatorChange2(final int srvs,
    final int startNodes,
    List<Integer> blockNodes,
    List<Integer> waitMsgNodes) throws Exception
{
    testSpi = true;

    Ignite firstSrv = startGrids(srvs);

    mergeExchangeWaitVersion(firstSrv, srvs + startNodes);

    CountDownLatch sentLatch = blockExchangeFinish(firstSrv, srvs + 1, blockNodes, waitMsgNodes);

    IgniteInternalFuture<?> joinFut = startGridsAsync(firstSrv, srvs, startNodes);

    boolean msgsSent = sentLatch == null || sentLatch.await(WAIT_SECONDS, TimeUnit.SECONDS);

    if (!msgsSent)
        fail("Failed to wait for expected messages.");

    stopGrid(getTestIgniteInstanceName(0), true, false);

    joinFut.get();

    checkCaches();
}
/**
 * Starts 4 servers with the recording communication SPI, blocks exchange-finish messages from
 * node 0 to nodes 1..4 while awaiting the one sent to node 5, starts nodes 4 and 5 from two
 * concurrent threads, then stops node 0 and checks the caches.
 *
 * @throws Exception If failed.
 */
@Test
public void testMergeExchangeCoordinatorChange4() throws Exception {
    testSpi = true;

    final int srvCnt = 4;

    Ignite firstSrv = startGrids(srvCnt);

    mergeExchangeWaitVersion(firstSrv, 6);

    final AtomicInteger nextIdx = new AtomicInteger(srvCnt);

    CountDownLatch sentLatch = blockExchangeFinish(firstSrv, 5, F.asList(1, 2, 3, 4), F.asList(5));

    Callable<Void> startTask = new Callable<Void>() {
        @Override public Void call() throws Exception {
            startGrid(nextIdx.getAndIncrement());

            return null;
        }
    };

    IgniteInternalFuture<?> startFut = GridTestUtils.runMultiThreadedAsync(startTask, 2, "start-node");

    boolean msgsSent = sentLatch == null || sentLatch.await(WAIT_SECONDS, TimeUnit.SECONDS);

    if (!msgsSent)
        fail("Failed to wait for expected messages.");

    stopGrid(getTestIgniteInstanceName(0), true, false);

    startFut.get();

    checkCaches();
}
/**
 * Starts {@code srvs} servers with the recording communication SPI, blocks exchange-finish messages
 * on node 0 according to {@code mode}, then triggers a single topology change in the background
 * (a node join or the stop of the last node), waits for the exchange to start on node 0 and for
 * the awaited messages (if any), stops node 0 and checks the caches.
 * <p>
 * The log line names this scenario; it previously carried the name of a different test helper.
 *
 * @param srvs Number of servers.
 * @param join If {@code true} starts new node, otherwise stops node.
 * @param mode Tested scenario.
 * @throws Exception If failed.
 */
private void exchangeCoordinatorChangeNoMerge(int srvs, final boolean join, CoordinatorChangeMode mode) throws Exception {
    log.info("Test exchangeCoordinatorChangeNoMerge [nodes=" + srvs + ", join=" + join + ", mode=" + mode + ']');

    testSpi = true;

    final int nodes = srvs;

    startGrids(nodes);

    CountDownLatch latch = blockExchangeFinish(srvs, mode);

    IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
        @Override public Void call() throws Exception {
            if (join)
                startGrid(nodes);
            else
                stopGrid(nodes - 1);

            return null;
        }
    });

    // Either kind of change is the first one after the initial start, hence version nodes + 1.
    waitForExchangeStart(ignite(0), nodes + 1);

    if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
        fail("Failed to wait for expected messages.");

    stopGrid(0);

    fut.get();

    checkCaches();
}
/**
 * Blocks exchange-finish messages sent by node 0 for topology version {@code srvs + 1} as required
 * by the given mode: for {@code NOBODY_RCVD} all such messages are blocked and no latch is returned;
 * for {@code NEW_CRD_RCDV} only node 1 and for {@code NON_CRD_RCVD} only node 2 gets the message.
 *
 * @param srvs Number of server nodes.
 * @param mode Test scenario.
 * @return Awaited state latch, or {@code null} if there is nothing to wait for.
 * @throws Exception If failed.
 */
private CountDownLatch blockExchangeFinish(int srvs, CoordinatorChangeMode mode) throws Exception {
    final Ignite coordinator = ignite(0);
    final long exchTopVer = srvs + 1;

    final List<Integer> rcvNodes;

    switch (mode) {
        case NOBODY_RCVD:
            blockExchangeFinish(coordinator, exchTopVer);

            return null;

        case NEW_CRD_RCDV:
            rcvNodes = F.asList(1);

            break;

        case NON_CRD_RCVD:
            assert srvs > 2 : srvs;

            rcvNodes = F.asList(2);

            break;

        default:
            fail();

            return null;
    }

    return blockExchangeFinish(coordinator, exchTopVer, blockNodes(srvs, rcvNodes), rcvNodes);
}
/**
 * Lists every node index in the range {@code [0, srvs]} (inclusive) that is not contained in
 * {@code waitNodes}, in ascending order.
 *
 * @param srvs Number of servers.
 * @param waitNodes Nodes which should receive message.
 * @return Blocked nodes indexes.
 */
private List<Integer> blockNodes(int srvs, List<Integer> waitNodes) {
    final List<Integer> blocked = new ArrayList<>();

    int nodeIdx = 0;

    while (nodeIdx <= srvs) {
        boolean awaited = waitNodes.contains(nodeIdx);

        if (!awaited)
            blocked.add(nodeIdx);

        nodeIdx++;
    }

    return blocked;
}
/**
 * Makes the communication SPI of the given node block every {@link GridDhtPartitionsFullMessage}
 * whose exchange ID is set and refers to exactly the given topology version.
 *
 * @param crd Exchange coordinator.
 * @param topVer Exchange topology version.
 */
private void blockExchangeFinish(Ignite crd, long topVer) {
    final AffinityTopologyVersion blockedVer = new AffinityTopologyVersion(topVer);

    IgniteBiPredicate<ClusterNode, Message> blockPred = new IgniteBiPredicate<ClusterNode, Message>() {
        @Override public boolean apply(ClusterNode node, Message msg) {
            if (!(msg instanceof GridDhtPartitionsFullMessage))
                return false;

            GridDhtPartitionsFullMessage fullMsg = (GridDhtPartitionsFullMessage)msg;

            if (fullMsg.exchangeId() == null)
                return false;

            return fullMsg.exchangeId().topologyVersion().equals(blockedVer);
        }
    };

    TestRecordingCommunicationSpi.spi(crd).blockMessages(blockPred);
}
/**
 * Makes the communication SPI of the given node block {@link GridDhtPartitionsFullMessage}s for
 * exchanges at or after the given topology version when they are addressed to one of
 * {@code blockNodes}. Messages addressed to {@code waitMsgNodes} are let through and counted down
 * on the returned latch, whose initial count is the number of awaited nodes.
 *
 * @param crd Exchange coordinator.
 * @param topVer Exchange topology version.
 * @param blockNodes Nodes which do not receive messages.
 * @param waitMsgNodes Nodes which should receive messages.
 * @return Awaited state latch.
 */
private CountDownLatch blockExchangeFinish(Ignite crd,
    long topVer,
    final List<Integer> blockNodes,
    final List<Integer> waitMsgNodes)
{
    log.info("blockExchangeFinish [crd=" + crd.cluster().localNode().id() +
        ", block=" + blockNodes +
        ", wait=" + waitMsgNodes + ']');

    final AffinityTopologyVersion minVer = new AffinityTopologyVersion(topVer);

    final CountDownLatch sentLatch = new CountDownLatch(waitMsgNodes.size());

    IgniteBiPredicate<ClusterNode, Message> blockPred = new IgniteBiPredicate<ClusterNode, Message>() {
        @Override public boolean apply(ClusterNode node, Message msg) {
            if (!(msg instanceof GridDhtPartitionsFullMessage))
                return false;

            GridDhtPartitionsFullMessage fullMsg = (GridDhtPartitionsFullMessage)msg;

            // Messages without an exchange ID or for earlier exchanges are never blocked.
            if (fullMsg.exchangeId() == null || fullMsg.exchangeId().topologyVersion().compareTo(minVer) < 0)
                return false;

            String rcvName = node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME);

            assert rcvName != null : node;

            for (Integer blockedIdx : blockNodes) {
                if (rcvName.equals(getTestIgniteInstanceName(blockedIdx)))
                    return true;
            }

            for (Integer awaitedIdx : waitMsgNodes) {
                if (!rcvName.equals(getTestIgniteInstanceName(awaitedIdx)))
                    continue;

                log.info("Coordinators sends awaited message [node=" + node.id() + ']');

                sentLatch.countDown();
            }

            return false;
        }
    };

    TestRecordingCommunicationSpi.spi(crd).blockMessages(blockPred);

    return sentLatch;
}
/**
 * Full consistency check used at the end of the tests: verifies affinity and cache operations
 * right away, then waits for the partition map exchange, verifies that partition owners are
 * consistent across nodes and runs the cache operations once more.
 *
 * @throws Exception If failed.
 */
private void checkCaches() throws Exception {
checkAffinity();
checkCaches0();
// Affinity is re-checked after the cache operations above.
checkAffinity();
awaitPartitionMapExchange();
checkTopologiesConsistency();
checkCaches0();
}
/**
 * Asserts that at least one node is running and runs the cache checks on every running node.
 *
 * @throws Exception If failed.
 */
private void checkCaches0() throws Exception {
    final List<Ignite> allNodes = G.allGrids();

    assertTrue(!allNodes.isEmpty());

    for (int i = 0; i < allNodes.size(); i++)
        checkNodeCaches(allNodes.get(i));
}
/**
 * Checks that after exchange all nodes have consistent state about partition owners: for every
 * cache and partition, the owners known to each node must equal the owners known to the node with
 * the lowest order.
 *
 * @throws Exception If failed.
 */
private void checkTopologiesConsistency() throws Exception {
    final List<Ignite> allNodes = G.allGrids();

    // The running node with the smallest order serves as the reference.
    IgniteEx refNode = null;

    for (Ignite candidate : allNodes) {
        ClusterNode candidateNode = candidate.cluster().localNode();

        if (refNode == null || candidateNode.order() < refNode.localNode().order())
            refNode = (IgniteEx) candidate;
    }

    for (Ignite ignite : allNodes) {
        IgniteEx checked = (IgniteEx) ignite;

        boolean isRef = checked.localNode().id().equals(refNode.localNode().id());

        if (isRef)
            continue;

        for (IgniteInternalCache cache : checked.context().cache().caches()) {
            final int partCnt = cache.context().affinity().partitions();

            for (int part = 0; part < partCnt; part++) {
                List<ClusterNode> expOwners = refNode.cachex(cache.name()).cache().context().topology().owners(part);

                List<ClusterNode> actualOwners = cache.context().topology().owners(part);

                assertEquals(expOwners, actualOwners);
            }
        }
    }
}
/**
 * Checks that, for the ready affinity version of the node with the lowest order, every cache has
 * the same affinity assignment on all nodes that run it.
 *
 * @throws Exception If failed.
 */
private void checkAffinity() throws Exception {
    final List<Ignite> allNodes = G.allGrids();

    ClusterNode oldest = null;

    for (Ignite ignite : allNodes) {
        ClusterNode candidate = ignite.cluster().localNode();

        if (oldest == null || candidate.order() < oldest.order())
            oldest = candidate;
    }

    IgniteKernal oldestKernal = (IgniteKernal)grid(oldest);

    AffinityTopologyVersion readyVer = oldestKernal.context().cache().context().exchange().readyAffinityVersion();

    // First assignment seen for each cache; later ones are compared against it.
    Map<String, List<List<ClusterNode>>> firstSeen = new HashMap<>();

    for (Ignite ignite : allNodes) {
        IgniteKernal kernal = (IgniteKernal)ignite;

        for (IgniteInternalCache cache : kernal.context().cache().caches()) {
            List<List<ClusterNode>> expected = firstSeen.get(cache.name());

            List<List<ClusterNode>> actual = cache.context().affinity().assignments(readyVer);

            if (expected == null) {
                firstSeen.put(cache.name(), actual);

                continue;
            }

            assertEquals(expected, actual);
        }
    }
}
/**
 * For each cache listed in {@code cacheNames}, performs 10 put/get round trips through the given
 * node using random keys from {@code [startKey, startKey + keyRange)} and asserts that each read
 * returns the value just written.
 *
 * @param node Node.
 * @param startKey Start key.
 * @param keyRange Keys range.
 */
private void checkNodeCaches(Ignite node, int startKey, int keyRange) {
    final ThreadLocalRandom rnd = ThreadLocalRandom.current();

    for (int c = 0; c < cacheNames.length; c++) {
        String cacheName = cacheNames[c];

        String err = "Invalid value [node=" + node.name() +
            ", client=" + node.configuration().isClientMode() +
            ", order=" + node.cluster().localNode().order() +
            ", cache=" + cacheName + ']';

        IgniteCache<Object, Object> cache = node.cache(cacheName);

        for (int val = 0; val < 10; val++) {
            Integer key = startKey + rnd.nextInt(keyRange);

            cache.put(key, val);

            Object read = cache.get(key);

            assertEquals(err, val, read);
        }
    }
}
/**
 * Checks all caches listed in {@code cacheNames} through the given node, one task per cache on the
 * shared executor. Each task asserts the cache exists, then verifies single put/get and batch
 * putAll/getAll round trips with random keys and, for {@code TRANSACTIONAL} caches, runs the
 * transactional check for every concurrency/isolation combination. Waits for all tasks to finish.
 *
 * @param node Node.
 * @throws Exception If failed.
 */
private void checkNodeCaches(final Ignite node) throws Exception {
    log.info("Check node caches [node=" + node.name() + ']');

    final List<Future<?>> pending = new ArrayList<>();

    for (final String cacheName : cacheNames) {
        final IgniteCache<Object, Object> cache = node.cache(cacheName);

        Runnable cacheCheck = new Runnable() {
            @Override public void run() {
                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                assertNotNull("No cache [node=" + node.name() +
                    ", client=" + node.configuration().isClientMode() +
                    ", order=" + node.cluster().localNode().order() +
                    ", cache=" + cacheName + ']', cache);

                String err = "Invalid value [node=" + node.name() +
                    ", client=" + node.configuration().isClientMode() +
                    ", order=" + node.cluster().localNode().order() +
                    ", cache=" + cacheName + ']';

                // Single-key round trips.
                for (int val = 0; val < 5; val++) {
                    Integer key = rnd.nextInt(20_000);

                    cache.put(key, val);

                    Object read = cache.get(key);

                    assertEquals(err, val, read);
                }

                // Batch round trips; a sorted map is used for the keys.
                for (int val = 0; val < 5; val++) {
                    Map<Integer, Integer> batch = new TreeMap<>();

                    for (int j = 0; j < 10; j++) {
                        Integer key = rnd.nextInt(20_000);

                        batch.put(key, val);
                    }

                    cache.putAll(batch);

                    Map<Object, Object> read = cache.getAll(batch.keySet());

                    for (Map.Entry<Integer, Integer> entry : batch.entrySet())
                        assertEquals(err, entry.getValue(), read.get(entry.getKey()));
                }

                if (atomicityMode(cache) != TRANSACTIONAL)
                    return;

                for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
                    for (TransactionIsolation isolation : TransactionIsolation.values())
                        checkNodeCaches(err, node, cache, concurrency, isolation);
                }
            }
        };

        pending.add(executor.submit(cacheCheck));
    }

    for (Future<?> task : pending)
        task.get();
}
/**
* @param err Error message.
* @param node Node.
* @param cache Cache.
* @param concurrency Transaction concurrency.
* @param isolation Transaction isolation.
*/
private void checkNodeCaches(
    String err,
    Ignite node,
    IgniteCache<Object, Object> cache,
    TransactionConcurrency concurrency,
    TransactionIsolation isolation) {
    // Runs five put/get round trips inside one transaction started with the given
    // concurrency and isolation, then re-reads the same keys after the commit.
    ThreadLocalRandom rnd = ThreadLocalRandom.current();

    // Key -> value as read inside the transaction; compared against the cache after commit.
    Map<Object, Object> map = new HashMap<>();

    // The catch clause also covers transaction start and close, exactly as the former nested try did.
    try (Transaction tx = node.transactions().txStart(concurrency, isolation)) {
        for (int i = 0; i < 5; i++) {
            Integer key = rnd.nextInt(20_000);

            cache.put(key, i);

            Object val = cache.get(key);

            // Include the node/cache context in the failure, like the other assertions of this test.
            assertEquals(err, i, val);

            map.put(key, val);
        }

        tx.commit();
    }
    catch (ClusterTopologyException e) {
        // A topology failure is tolerated: nothing is verified for this transaction.
        info("Tx failed, ignore: " + e);

        return;
    }

    for (Map.Entry<Object, Object> e : map.entrySet())
        assertEquals(err, e.getValue(), cache.get(e.getKey()));
}
/**
* @param node Node.
* @param vers Expected exchange versions.
*/
private void checkExchanges(Ignite node, long... vers) {
    IgniteKernal kernal = (IgniteKernal)node;

    // Expected versions are built from the given numbers, in argument order.
    List<AffinityTopologyVersion> expected = new ArrayList<>();

    for (int i = 0; i < vers.length; i++)
        expected.add(new AffinityTopologyVersion(vers[i]));

    List<AffinityTopologyVersion> completed = new ArrayList<>();

    List<GridDhtPartitionsExchangeFuture> exchFuts =
        kernal.context().cache().context().exchange().exchangeFutures();

    // The futures list is traversed from its last element to its first.
    for (int idx = exchFuts.size() - 1; idx >= 0; idx--) {
        GridDhtPartitionsExchangeFuture exchFut = exchFuts.get(idx);

        // Skip merged or unfinished exchanges and those whose first event is a custom discovery event.
        if (exchFut.isMerged() || !exchFut.exchangeDone() || exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT)
            continue;

        AffinityTopologyVersion resVer = exchFut.topologyVersion();

        Assert.assertNotNull(resVer);

        completed.add(resVer);
    }

    assertEquals(expected, completed);

    // Every cached affinity version with a non-positive minor version must be an expected one.
    for (CacheGroupContext grp : kernal.context().cache().cacheGroups()) {
        for (AffinityTopologyVersion cached : grp.affinity().cachedVersions()) {
            if (cached.minorTopologyVersion() <= 0) {
                assertTrue("Unexpected version [ver=" + cached + ", exp=" + expected + ']',
                    expected.contains(cached));
            }
        }
    }
}
/**
* @param node Node.
* @param topVer Exchange version.
* @throws Exception If failed.
*/
private void waitForExchangeStart(final Ignite node, final long topVer) throws Exception {
    IgniteKernal kernal = (IgniteKernal)node;

    final GridCachePartitionExchangeManager exchMgr = kernal.context().cache().context().exchange();

    // Holds once the initial version of the last topology future is not older than topVer.
    PA reached = new PA() {
        @Override public boolean apply() {
            long lastVer = exchMgr.lastTopologyFuture().initialVersion().topologyVersion();

            return lastVer >= topVer;
        }
    };

    // Poll the condition with a 15 second limit and fail if it never held.
    assertTrue(GridTestUtils.waitForCondition(reached, 15_000));
}
/**
* Sequentially starts nodes so that node name is consistent with node order.
*
* @param node Some existing node.
* @param startIdx Start node index.
* @param cnt Number of nodes.
* @return Start future.
* @throws Exception If failed.
*/
private IgniteInternalFuture startGridsAsync(Ignite node, int startIdx, int cnt) throws Exception {
    GridCompoundFuture compound = new GridCompoundFuture();

    for (int off = 0; off < cnt; off++) {
        final int nodeIdx = startIdx + off;

        final CountDownLatch joined = new CountDownLatch(1);

        // Listener releases the latch on a join event and returns false
        // (presumably so it is not invoked again - confirm against IgniteEvents.localListen).
        IgnitePredicate<Event> joinLsnr = new IgnitePredicate<Event>() {
            @Override public boolean apply(Event evt) {
                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

                log.info("Got event: " + discoEvt.eventNode().id());

                joined.countDown();

                return false;
            }
        };

        // The listener is registered before the start is triggered.
        node.events().localListen(joinLsnr, EventType.EVT_NODE_JOINED);

        Callable starter = new Callable() {
            @Override public Object call() throws Exception {
                log.info("Start new node: " + nodeIdx);

                startGrid(nodeIdx);

                return null;
            }
        };

        IgniteInternalFuture startFut = GridTestUtils.runAsync(starter, "start-node-" + nodeIdx);

        // Do not launch the next node until the join event of this one was observed on the existing node.
        boolean seen = joined.await(WAIT_SECONDS, TimeUnit.SECONDS);

        if (!seen)
            fail();

        compound.add(startFut);
    }

    compound.markInitialized();

    return compound;
}
/**
* Coordinator failure scenarios, distinguished by which nodes received the full message.
*/
enum CoordinatorChangeMode {
/**
* The coordinator failed and did not send the full message.
*/
NOBODY_RCVD,
/**
* The coordinator failed, but the new coordinator received the full message
* and finished the exchange.
* <p>
* NOTE(review): the name ends in {@code RCDV} while the sibling constants end in {@code RCVD};
* presumably a typo, left unchanged here because the constant is referenced by name.
*/
NEW_CRD_RCDV,
/**
* The coordinator failed, but one of the servers (not the new coordinator) received the full message.
*/
NON_CRD_RCVD
}
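/**
* Communication SPI whose {@code delayMessage} selects partitions exchange messages: any
* {@code GridDhtPartitionsAbstractMessage} that carries an exchange ID or is a
* {@code GridDhtPartitionsSingleRequest}.
*/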
static class TestDelayExchangeMessagesSpi extends TestDelayingCommunicationSpi {
    /** {@inheritDoc} */
    @Override protected boolean delayMessage(Message msg, GridIoMessage ioMsg) {
        // Only partitions exchange messages are candidates.
        if (!(msg instanceof GridDhtPartitionsAbstractMessage))
            return false;

        GridDhtPartitionsAbstractMessage partsMsg = (GridDhtPartitionsAbstractMessage)msg;

        // A message bound to an exchange is always selected.
        if (partsMsg.exchangeId() != null)
            return true;

        // Without an exchange ID only single requests are selected.
        return msg instanceof GridDhtPartitionsSingleRequest;
    }
}
}