blob: b05d3e9e8a6e73b3a70f0bfe67433373a28b2afc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.JMException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.mxbean.IgniteMXBean;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
import static org.apache.ignite.cluster.ClusterState.INACTIVE;
import static org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor.DATA_LOST_ON_DEACTIVATION_WARNING;
import static org.apache.ignite.testframework.GridTestUtils.assertActive;
import static org.apache.ignite.testframework.GridTestUtils.assertInactive;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
*
*/
public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest {
/** */
static final String CACHE_NAME_PREFIX = "cache-";
/** Non-persistent data region name. */
protected static final String NO_PERSISTENCE_REGION = "no-persistence-region";
/** */
private static final int DEFAULT_CACHES_COUNT = 2;
/** */
private ClusterState stateOnStart;
/** */
CacheConfiguration[] ccfgs;
/** */
private boolean testSpi;
/** */
private boolean testReconnectSpi;
/** */
private Class[] testSpiRecord;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
if (testReconnectSpi) {
TcpDiscoverySpi spi = new IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi();
cfg.setDiscoverySpi(spi);
spi.setJoinTimeout(2 * 60_000);
}
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(sharedStaticIpFinder);
cfg.setConsistentId(igniteInstanceName);
if (stateOnStart != null)
cfg.setClusterStateOnStart(stateOnStart);
if (ccfgs != null) {
cfg.setCacheConfiguration(ccfgs);
ccfgs = null;
}
DataStorageConfiguration memCfg = new DataStorageConfiguration();
memCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setMaxSize(150L * 1024 * 1024)
.setPersistenceEnabled(persistenceEnabled()));
memCfg.setWalSegments(2);
memCfg.setWalSegmentSize(512 * 1024);
memCfg.setDataRegionConfigurations(new DataRegionConfiguration()
.setMaxSize(150L * 1024 * 1024)
.setName(NO_PERSISTENCE_REGION)
.setPersistenceEnabled(false));
if (persistenceEnabled())
memCfg.setWalMode(WALMode.LOG_ONLY);
cfg.setDataStorageConfiguration(memCfg);
cfg.setFailureDetectionTimeout(60_000);
if (testSpi) {
TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi();
if (testSpiRecord != null)
spi.record(testSpiRecord);
cfg.setCommunicationSpi(spi);
}
return cfg;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
super.afterTest();
}
/**
* @return {@code True} if test with persistence.
*/
protected boolean persistenceEnabled() {
return false;
}
/**
* @throws Exception If failed.
*/
@Test
public void testEnableReadOnlyFromActivateSimple_SingleNode() throws Exception {
changeActiveClusterStateSimple(1, 0, 0, ACTIVE, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testEnableReadOnlyFromActivateSimple_5_Servers() throws Exception {
changeActiveClusterStateSimple(5, 0, 0, ACTIVE, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testEnableReadOnlyFromActivateSimple_5_Servers2() throws Exception {
changeActiveClusterStateSimple(5, 0, 4, ACTIVE, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testEnableReadOnlyFromActivateSimple_5_Servers_5_Clients() throws Exception {
changeActiveClusterStateSimple(5, 4, 0, ACTIVE, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testEnableReadOnlyFromActivateSimple_5_Servers_5_Clients_FromClient() throws Exception {
changeActiveClusterStateSimple(5, 4, 6, ACTIVE, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisableReadOnlyFromActivateSimple_SingleNode() throws Exception {
changeActiveClusterStateSimple(1, 0, 0, ACTIVE_READ_ONLY, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisableReadOnlyFromActivateSimple_5_Servers() throws Exception {
changeActiveClusterStateSimple(5, 0, 0, ACTIVE_READ_ONLY, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisableReadOnlyFromActivateSimple_5_Servers2() throws Exception {
changeActiveClusterStateSimple(5, 0, 4, ACTIVE_READ_ONLY, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisableReadOnlyFromActivateSimple_5_Servers_5_Clients() throws Exception {
changeActiveClusterStateSimple(5, 4, 0, ACTIVE_READ_ONLY, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisableReadOnlyFromActivateSimple_5_Servers_5_Clients_FromClient() throws Exception {
changeActiveClusterStateSimple(5, 4, 6, ACTIVE_READ_ONLY, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateSimple_SingleNode() throws Exception {
activateSimple(1, 0, 0, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateInReadOnlySimple_SingleNode() throws Exception {
activateSimple(1, 0, 0, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateSimple_5_Servers() throws Exception {
activateSimple(5, 0, 0, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateInReadOnlySimple_5_Servers() throws Exception {
activateSimple(5, 0, 0, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateSimple_5_Servers2() throws Exception {
activateSimple(5, 0, 4, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateInReadOnlySimple_5_Servers2() throws Exception {
activateSimple(5, 0, 4, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateSimple_5_Servers_5_Clients() throws Exception {
activateSimple(5, 4, 0, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateInReadOnlySimple_5_Servers_5_Clients() throws Exception {
activateSimple(5, 4, 0, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateSimple_5_Servers_5_Clients_FromClient() throws Exception {
activateSimple(5, 4, 6, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateInReadOnlySimple_5_Servers_5_Clients_FromClient() throws Exception {
activateSimple(5, 4, 6, ACTIVE_READ_ONLY);
}
/**
* @param srvs Number of servers.
* @param clients Number of clients.
* @param changeFrom Index of node stating activation.
* @param state Activation state.
* @throws Exception If failed.
*/
private void activateSimple(int srvs, int clients, int changeFrom, ClusterState state) throws Exception {
assertActive(state);
changeStateSimple(srvs, clients, changeFrom, INACTIVE, state);
}
/**
* @param srvs Number of servers.
* @param clients Number of clients.
* @param changeFrom Index of node stating deactivation.
* @param initialState Initial cluster state.
* @throws Exception If failed.
*/
private void deactivateSimple(int srvs, int clients, int changeFrom, ClusterState initialState) throws Exception {
assertActive(initialState);
changeStateSimple(srvs, clients, changeFrom, initialState, INACTIVE);
}
/**
* @param srvs Number of servers.
* @param clients Number of clients.
* @param deactivateFrom Index of node stating deactivation.
* @param initialState Initial cluster state.
* @param targetState Targer cluster state.
* @throws Exception If failed.
*/
private void changeActiveClusterStateSimple(
int srvs,
int clients,
int deactivateFrom,
ClusterState initialState,
ClusterState targetState
) throws Exception {
assertActive(initialState);
assertActive(targetState);
assertNotSame(initialState, targetState);
changeStateSimple(srvs, clients, deactivateFrom, initialState, targetState);
}
/**
* @throws Exception If failed.
*/
@Test
public void testReActivateSimple_5_Servers_4_Clients_FromClient() throws Exception {
reactivateSimple(5, 4, 6, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testReActivateInReadOnlySimple_5_Servers_4_Clients_FromClient() throws Exception {
reactivateSimple(5, 4, 6, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testReActivateSimple_5_Servers_4_Clients_FromServer() throws Exception {
reactivateSimple(5, 4, 0, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testReActivateInReadOnlySimple_5_Servers_4_Clients_FromServer() throws Exception {
reactivateSimple(5, 4, 0, ACTIVE_READ_ONLY);
}
/**
* @param srvs Number of servers.
* @param clients Number of clients.
* @param activateFrom Index of node stating activation.
* @param state Activation state.
* @throws Exception If failed.
*/
private void reactivateSimple(int srvs, int clients, int activateFrom, ClusterState state) throws Exception {
activateSimple(srvs, clients, activateFrom, state);
if (state == ACTIVE)
rolloverSegmentAtLeastTwice(activateFrom);
for (int i = 0; i < srvs + clients; i++)
checkCachesOnNode(i, DEFAULT_CACHES_COUNT);
ignite(activateFrom).cluster().state(INACTIVE);
ignite(activateFrom).cluster().state(state);
if (state == ACTIVE)
rolloverSegmentAtLeastTwice(activateFrom);
for (int i = 0; i < srvs + clients; i++)
checkCachesOnNode(i, DEFAULT_CACHES_COUNT);
}
/**
* Work directory have 2 segments by default. This method do full circle.
*/
private void rolloverSegmentAtLeastTwice(int activateFrom) {
for (int c = 0; c < DEFAULT_CACHES_COUNT; c++) {
IgniteCache<Object, Object> cache = ignite(activateFrom).cache(CACHE_NAME_PREFIX + c);
//this should be enough including free-,meta- page and etc.
for (int i = 0; i < 1000; i++)
cache.put(i, i);
}
}
/**
* @param nodes Number of nodes.
* @param caches Number of caches.
*/
final void checkCaches(int nodes, int caches) throws InterruptedException {
checkCaches(nodes, caches, true);
}
/**
* @param nodes Number of nodes.
* @param caches Number of caches.
*/
final void checkCaches(int nodes, int caches, boolean awaitExchange) throws InterruptedException {
if (awaitExchange)
awaitPartitionMapExchange();
ClusterState state = ignite(0).cluster().state();
assertActive(state);
for (int i = 0; i < nodes; i++) {
for (int c = 0; c < caches; c++) {
IgniteCache<Integer, Integer> cache = ignite(i).cache(CACHE_NAME_PREFIX + c);
for (int j = 0; j < 10; j++) {
Integer key = ThreadLocalRandom.current().nextInt(1000);
Integer value = j;
if (state == ACTIVE)
cache.put(key, j);
else
assertThrowsWithCause(() -> cache.put(key, value), IgniteClusterReadOnlyException.class);
assertEquals(state == ACTIVE ? value : null, cache.get(key));
}
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWhileActivate1_Server() throws Exception {
joinWhileActivate1(false, false, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWhileActivateInReadOnly1_Server() throws Exception {
joinWhileActivate1(false, false, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWhileActivate1_WithCache_Server() throws Exception {
joinWhileActivate1(false, true, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWhileActivateInReadOnly1_WithCache_Server() throws Exception {
joinWhileActivate1(false, true, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWhileActivate1_Client() throws Exception {
joinWhileActivate1(true, false, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWhileActivateInReadOnly1_Client() throws Exception {
joinWhileActivate1(true, false, ACTIVE_READ_ONLY);
}
/**
* @param srvs Number of servers.
* @param clients Number of clients.
* @param stateChangeFrom Index of node initiating changes.
* @param initialState Cluster state on start nodes.
* @param targetState State of started cluster.
* @param blockMsgNodes Nodes whcis block exchange messages.
* @return State change future.
* @throws Exception If failed.
*/
private IgniteInternalFuture<?> startNodesAndBlockStatusChange(
int srvs,
int clients,
final int stateChangeFrom,
final ClusterState initialState,
final ClusterState targetState,
int... blockMsgNodes
) throws Exception {
assertNotSame(initialState, targetState);
if (!persistenceEnabled())
stateOnStart = initialState;
testSpi = true;
startWithCaches1(srvs, clients);
if (initialState.active()) {
// For in-memory grids cluster is already active.
ignite(0).cluster().state(initialState);
awaitPartitionMapExchange();
}
if (blockMsgNodes.length == 0)
blockMsgNodes = new int[] {1};
// Block next exchange.
List<TestRecordingCommunicationSpi> spis = new ArrayList<>();
for (int idx : blockMsgNodes) {
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(ignite(idx));
spis.add(spi);
spi.blockMessages(TestRecordingCommunicationSpi.blockSingleExhangeMessage());
}
IgniteInternalFuture<?> stateChangeFut = runAsync(() -> ignite(stateChangeFrom).cluster().state(targetState));
for (TestRecordingCommunicationSpi spi : spis)
spi.waitForBlocked();
U.sleep(500);
assertFalse(stateChangeFut.isDone());
return stateChangeFut;
}
/**
* @param spi SPI.
* @param topVer Exchange topology version.
*/
private void blockExchangeSingleMessage(TestRecordingCommunicationSpi spi, final AffinityTopologyVersion topVer) {
spi.blockMessages((IgniteBiPredicate<ClusterNode, Message>)(clusterNode, msg) -> {
if (msg instanceof GridDhtPartitionsSingleMessage) {
GridDhtPartitionsSingleMessage pMsg = (GridDhtPartitionsSingleMessage)msg;
if (pMsg.exchangeId() != null && pMsg.exchangeId().topologyVersion().equals(topVer))
return true;
}
return false;
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWhileDeactivate1_Server() throws Exception {
joinWhileDeactivate1(false, false, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWhileDeactivateFromReadOnly1_Server() throws Exception {
joinWhileDeactivate1(false, false, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWhileDeactivate1_WithCache_Server() throws Exception {
joinWhileDeactivate1(false, true, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWhileDeactivateFromReadOnly1_WithCache_Server() throws Exception {
joinWhileDeactivate1(false, true, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWhileDeactivate1_Client() throws Exception {
joinWhileDeactivate1(true, false, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWhileDeactivateFromReadOnly1_Client() throws Exception {
joinWhileDeactivate1(true, false, ACTIVE_READ_ONLY);
}
/**
* @param startClient If {@code true} joins client node, otherwise server.
* @param withNewCache If {@code true} joining node has new cache in configuration.
* @param state Target cluster state.
* @throws Exception If failed.
*/
private void joinWhileActivate1(boolean startClient, boolean withNewCache, ClusterState state) throws Exception {
joinWhileClusterStateChange(startClient, withNewCache, INACTIVE, state);
}
/**
* @param startClient If {@code true} joins client node, otherwise server.
* @param withNewCache If {@code true} joining node has new cache in configuration.
* @param state Initial cluster state.
* @throws Exception If failed.
*/
private void joinWhileDeactivate1(boolean startClient, boolean withNewCache, ClusterState state) throws Exception {
joinWhileClusterStateChange(startClient, withNewCache, state, INACTIVE);
}
/**
* @param startClient If {@code true} joins client node, otherwise server.
* @param withNewCache If {@code true} joining node has new cache in configuration.
* @param initialState Initial cluster state.
* @param targetState Target cluster state.
* @throws Exception If failed.
*/
private void joinWhileClusterStateChange(
boolean startClient,
boolean withNewCache,
ClusterState initialState,
ClusterState targetState
) throws Exception {
checkStatesAreDifferent(initialState, targetState);
int nodesCnt = 2;
IgniteInternalFuture<?> activeFut = startNodesAndBlockStatusChange(nodesCnt, 0, 0, initialState, targetState);
ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1();
final int numberOfCaches = ccfgs.length;
IgniteInternalFuture<?> startFut = startNodeAsync(nodesCnt++, startClient);
TestRecordingCommunicationSpi.spi(ignite(1)).stopBlock();
activeFut.get();
startFut.get();
if (targetState.active())
checkCachesOnNode(nodesCnt - 1, DEFAULT_CACHES_COUNT);
else {
checkNoCaches(nodesCnt);
ignite(nodesCnt - 1).cluster().state(initialState);
for (int c = 0; c < DEFAULT_CACHES_COUNT; c++)
checkCache(ignite(nodesCnt - 1), CACHE_NAME_PREFIX + c, true);
}
if (withNewCache) {
for (int i = 0; i < nodesCnt; i++)
checkCachesOnNode(i, numberOfCaches);
}
awaitPartitionMapExchange();
checkCaches(nodesCnt, numberOfCaches);
startGrid(nodesCnt++, false);
checkCaches(nodesCnt, numberOfCaches);
startGrid(nodesCnt++, true);
checkCaches(nodesCnt, numberOfCaches);
}
/**
* @throws Exception If failed.
*/
@Test
public void testConcurrentJoinAndActivate() throws Exception {
testConcurrentJoinAndActivate(ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testConcurrentJoinAndActivateInReadOnly() throws Exception {
testConcurrentJoinAndActivate(ACTIVE_READ_ONLY);
}
/** */
private void testConcurrentJoinAndActivate(ClusterState activateState) throws Exception {
assertActive(activateState);
for (int iter = 0; iter < 3; iter++) {
log.info("Iteration: " + iter);
stateOnStart = INACTIVE;
final int START_NODES = 3;
startWithCaches1(START_NODES, 0);
final int numberOfCaches = cacheConfigurations1().length;
final CyclicBarrier b = new CyclicBarrier(START_NODES + 1);
IgniteInternalFuture<Void> fut1 = runAsync(() -> {
b.await();
U.sleep(ThreadLocalRandom.current().nextLong(100) + 1);
ignite(0).cluster().state(activateState);
return null;
});
final AtomicInteger nodeIdx = new AtomicInteger(START_NODES);
IgniteInternalFuture<Long> fut2 = GridTestUtils.runMultiThreadedAsync((Callable<Void>)() -> {
int idx = nodeIdx.getAndIncrement();
b.await();
startGrid(idx);
return null;
}, START_NODES, "start-node");
fut1.get();
fut2.get();
checkCaches(2 * START_NODES, numberOfCaches);
afterTest();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateSimple_SingleNode() throws Exception {
deactivateSimple(1, 0, 0, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateFromReadOnlySimple_SingleNode() throws Exception {
deactivateSimple(1, 0, 0, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateSimple_5_Servers() throws Exception {
deactivateSimple(5, 0, 0, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateFromReadOnlySimple_5_Servers() throws Exception {
deactivateSimple(5, 0, 0, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateSimple_5_Servers2() throws Exception {
deactivateSimple(5, 0, 4, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateFromReadOnlySimple_5_Servers2() throws Exception {
deactivateSimple(5, 0, 4, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateSimple_5_Servers_5_Clients() throws Exception {
deactivateSimple(5, 4, 0, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateFromReadOnlySimple_5_Servers_5_Clients() throws Exception {
deactivateSimple(5, 4, 0, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateSimple_5_Servers_5_Clients_FromClient() throws Exception {
deactivateSimple(5, 4, 6, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateFromReadOnlySimple_5_Servers_5_Clients_FromClient() throws Exception {
deactivateSimple(5, 4, 6, ACTIVE_READ_ONLY);
}
/**
* @param srvs Number of servers.
* @param clients Number of clients.
* @param changeFrom Index of node starting cluster state change from {@code initialState} to {@code targetState}.
* @param initialState Initial cluster state.
* @param targetState Target cluster state.
* @throws Exception If failed.
*/
private void changeStateSimple(
int srvs,
int clients,
int changeFrom,
ClusterState initialState,
ClusterState targetState
) throws Exception {
assertNotSame(initialState, targetState);
stateOnStart = initialState;
int nodesCnt = srvs + clients;
startWithCaches1(srvs, clients);
if (persistenceEnabled() && initialState.active())
grid(0).cluster().state(initialState);
checkClusterState(nodesCnt, initialState);
if (!initialState.active())
checkNoCaches(nodesCnt);
ignite(changeFrom).cluster().state(initialState); // Should be no-op.
checkClusterState(nodesCnt, initialState);
ignite(changeFrom).cluster().state(targetState);
checkClusterState(nodesCnt, targetState);
if (targetState.active()) {
for (int i = 0; i < nodesCnt; i++)
checkCachesOnNode(i, DEFAULT_CACHES_COUNT);
checkCaches(nodesCnt, DEFAULT_CACHES_COUNT);
}
else
checkNoCaches(nodesCnt);
startNodeAndCheckCaches(nodesCnt++, false, DEFAULT_CACHES_COUNT);
startNodeAndCheckCaches(nodesCnt++, true, DEFAULT_CACHES_COUNT);
if (!targetState.active()) {
checkNoCaches(nodesCnt);
checkClusterState(nodesCnt, targetState);
ignite(changeFrom).cluster().state(initialState);
checkClusterState(nodesCnt, initialState);
for (int i = 0; i < nodesCnt; i++) {
if (ignite(i).configuration().isClientMode())
checkCache(ignite(i), CU.UTILITY_CACHE_NAME, true);
else
checkCachesOnNode(i, DEFAULT_CACHES_COUNT);
}
}
}
/** */
private void startNodeAndCheckCaches(int nodeIdx, boolean client, int cachesCount) throws Exception {
startGrid(nodeIdx, client);
ClusterState state = grid(0).cluster().state();
if (state.active()) {
checkCachesOnNode(nodeIdx, cachesCount, !client);
checkCaches(nodeIdx + 1, cachesCount);
}
}
/**
* @param srvs Number of servers.
* @param clients Number of clients.
* @throws Exception If failed.
*/
private void startWithCaches1(int srvs, int clients) throws Exception {
for (int i = 0; i < srvs + clients; i++) {
ccfgs = cacheConfigurations1();
startGrid(i, i >= srvs);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientReconnectClusterActive() throws Exception {
testClientReconnect(ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientReconnectClusterActiveReadOnly() throws Exception {
testClientReconnect(ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientReconnectClusterInactive() throws Exception {
testClientReconnect(INACTIVE);
}
/** */
private void testClientReconnect(ClusterState initialState) throws Exception {
testReconnectSpi = true;
stateOnStart = initialState;
ccfgs = cacheConfigurations1();
final int SRVS = 3;
final int CLIENTS = 3;
int nodesCnt = SRVS + CLIENTS;
startWithCaches1(SRVS, CLIENTS);
if (persistenceEnabled() && initialState.active())
ignite(0).cluster().state(initialState);
Ignite srv = ignite(0);
Ignite client = ignite(SRVS);
if (initialState.active()) {
checkCache(client, CU.UTILITY_CACHE_NAME, true);
checkCaches(nodesCnt);
}
else
checkNoCaches(nodesCnt);
IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, null);
if (!initialState.active()) {
checkNoCaches(nodesCnt);
srv.cluster().state(ACTIVE);
checkCache(client, CU.UTILITY_CACHE_NAME, true);
}
checkCaches(nodesCnt);
startGrid(nodesCnt++, false);
startGrid(nodesCnt++, true);
checkCaches(nodesCnt);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientReconnectClusterDeactivated() throws Exception {
clientReconnectClusterState(ACTIVE, INACTIVE, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientReconnectClusterDeactivatedFromReadOnly() throws Exception {
clientReconnectClusterState(ACTIVE_READ_ONLY, INACTIVE, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientReconnectClusterDeactivateInProgress() throws Exception {
clientReconnectClusterState(ACTIVE, INACTIVE, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientReconnectClusterDeactivateFromReadOnlyInProgress() throws Exception {
clientReconnectClusterState(ACTIVE_READ_ONLY, INACTIVE, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientReconnectClusterActivated() throws Exception {
clientReconnectClusterState(INACTIVE, ACTIVE, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientReconnectClusterActivatedReadOnly() throws Exception {
clientReconnectClusterState(INACTIVE, ACTIVE_READ_ONLY, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientReconnectClusterActivateInProgress() throws Exception {
clientReconnectClusterState(INACTIVE, ACTIVE, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientReconnectClusterActivateReadOnlyInProgress() throws Exception {
clientReconnectClusterState(INACTIVE, ACTIVE_READ_ONLY, true);
}
/**
* @param initialState Initial cluster state.
* @param targetState Cluster state after transition.
* @param transition If {@code true} client reconnects while cluster state transition is in progress.
* @throws Exception If failed.
*/
private void clientReconnectClusterState(
ClusterState initialState,
ClusterState targetState,
final boolean transition
) throws Exception {
assertNotSame(initialState, targetState);
testReconnectSpi = true;
testSpi = transition;
stateOnStart = initialState;
final int SRVS = 3;
final int CLIENTS = 3;
int nodesCnt = SRVS + CLIENTS;
startWithCaches1(SRVS, CLIENTS);
final Ignite srv = ignite(0);
IgniteEx client = grid(SRVS);
if (persistenceEnabled() && initialState.active())
ignite(0).cluster().state(initialState);
if (initialState.active()) {
checkCache(client, CU.UTILITY_CACHE_NAME, true);
checkCaches(nodesCnt);
// Wait for late affinity assignment to finish.
awaitPartitionMapExchange();
}
else
checkNoCaches(nodesCnt);
final AffinityTopologyVersion STATE_CHANGE_TOP_VER = new AffinityTopologyVersion(nodesCnt + 1, 1);
final TestRecordingCommunicationSpi spi1 = transition ? TestRecordingCommunicationSpi.spi(ignite(1)) : null;
final AtomicReference<IgniteInternalFuture> stateFut = new AtomicReference<>();
IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, () -> {
if (transition) {
blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER);
stateFut.set(runAsync(() -> srv.cluster().state(targetState), initialState + "->" + targetState));
try {
U.sleep(500);
}
catch (IgniteInterruptedCheckedException e) {
U.error(log, e);
}
}
else
srv.cluster().state(targetState);
});
if (transition) {
assertFalse(stateFut.get().isDone());
assertTrue(client.context().state().clusterState().transition());
// Public API method would block forever because we blocked the exchange message.
assertEquals(
GridClusterStateProcessor.stateWithMinimalFeatures(initialState, targetState),
client.context().state().publicApiState(false)
);
spi1.waitForBlocked();
spi1.stopBlock();
stateFut.get().get();
}
if (!targetState.active()) {
checkNoCaches(nodesCnt);
ignite(0).cluster().state(initialState);
checkClusterState(nodesCnt, initialState);
}
checkCache(client, CU.UTILITY_CACHE_NAME, true);
checkCaches(nodesCnt);
checkCache(client, CACHE_NAME_PREFIX + 0, true);
startGrid(nodesCnt++, false);
startGrid(nodesCnt++, true);
checkCaches(nodesCnt);
}
/**
* @throws Exception If failed.
*/
@Test
public void testInactiveTopologyChanges() throws Exception {
checkInactiveTopologyChanges(ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testInactiveTopologyChangesReadOnly() throws Exception {
checkInactiveTopologyChanges(ACTIVE_READ_ONLY);
}
/** */
private void checkInactiveTopologyChanges(ClusterState state) throws Exception {
assertActive(state);
testSpi = true;
testSpiRecord = new Class[] {GridDhtPartitionsSingleMessage.class, GridDhtPartitionsFullMessage.class};
stateOnStart = INACTIVE;
final int SRVS = 4;
final int CLIENTS = 4;
int nodesCnt = SRVS + CLIENTS;
startWithCaches1(SRVS, CLIENTS);
checkRecordedMessages(false);
for (int i = 0; i < 2; i++) {
stopGrid(i);
startGrid(i, false);
}
checkRecordedMessages(false);
for (int i = 0; i < 2; i++) {
stopGrid(SRVS + i);
startGrid(SRVS + i, true);
}
checkRecordedMessages(false);
ignite(0).cluster().state(state);
checkCaches(nodesCnt);
checkRecordedMessages(true);
startGrid(nodesCnt++, false);
startGrid(nodesCnt++, true);
checkRecordedMessages(true);
checkCaches(nodesCnt);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateFailover1() throws Exception {
stateChangeFailover1(INACTIVE, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateWithReadOnlyFailover1() throws Exception {
stateChangeFailover1(INACTIVE, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateFailover1() throws Exception {
stateChangeFailover1(ACTIVE, INACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateFromReadOnlyFailover1() throws Exception {
stateChangeFailover1(ACTIVE_READ_ONLY, INACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testEnableReadOnlyFailover1() throws Exception {
stateChangeFailover1(ACTIVE, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisableReadOnlyFailover1() throws Exception {
stateChangeFailover1(ACTIVE_READ_ONLY, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateFailover2() throws Exception {
stateChangeFailover2(INACTIVE, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateWithReadOnlyFailover2() throws Exception {
stateChangeFailover2(INACTIVE, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateFailover2() throws Exception {
stateChangeFailover2(ACTIVE, INACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateFromReadOnlyFailover2() throws Exception {
stateChangeFailover2(ACTIVE_READ_ONLY, INACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testEnableReadOnlyFailover2() throws Exception {
stateChangeFailover2(ACTIVE, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisableReadOnlyFailover2() throws Exception {
stateChangeFailover2(ACTIVE_READ_ONLY, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateFailover3() throws Exception {
stateChangeFailover3(INACTIVE, ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testActivateWithReadOnlyFailover3() throws Exception {
stateChangeFailover3(INACTIVE, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateFailover3() throws Exception {
stateChangeFailover3(ACTIVE, INACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeactivateFromReadOnlyFailover3() throws Exception {
stateChangeFailover3(ACTIVE_READ_ONLY, INACTIVE);
}
/** @throws Exception If failed. */
@Test
public void testDeactivateMXBean() throws Exception {
Ignite ignite = startGrid();
IgniteMXBean mxBean = getMxBean(ignite.name(), "Kernal", "IgniteKernal", IgniteMXBean.class);
if (persistenceEnabled())
ignite.cluster().state(ACTIVE);
// Create a new cache in order to trigger all needed checks on deactivation.
ignite.getOrCreateCache("test-partitioned-cache");
checkMXBeanDeactivation(ignite, () -> mxBean.active(false), false);
checkMXBeanDeactivation(ignite, () -> mxBean.clusterState(INACTIVE.name()), false);
checkMXBeanDeactivation(ignite, () -> mxBean.clusterState(INACTIVE.name(), false), false);
checkMXBeanDeactivation(ignite, () -> mxBean.clusterState(INACTIVE.name(), true), true);
}
/**
* Checks that not forced deactivation fails only if cluster has in-memory caches.
*
* @param ignite Ignite instance.
* @param deactivator Deactivation call to check. Assuming from the mx bean.
* @param forceDeactivation {@code True} if {@code deactivator} is forced.
*/
private void checkMXBeanDeactivation(Ignite ignite, MXAction deactivator, boolean forceDeactivation) {
assertEquals(ACTIVE, ignite.cluster().state());
if (persistenceEnabled() || forceDeactivation) {
try {
deactivator.action();
}
catch (JMException e) {
throw new IllegalStateException("Unexpected exception on cluster deactivation.", e);
}
assertEquals(INACTIVE, ignite.cluster().state());
ignite.cluster().state(ACTIVE);
}
else {
assertThrows(log, () -> {
deactivator.action();
return null;
}, JMException.class, DATA_LOST_ON_DEACTIVATION_WARNING);
try {
deactivator.action();
}
catch (JMException e) {
assertNull("A JMX exception should not contain any cause. JMX client might be launched with " +
"other class path witout remote class loading and won't be able to deserialize teh stacktrace.",
e.getCause());
}
}
assertEquals(ACTIVE, ignite.cluster().state());
}
/**
* @throws Exception If failed.
*/
@Test
public void testEnableReadOnlyFailover3() throws Exception {
stateChangeFailover3(ACTIVE, ACTIVE_READ_ONLY);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisableReadOnlyFailover3() throws Exception {
stateChangeFailover3(ACTIVE_READ_ONLY, ACTIVE);
}
/**
* @param initialState Initial cluster state.
* @param targetState Target cluster state.
* @throws Exception If failed.
*/
private void stateChangeFailover1(ClusterState initialState, ClusterState targetState) throws Exception {
stateChangeFailover(initialState, targetState, 1, 1, 4);
}
/**
* @param initialState Initial cluster state.
* @param targetState Target cluster state.
* @throws Exception If failed.
*/
private void stateChangeFailover2(ClusterState initialState, ClusterState targetState) throws Exception {
stateChangeFailover(initialState, targetState, 2, 0, 1, 4);
}
/**
* @param initialState Initial cluster state.
* @param targetState Target cluster state.
* @throws Exception If failed.
*/
private void stateChangeFailover3(ClusterState initialState, ClusterState targetState) throws Exception {
assertNotSame(initialState, targetState);
testReconnectSpi = true;
final int servers = 4;
final int clients = 0;
int nodesCnt = servers + clients;
startNodesAndBlockStatusChange(servers, clients, 0, initialState, targetState);
IgniteInternalFuture<?> startFut1 = startNodeAsync(nodesCnt++, false);
IgniteInternalFuture<?> startFut2 = startNodeAsync(nodesCnt++, false);
final int expNodesCnt = nodesCnt;
assertTrue(waitForCondition(() -> grid(0).cluster().nodes().size() == expNodesCnt, 30000L));
// Stop all nodes participating in state change and not allow last node to finish exchange.
for (int i = 0; i < servers; i++)
((IgniteDiscoverySpi)ignite(i).configuration().getDiscoverySpi()).simulateNodeFailure();
for (int i = 0; i < servers; i++)
stopGrid(getTestIgniteInstanceName(i), true, false);
startFut1.get();
startFut2.get();
for (int i = servers; i < nodesCnt; i++)
assertEquals(ignite(i).name(), INACTIVE, ignite(i).cluster().state());
ignite(servers).cluster().state(initialState.active() ? initialState : targetState);
doFinalChecks(servers, nodesCnt);
}
/**
* @param initialState Initial cluster state.
* @param targetState Target cluster state.
* @param startExtraNodes Number of started server nodes during blocked status change.
* @param restartNodes Indexes of ignite instances for restart.
* @throws Exception If failed.
*/
private void stateChangeFailover(
ClusterState initialState,
ClusterState targetState,
int startExtraNodes,
int... restartNodes
) throws Exception {
assertNotSame(initialState, targetState);
assertTrue(Arrays.toString(restartNodes) + " doesn't contain element 1", U.containsIntArray(restartNodes, 1));
assertTrue(Arrays.toString(restartNodes) + " doesn't contain element 4", U.containsIntArray(restartNodes, 4));
final int servers = 4;
final int clients = 4;
int nodesCnt = servers + clients;
// Nodes 1 and 4 do not reply to coordinator.
IgniteInternalFuture<?> fut = startNodesAndBlockStatusChange(servers, clients, 3, initialState, targetState, 1, 4);
List<IgniteInternalFuture<?>> startFuts = new ArrayList<>();
// Start more nodes while transition is in progress.
for (int i = 0; i < startExtraNodes; i++)
startFuts.add(startNodeAsync(nodesCnt++, false));
final int exceptedNodesCnt = nodesCnt;
assertTrue(waitForCondition(() -> grid(0).cluster().nodes().size() == exceptedNodesCnt, 30000L));
for (int idx : restartNodes)
stopGrid(getTestIgniteInstanceName(idx), true, false);
fut.get();
for (IgniteInternalFuture<?> startFut : startFuts)
startFut.get();
for (int idx : restartNodes)
startGrid(idx, idx >= servers & idx < (servers + clients));
if (!targetState.active()) {
checkNoCaches(nodesCnt);
ignite(0).cluster().state(initialState);
}
ignite(0).resetLostPartitions(Arrays.asList(CACHE_NAME_PREFIX + "0", CACHE_NAME_PREFIX + "1"));
checkCaches(nodesCnt);
}
/** */
protected void doFinalChecks(int startNodes, int nodesCnt) throws Exception {
for (int i = 0; i < startNodes; i++)
startGrid(i);
checkCaches(nodesCnt);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClusterStateNotWaitForDeactivation() throws Exception {
checkClusterStateNotWaitForDeactivation(ACTIVE);
}
/**
* @throws Exception If failed.
*/
@Test
public void testReadOnlyClusterStateNotWaitForDeactivation() throws Exception {
checkClusterStateNotWaitForDeactivation(ACTIVE_READ_ONLY);
}
/** */
private void checkClusterStateNotWaitForDeactivation(ClusterState initialState) throws Exception {
assertActive(initialState);
testSpi = true;
final int nodes = 2;
IgniteEx crd = startGrids(nodes);
crd.cluster().state(initialState);
AffinityTopologyVersion curTopVer = crd.context().discovery().topologyVersionEx();
AffinityTopologyVersion deactivationTopVer = new AffinityTopologyVersion(
curTopVer.topologyVersion(),
curTopVer.minorTopologyVersion() + 1
);
for (int gridIdx = 0; gridIdx < nodes; gridIdx++)
blockExchangeSingleMessage(TestRecordingCommunicationSpi.spi(grid(gridIdx)), deactivationTopVer);
IgniteInternalFuture deactivationFut = runAsync(() -> crd.cluster().state(INACTIVE));
// Wait for deactivation start.
assertTrue(GridTestUtils.waitForCondition(
() -> {
DiscoveryDataClusterState clusterState = crd.context().state().clusterState();
return clusterState.transition() && !clusterState.state().active();
},
getTestTimeout()
));
// Check that deactivation transition wait is not happened.
ClusterState state = crd.context().state().publicApiState(true);
assertInactive(state);
for (int gridIdx = 0; gridIdx < nodes; gridIdx++)
TestRecordingCommunicationSpi.spi(grid(gridIdx)).stopBlock();
deactivationFut.get();
}
/**
* @param exp If {@code true} there should be recorded messages.
*/
private void checkRecordedMessages(boolean exp) {
for (Ignite node : G.allGrids()) {
List<Object> recorded = TestRecordingCommunicationSpi.spi(node).recordedMessages(false);
if (exp)
assertFalse(F.isEmpty(recorded));
else
assertTrue(F.isEmpty(recorded));
}
}
/**
* @return Cache configurations.
*/
final CacheConfiguration[] cacheConfigurations1() {
CacheConfiguration[] ccfgs = new CacheConfiguration[2];
ccfgs[0] = cacheConfiguration(CACHE_NAME_PREFIX + 0, ATOMIC);
ccfgs[1] = cacheConfiguration(CACHE_NAME_PREFIX + 1, TRANSACTIONAL);
return ccfgs;
}
/**
* @return Cache configurations.
*/
final CacheConfiguration[] cacheConfigurations2() {
CacheConfiguration[] ccfgs = new CacheConfiguration[5];
ccfgs[0] = cacheConfiguration(CACHE_NAME_PREFIX + 0, ATOMIC);
ccfgs[1] = cacheConfiguration(CACHE_NAME_PREFIX + 1, TRANSACTIONAL);
ccfgs[2] = cacheConfiguration(CACHE_NAME_PREFIX + 2, ATOMIC);
ccfgs[3] = cacheConfiguration(CACHE_NAME_PREFIX + 3, TRANSACTIONAL);
ccfgs[4] = cacheConfiguration(CACHE_NAME_PREFIX + 4, TRANSACTIONAL);
ccfgs[4].setDataRegionName(NO_PERSISTENCE_REGION);
ccfgs[4].setDiskPageCompression(null);
return ccfgs;
}
/**
* @param name Cache name.
* @param atomicityMode Atomicity mode.
* @return Cache configuration.
*/
protected final CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
CacheConfiguration ccfg = new CacheConfiguration(name);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setAtomicityMode(atomicityMode);
ccfg.setBackups(1);
return ccfg;
}
/**
* @param cacheName Cache name.
* @param node Node.
* @param exp {@code True} if expect that cache is started on node.
*/
void checkCache(Ignite node, String cacheName, boolean exp) throws IgniteCheckedException {
GridTestUtils.waitForCondition(
() -> ((IgniteEx)node).context().cache().context().exchange().lastTopologyFuture() != null,
1000
);
((IgniteEx)node).context().cache().context().exchange().lastTopologyFuture().get();
((IgniteEx)node).context().state().publicApiState(true);
GridCacheAdapter cache = ((IgniteEx)node).context().cache().internalCache(cacheName);
if (exp)
assertNotNull("Cache not found [cache=" + cacheName + ", node=" + node.name() + ']', cache);
else
assertNull("Unexpected cache found [cache=" + cacheName + ", node=" + node.name() + ']', cache);
}
/**
* @param nodes Number of nodes.
*/
final void checkNoCaches(int nodes) {
for (int i = 0; i < nodes; i++) {
assertEquals(INACTIVE, grid(i).context().state().publicApiState(true));
GridCacheProcessor cache = ignite(i).context().cache();
assertTrue(cache.caches().isEmpty());
assertTrue(cache.internalCaches().stream().allMatch(c -> c.context().isRecoveryMode()));
}
}
/** */
private void checkClusterState(int nodesCnt, ClusterState state) {
for (int i = 0; i < nodesCnt; i++)
assertEquals(ignite(i).name(), state, ignite(i).cluster().state());
}
/** */
protected void checkCachesOnNode(int nodeNumber, int cachesCnt) throws IgniteCheckedException {
checkCachesOnNode(nodeNumber, cachesCnt, true);
}
/** */
protected void checkCachesOnNode(int nodeNumber, int cachesCnt, boolean expUserCaches) throws IgniteCheckedException {
for (int c = 0; c < cachesCnt; c++)
checkCache(ignite(nodeNumber), CACHE_NAME_PREFIX + c, expUserCaches);
checkCache(ignite(nodeNumber), CU.UTILITY_CACHE_NAME, true);
}
/**
* @param nodes Expected nodes number.
*/
private void checkCaches(int nodes) throws InterruptedException {
checkCaches(nodes, 2, false);
}
/** */
private static void checkStatesAreDifferent(ClusterState state1, ClusterState state2) {
assertTrue(state1 + " " + state2, state1.active() != state2.active());
}
/** */
protected void startGrid(int nodeNumber, boolean client) throws Exception {
startGrid(nodeNumber, client, null);
}
/** */
protected void startGrid(int nodeNumber, boolean client, CacheConfiguration[] cacheConfigs) throws Exception {
if (cacheConfigs != null)
this.ccfgs = cacheConfigs;
if (client)
startClientGrid(nodeNumber);
else
startGrid(nodeNumber);
}
/** */
private IgniteInternalFuture<?> startNodeAsync(int nodeNumber, boolean client) {
return runAsync(() -> {
try {
if (client)
startClientGrid(nodeNumber);
else
startGrid(nodeNumber);
}
catch (Exception e) {
throw new IgniteException(e);
}
}, "start" + "-" + (client ? "client" : "server") + "-node" + nodeNumber);
}
/** An JMX bean call that can throw JMException. */
private interface MXAction {
/** */
void action() throws JMException;
}
}