blob: 71718c9d16cc4e2cc3b45131cffe6c865416ff08 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
*
*/
public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest {
/** */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
static final String CACHE_NAME_PREFIX = "cache-";
/** Non-persistent data region name. */
private static final String NO_PERSISTENCE_REGION = "no-persistence-region";
/** */
boolean client;
/** */
private boolean active = true;
/** */
CacheConfiguration[] ccfgs;
/** */
private boolean testSpi;
/** */
private boolean testDiscoSpi;
/** */
private boolean testReconnectSpi;
/** */
private Class[] testSpiRecord;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
if (testReconnectSpi) {
TcpDiscoverySpi spi = new IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi();
cfg.setDiscoverySpi(spi);
spi.setJoinTimeout(2 * 60_000);
}
else if (testDiscoSpi)
cfg.setDiscoverySpi(new TestTcpDiscoverySpi());
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
cfg.setConsistentId(igniteInstanceName);
cfg.setClientMode(client);
cfg.setActiveOnStart(active);
if (ccfgs != null) {
cfg.setCacheConfiguration(ccfgs);
ccfgs = null;
}
DataStorageConfiguration memCfg = new DataStorageConfiguration();
memCfg.setPageSize(4 * 1024);
memCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setMaxSize(300 * 1024 * 1024)
.setPersistenceEnabled(persistenceEnabled()));
memCfg.setDataRegionConfigurations(new DataRegionConfiguration()
.setMaxSize(300 * 1024 * 1024)
.setName(NO_PERSISTENCE_REGION)
.setPersistenceEnabled(false));
if (persistenceEnabled())
memCfg.setWalMode(WALMode.LOG_ONLY);
cfg.setDataStorageConfiguration(memCfg);
if (testSpi) {
TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi();
if (testSpiRecord != null)
spi.record(testSpiRecord);
cfg.setCommunicationSpi(spi);
}
return cfg;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
super.afterTest();
}
/**
* @return {@code True} if test with persistence.
*/
protected boolean persistenceEnabled() {
return false;
}
/**
* @throws Exception If failed.
*/
public void testActivateSimple_SingleNode() throws Exception {
activateSimple(1, 0, 0);
}
/**
* @throws Exception If failed.
*/
public void testActivateSimple_5_Servers() throws Exception {
activateSimple(5, 0, 0);
}
/**
* @throws Exception If failed.
*/
public void testActivateSimple_5_Servers2() throws Exception {
activateSimple(5, 0, 4);
}
/**
* @throws Exception If failed.
*/
public void testActivateSimple_5_Servers_5_Clients() throws Exception {
activateSimple(5, 4, 0);
}
/**
* @throws Exception If failed.
*/
public void testActivateSimple_5_Servers_5_Clients_FromClient() throws Exception {
activateSimple(5, 4, 6);
}
/**
* @param srvs Number of servers.
* @param clients Number of clients.
* @param activateFrom Index of node stating activation.
* @throws Exception If failed.
*/
private void activateSimple(int srvs, int clients, int activateFrom) throws Exception {
active = false;
final int CACHES = 2;
for (int i = 0; i < srvs + clients; i++) {
client = i >= srvs;
ccfgs = cacheConfigurations1();
startGrid(i);
checkNoCaches(i);
}
for (int i = 0; i < srvs + clients; i++)
assertFalse(ignite(i).active());
ignite(activateFrom).active(false); // Should be no-op.
ignite(activateFrom).active(true);
for (int i = 0; i < srvs + clients; i++)
assertTrue(ignite(i).active());
for (int i = 0; i < srvs + clients; i++) {
for (int c = 0; c < 2; c++)
checkCache(ignite(i), CACHE_NAME_PREFIX + c, true);
checkCache(ignite(i), CU.UTILITY_CACHE_NAME, true);
}
checkCaches(srvs + clients, CACHES);
client = false;
startGrid(srvs + clients);
for (int c = 0; c < 2; c++)
checkCache(ignite(srvs + clients), CACHE_NAME_PREFIX + c, true);
checkCaches(srvs + clients + 1, CACHES);
client = true;
startGrid(srvs + clients + 1);
for (int c = 0; c < 2; c++)
checkCache(ignite(srvs + clients + 1), CACHE_NAME_PREFIX + c, false);
checkCaches(srvs + clients + 2, CACHES);
}
/**
* @param nodes Number of nodes.
* @param caches Number of caches.
*/
final void checkCaches(int nodes, int caches) {
for (int i = 0; i < nodes; i++) {
for (int c = 0; c < caches; c++) {
IgniteCache<Integer, Integer> cache = ignite(i).cache(CACHE_NAME_PREFIX + c);
for (int j = 0; j < 10; j++) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
Integer key = rnd.nextInt(1000);
cache.put(key, j);
assertEquals((Integer)j, cache.get(key));
}
}
}
}
/**
* @throws Exception If failed.
*/
public void testJoinWhileActivate1_Server() throws Exception {
joinWhileActivate1(false, false);
}
/**
* @throws Exception If failed.
*/
public void testJoinWhileActivate1_WithCache_Server() throws Exception {
joinWhileActivate1(false, true);
}
/**
* @throws Exception If failed.
*/
public void testJoinWhileActivate1_Client() throws Exception {
joinWhileActivate1(true, false);
}
/**
* @param startClient If {@code true} joins client node, otherwise server.
* @param withNewCache If {@code true} joining node has new cache in configuration.
* @throws Exception If failed.
*/
private void joinWhileActivate1(final boolean startClient, final boolean withNewCache) throws Exception {
IgniteInternalFuture<?> activeFut = startNodesAndBlockStatusChange(2, 0, 0, false);
IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
client = startClient;
ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1();
startGrid(2);
return null;
}
});
TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(ignite(1));
spi1.stopBlock();
activeFut.get();
startFut.get();
for (int c = 0; c < 2; c++)
checkCache(ignite(2), CACHE_NAME_PREFIX + c, true);
if (withNewCache) {
for (int i = 0; i < 3; i++) {
for (int c = 0; c < 4; c++)
checkCache(ignite(i), CACHE_NAME_PREFIX + c, true);
}
}
awaitPartitionMapExchange();
checkCaches(3, withNewCache ? 4 : 2);
client = false;
startGrid(3);
checkCaches(4, withNewCache ? 4 : 2);
client = true;
startGrid(4);
checkCaches(5, withNewCache ? 4 : 2);
}
/**
* @param srvs Number of servers.
* @param clients Number of clients.
* @param stateChangeFrom Index of node initiating changes.
* @param initiallyActive If {@code true} start cluster in active state (otherwise in inactive).
* @param blockMsgNodes Nodes whcis block exchange messages.
* @return State change future.
* @throws Exception If failed.
*/
private IgniteInternalFuture<?> startNodesAndBlockStatusChange(int srvs,
int clients,
final int stateChangeFrom,
final boolean initiallyActive,
int... blockMsgNodes) throws Exception {
active = initiallyActive;
testSpi = true;
startWithCaches1(srvs, clients);
int minorVer = 1;
if (initiallyActive && persistenceEnabled()) {
ignite(0).active(true);
minorVer++;
}
if (blockMsgNodes.length == 0)
blockMsgNodes = new int[]{1};
final AffinityTopologyVersion STATE_CHANGE_TOP_VER = new AffinityTopologyVersion(srvs + clients, minorVer);
List<TestRecordingCommunicationSpi> spis = new ArrayList<>();
for (int idx : blockMsgNodes) {
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(ignite(idx));
spis.add(spi);
blockExchangeSingleMessage(spi, STATE_CHANGE_TOP_VER);
}
IgniteInternalFuture<?> stateChangeFut = GridTestUtils.runAsync(new Runnable() {
@Override public void run() {
ignite(stateChangeFrom).active(!initiallyActive);
}
});
for (TestRecordingCommunicationSpi spi : spis)
spi.waitForBlocked();
U.sleep(500);
assertFalse(stateChangeFut.isDone());
return stateChangeFut;
}
/**
* @param spi SPI.
* @param topVer Exchange topology version.
*/
private void blockExchangeSingleMessage(TestRecordingCommunicationSpi spi, final AffinityTopologyVersion topVer) {
spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
@Override public boolean apply(ClusterNode clusterNode, Message msg) {
if (msg instanceof GridDhtPartitionsSingleMessage) {
GridDhtPartitionsSingleMessage pMsg = (GridDhtPartitionsSingleMessage)msg;
if (pMsg.exchangeId() != null && pMsg.exchangeId().topologyVersion().equals(topVer))
return true;
}
return false;
}
});
}
/**
* @throws Exception If failed.
*/
public void testJoinWhileDeactivate1_Server() throws Exception {
joinWhileDeactivate1(false, false);
}
/**
* @throws Exception If failed.
*/
public void testJoinWhileDeactivate1_WithCache_Server() throws Exception {
joinWhileDeactivate1(false, true);
}
/**
* @throws Exception If failed.
*/
public void testJoinWhileDeactivate1_Client() throws Exception {
joinWhileDeactivate1(true, false);
}
/**
* @param startClient If {@code true} joins client node, otherwise server.
* @param withNewCache If {@code true} joining node has new cache in configuration.
* @throws Exception If failed.
*/
private void joinWhileDeactivate1(final boolean startClient, final boolean withNewCache) throws Exception {
IgniteInternalFuture<?> activeFut = startNodesAndBlockStatusChange(2, 0, 0, true);
IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
client = startClient;
ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1();
startGrid(2);
return null;
}
});
TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(ignite(1));
spi1.stopBlock();
activeFut.get();
startFut.get();
checkNoCaches(3);
ignite(2).active(true);
for (int c = 0; c < 2; c++)
checkCache(ignite(2), CACHE_NAME_PREFIX + c, true);
if (withNewCache) {
for (int i = 0; i < 3; i++) {
for (int c = 0; c < 4; c++)
checkCache(ignite(i), CACHE_NAME_PREFIX + c, true);
}
}
awaitPartitionMapExchange();
checkCaches(3, withNewCache ? 4 : 2);
client = false;
startGrid(3);
checkCaches(4, withNewCache ? 4 : 2);
client = true;
startGrid(4);
checkCaches(5, withNewCache ? 4 : 2);
}
/**
* @throws Exception If failed.
*/
public void testConcurrentJoinAndActivate() throws Exception {
for (int iter = 0; iter < 3; iter++) {
log.info("Iteration: " + iter);
active = false;
for (int i = 0; i < 3; i++) {
ccfgs = cacheConfigurations1();
startGrid(i);
}
final int START_NODES = 3;
final CyclicBarrier b = new CyclicBarrier(START_NODES + 1);
IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
b.await();
Thread.sleep(ThreadLocalRandom.current().nextLong(100) + 1);
ignite(0).active(true);
return null;
}
});
final AtomicInteger nodeIdx = new AtomicInteger(3);
IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
int idx = nodeIdx.getAndIncrement();
b.await();
startGrid(idx);
return null;
}
}, START_NODES, "start-node");
fut1.get();
fut2.get();
checkCaches(6, 2);
afterTest();
}
}
/**
* @throws Exception If failed.
*/
public void testDeactivateSimple_SingleNode() throws Exception {
deactivateSimple(1, 0, 0);
}
/**
* @throws Exception If failed.
*/
public void testDeactivateSimple_5_Servers() throws Exception {
deactivateSimple(5, 0, 0);
}
/**
* @throws Exception If failed.
*/
public void testDeactivateSimple_5_Servers2() throws Exception {
deactivateSimple(5, 0, 4);
}
/**
* @throws Exception If failed.
*/
public void testDeactivateSimple_5_Servers_5_Clients() throws Exception {
deactivateSimple(5, 4, 0);
}
/**
* @throws Exception If failed.
*/
public void testDeactivateSimple_5_Servers_5_Clients_FromClient() throws Exception {
deactivateSimple(5, 4, 6);
}
/**
* @param srvs Number of servers.
* @param clients Number of clients.
* @param deactivateFrom Index of node stating deactivation.
* @throws Exception If failed.
*/
private void deactivateSimple(int srvs, int clients, int deactivateFrom) throws Exception {
active = true;
final int CACHES = 2;
for (int i = 0; i < srvs + clients; i++) {
client = i >= srvs;
ccfgs = cacheConfigurations1();
startGrid(i);
}
if (persistenceEnabled())
ignite(deactivateFrom).active(true);
ignite(deactivateFrom).active(true); // Should be no-op.
checkCaches(srvs + clients, CACHES);
for (int i = 0; i < srvs + clients; i++)
assertTrue(ignite(i).active());
ignite(deactivateFrom).active(false);
for (int i = 0; i < srvs + clients; i++)
assertFalse(ignite(i).active());
checkNoCaches(srvs + clients);
client = false;
startGrid(srvs + clients);
checkNoCaches(srvs + clients + 1);
client = true;
startGrid(srvs + clients + 1);
checkNoCaches(srvs + clients + 2);
for (int i = 0; i < srvs + clients + 2; i++)
assertFalse(ignite(i).active());
ignite(deactivateFrom).active(true);
for (int i = 0; i < srvs + clients + 2; i++) {
assertTrue(ignite(i).active());
checkCache(ignite(i), CU.UTILITY_CACHE_NAME, true);
}
for (int i = 0; i < srvs; i++) {
for (int c = 0; c < 2; c++)
checkCache(ignite(i), CACHE_NAME_PREFIX + c, true);
}
checkCaches1(srvs + clients + 2);
}
/**
* @param srvs Number of servers.
* @param clients Number of clients.
* @throws Exception If failed.
*/
private void startWithCaches1(int srvs, int clients) throws Exception {
for (int i = 0; i < srvs + clients; i++) {
ccfgs = cacheConfigurations1();
client = i >= srvs;
startGrid(i);
}
}
/**
* @throws Exception If failed.
*/
public void testClientReconnectClusterActive() throws Exception {
testReconnectSpi = true;
ccfgs = cacheConfigurations1();
final int SRVS = 3;
final int CLIENTS = 3;
startWithCaches1(SRVS, CLIENTS);
if (persistenceEnabled())
ignite(0).active(true);
Ignite srv = ignite(0);
Ignite client = ignite(SRVS);
checkCache(client, CU.UTILITY_CACHE_NAME, true);
checkCaches1(SRVS + CLIENTS);
IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, null);
checkCaches1(SRVS + CLIENTS);
this.client = false;
startGrid(SRVS + CLIENTS);
this.client = true;
startGrid(SRVS + CLIENTS + 1);
checkCaches1(SRVS + CLIENTS + 2);
}
/**
* @throws Exception If failed.
*/
public void testClientReconnectClusterInactive() throws Exception {
testReconnectSpi = true;
active = false;
final int SRVS = 3;
final int CLIENTS = 3;
startWithCaches1(SRVS, CLIENTS);
Ignite srv = ignite(0);
Ignite client = ignite(SRVS);
checkNoCaches(SRVS + CLIENTS);
IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, null);
checkNoCaches(SRVS + CLIENTS);
ignite(0).active(true);
checkCache(client, CU.UTILITY_CACHE_NAME, true);
checkCaches1(SRVS + CLIENTS);
this.client = false;
startGrid(SRVS + CLIENTS);
this.client = true;
startGrid(SRVS + CLIENTS + 1);
checkCaches1(SRVS + CLIENTS);
}
/**
* @throws Exception If failed.
*/
public void testClientReconnectClusterDeactivated() throws Exception {
clientReconnectClusterDeactivated(false);
}
/**
* @throws Exception If failed.
*/
public void testClientReconnectClusterDeactivateInProgress() throws Exception {
clientReconnectClusterDeactivated(true);
}
/**
* @param transition If {@code true} client reconnects while cluster state transition is in progress.
* @throws Exception If failed.
*/
private void clientReconnectClusterDeactivated(final boolean transition) throws Exception {
testReconnectSpi = true;
testSpi = transition;
final int SRVS = 3;
final int CLIENTS = 3;
startWithCaches1(SRVS, CLIENTS);
final Ignite srv = ignite(0);
IgniteEx client = grid(SRVS);
if (persistenceEnabled())
ignite(0).active(true);
checkCache(client, CU.UTILITY_CACHE_NAME, true);
checkCaches1(SRVS + CLIENTS);
final AffinityTopologyVersion STATE_CHANGE_TOP_VER = new AffinityTopologyVersion(SRVS + CLIENTS + 1, 1);
final TestRecordingCommunicationSpi spi1 = transition ? TestRecordingCommunicationSpi.spi(ignite(1)) : null;
final AtomicReference<IgniteInternalFuture> stateFut = new AtomicReference<>();
IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, new Runnable() {
@Override public void run() {
if (transition) {
blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER);
stateFut.set(GridTestUtils.runAsync(new Runnable() {
@Override public void run() {
srv.active(false);
}
}, "deactivate"));
try {
U.sleep(500);
}
catch (Exception e) {
e.printStackTrace();
}
}
else
srv.active(false);
}
});
if (transition) {
assertFalse(stateFut.get().isDone());
// Public API method would block forever because we blocked the exchange message.
assertFalse(client.context().state().publicApiActiveState(false));
spi1.waitForBlocked();
spi1.stopBlock();
stateFut.get().get();
}
checkNoCaches(SRVS + CLIENTS);
ignite(0).active(true);
checkCache(client, CU.UTILITY_CACHE_NAME, true);
assertTrue(client.active());
checkCaches1(SRVS + CLIENTS);
checkCache(client, CACHE_NAME_PREFIX + 0, true);
this.client = false;
startGrid(SRVS + CLIENTS);
this.client = true;
startGrid(SRVS + CLIENTS + 1);
checkCaches1(SRVS + CLIENTS + 2);
}
/**
* @throws Exception If failed.
*/
public void testClientReconnectClusterActivated() throws Exception {
clientReconnectClusterActivated(false);
}
/**
* @throws Exception If failed.
*/
public void testClientReconnectClusterActivateInProgress() throws Exception {
clientReconnectClusterActivated(true);
}
/**
* @param transition If {@code true} client reconnects while cluster state transition is in progress.
* @throws Exception If failed.
*/
private void clientReconnectClusterActivated(final boolean transition) throws Exception {
testReconnectSpi = true;
testSpi = transition;
active = false;
final int SRVS = 3;
final int CLIENTS = 3;
startWithCaches1(SRVS, CLIENTS);
final Ignite srv = ignite(0);
IgniteEx client = grid(SRVS);
checkNoCaches(SRVS + CLIENTS);
final AffinityTopologyVersion STATE_CHANGE_TOP_VER = new AffinityTopologyVersion(SRVS + CLIENTS + 1, 1);
final TestRecordingCommunicationSpi spi1 = transition ? TestRecordingCommunicationSpi.spi(ignite(1)) : null;
final AtomicReference<IgniteInternalFuture> stateFut = new AtomicReference<>();
IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, new Runnable() {
@Override public void run() {
if (transition) {
blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER);
stateFut.set(GridTestUtils.runAsync(new Runnable() {
@Override public void run() {
srv.active(true);
}
}, "activate"));
try {
U.sleep(500);
}
catch (Exception e) {
e.printStackTrace();
}
}
else
srv.active(true);
}
});
if (transition) {
assertFalse(stateFut.get().isDone());
assertTrue(client.context().state().clusterState().transition());
spi1.waitForBlocked();
spi1.stopBlock();
stateFut.get().get();
}
checkCache(client, CU.UTILITY_CACHE_NAME, true);
checkCaches1(SRVS + CLIENTS);
checkCache(client, CACHE_NAME_PREFIX + 0, true);
this.client = false;
startGrid(SRVS + CLIENTS);
this.client = true;
startGrid(SRVS + CLIENTS + 1);
checkCaches1(SRVS + CLIENTS + 2);
}
/**
* @throws Exception If failed.
*/
public void testInactiveTopologyChanges() throws Exception {
testSpi = true;
testSpiRecord = new Class[]{GridDhtPartitionsSingleMessage.class, GridDhtPartitionsFullMessage.class};
active = false;
final int SRVS = 4;
final int CLIENTS = 4;
startWithCaches1(SRVS, CLIENTS);
checkRecordedMessages(false);
for (int i = 0; i < 2; i++) {
stopGrid(i);
client = false;
startGrid(i);
}
checkRecordedMessages(false);
for (int i = 0; i < 2; i++) {
stopGrid(SRVS + i);
client = true;
startGrid(SRVS + i);
}
checkRecordedMessages(false);
ignite(0).active(true);
checkCaches1(SRVS + CLIENTS);
checkRecordedMessages(true);
client = false;
startGrid(SRVS + CLIENTS);
client = true;
startGrid(SRVS + CLIENTS + 1);
checkRecordedMessages(true);
checkCaches1(SRVS + CLIENTS + 2);
}
/**
* @throws Exception If failed.
*/
public void testActivateFailover1() throws Exception {
stateChangeFailover1(true);
}
/**
* @throws Exception If failed.
*/
public void testDeactivateFailover1() throws Exception {
stateChangeFailover1(false);
}
/**
* @param activate If {@code true} tests activation, otherwise deactivation.
* @throws Exception If failed.
*/
private void stateChangeFailover1(boolean activate) throws Exception {
// Nodes 1 and 4 do not reply to coordinator.
IgniteInternalFuture<?> fut = startNodesAndBlockStatusChange(4, 4, 3, !activate, 1, 4);
client = false;
// Start one more node while transition is in progress.
IgniteInternalFuture startFut = GridTestUtils.runAsync(new Callable() {
@Override public Object call() throws Exception {
startGrid(8);
return null;
}
}, "start-node");
U.sleep(500);
stopGrid(getTestIgniteInstanceName(1), true, false);
stopGrid(getTestIgniteInstanceName(4), true, false);
fut.get();
startFut.get();
client = false;
startGrid(1);
client = true;
startGrid(4);
if (!activate) {
checkNoCaches(9);
ignite(0).active(true);
}
checkCaches1(9);
}
/**
* @throws Exception If failed.
*/
public void testActivateFailover2() throws Exception {
stateChangeFailover2(true);
}
/**
* @throws Exception If failed.
*/
public void testDeactivateFailover2() throws Exception {
stateChangeFailover2(false);
}
/**
* @param activate If {@code true} tests activation, otherwise deactivation.
* @throws Exception If failed.
*/
private void stateChangeFailover2(boolean activate) throws Exception {
// Nodes 1 and 4 do not reply to coordinator.
IgniteInternalFuture<?> fut = startNodesAndBlockStatusChange(4, 4, 3, !activate, 1, 4);
client = false;
// Start more nodes while transition is in progress.
IgniteInternalFuture startFut1 = GridTestUtils.runAsync(new Callable() {
@Override public Object call() throws Exception {
startGrid(8);
return null;
}
}, "start-node1");
IgniteInternalFuture startFut2 = GridTestUtils.runAsync(new Callable() {
@Override public Object call() throws Exception {
startGrid(9);
return null;
}
}, "start-node2");
U.sleep(500);
// Stop coordinator.
stopGrid(getTestIgniteInstanceName(0), true, false);
stopGrid(getTestIgniteInstanceName(1), true, false);
stopGrid(getTestIgniteInstanceName(4), true, false);
fut.get();
startFut1.get();
startFut2.get();
client = false;
startGrid(0);
startGrid(1);
client = true;
startGrid(4);
if (!activate) {
checkNoCaches(10);
ignite(0).active(true);
}
checkCaches1(10);
}
/**
* @param exp If {@code true} there should be recorded messages.
*/
private void checkRecordedMessages(boolean exp) {
for (Ignite node : G.allGrids()) {
List<Object> recorded =
TestRecordingCommunicationSpi.spi(node).recordedMessages(false);
if (exp)
assertFalse(F.isEmpty(recorded));
else
assertTrue(F.isEmpty(recorded));
}
}
/**
* @param nodes Expected nodes number.
*/
private void checkCaches1(int nodes) {
checkCaches(nodes, 2);
}
/**
* @return Cache configurations.
*/
final CacheConfiguration[] cacheConfigurations1() {
CacheConfiguration[] ccfgs = new CacheConfiguration[2];
ccfgs[0] = cacheConfiguration(CACHE_NAME_PREFIX + 0, ATOMIC);
ccfgs[1] = cacheConfiguration(CACHE_NAME_PREFIX + 1, TRANSACTIONAL);
return ccfgs;
}
/**
* @return Cache configurations.
*/
final CacheConfiguration[] cacheConfigurations2() {
CacheConfiguration[] ccfgs = new CacheConfiguration[5];
ccfgs[0] = cacheConfiguration(CACHE_NAME_PREFIX + 0, ATOMIC);
ccfgs[1] = cacheConfiguration(CACHE_NAME_PREFIX + 1, TRANSACTIONAL);
ccfgs[2] = cacheConfiguration(CACHE_NAME_PREFIX + 2, ATOMIC);
ccfgs[3] = cacheConfiguration(CACHE_NAME_PREFIX + 3, TRANSACTIONAL);
ccfgs[4] = cacheConfiguration(CACHE_NAME_PREFIX + 4, TRANSACTIONAL);
ccfgs[4].setDataRegionName(NO_PERSISTENCE_REGION);
return ccfgs;
}
/**
* @param name Cache name.
* @param atomicityMode Atomicity mode.
* @return Cache configuration.
*/
protected final CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
CacheConfiguration ccfg = new CacheConfiguration(name);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setAtomicityMode(atomicityMode);
ccfg.setBackups(1);
return ccfg;
}
/**
* @param cacheName Cache name.
* @param node Node.
* @param exp {@code True} if expect that cache is started on node.
*/
void checkCache(Ignite node, String cacheName, boolean exp) throws IgniteCheckedException {
((IgniteEx)node).context().cache().context().exchange().lastTopologyFuture().get();
((IgniteEx)node).context().state().publicApiActiveState(true);
GridCacheAdapter cache = ((IgniteKernal)node).context().cache().internalCache(cacheName);
if (exp)
assertNotNull("Cache not found [cache=" + cacheName + ", node=" + node.name() + ']', cache);
else
assertNull("Unexpected cache found [cache=" + cacheName + ", node=" + node.name() + ']', cache);
}
/**
* @param nodes Number of nodes.
*/
final void checkNoCaches(int nodes) {
for (int i = 0; i < nodes; i++) {
grid(i).context().state().publicApiActiveState(true);
GridCacheProcessor cache = ((IgniteKernal)ignite(i)).context().cache();
assertTrue(cache.caches().isEmpty());
assertTrue(cache.internalCaches().isEmpty());
}
}
}