blob: 0faf350ce2afc1235ece12f93bda14606252d39e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.configuration.TopologyValidator;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTopologySplitAbstractTest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.CacheNameResource;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.junit.Assume;
import org.junit.Test;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT;
/**
* Tests complex scenario with topology validator. Grid is split between two data centers, defined by attribute {@link
* #DC_NODE_ATTR}. If only nodes from a single DC are left in topology, grid is moved into inoperative state until a
* special activator node enters the topology, enabling grid operations.
*/
public class IgniteTopologyValidatorGridSplitCacheTest extends IgniteCacheTopologySplitAbstractTest {
/** Name of the node attribute that holds the data center (segment) index of a node. */
private static final String DC_NODE_ATTR = "dc";
/** Name of the node attribute that marks the activator node; the topology validator looks for it on client nodes. */
private static final String ACTIVATOR_NODE_ATTR = "split.resolved";
/** Number of nodes started before each test. */
private static final int GRID_CNT = 32;
/** Number of caches created by the test. */
private static final int CACHES_CNT = 50;
/** Index of the activator (resolver) node; it is the first index after the regular nodes. */
private static final int RESOLVER_GRID_IDX = GRID_CNT;
/**
* Index of the extra server node that gets neither the activator attribute nor the {@code setActiveOnStart(false)}
* setting in {@code getConfiguration}.
*/
private static final int CONFIGLESS_GRID_IDX = GRID_CNT + 1;
/** Host part of every discovery address used by the test. */
private static final String STATIC_IP = "127.0.0.1";
/** Discovery addresses of the nodes with even index (segment 0). */
private static final Collection<String> SEG_FINDER_0;
/** Discovery addresses of the nodes with odd index (segment 1). */
private static final Collection<String> SEG_FINDER_1;
/** Discovery addresses of both segments. */
private static final Collection<String> SEG_FINDER_ALL;
static {
    Collection<String> evenAddrs = new ArrayList<>();
    Collection<String> oddAddrs = new ArrayList<>();

    // Node index parity defines the segment: even indexes go to segment 0, odd indexes to segment 1.
    for (int idx = 0; idx < GRID_CNT; idx++) {
        String addr = STATIC_IP + ':' + (DFLT_PORT + idx);

        if (idx % 2 == 0)
            evenAddrs.add(addr);
        else
            oddAddrs.add(addr);
    }

    SEG_FINDER_0 = Collections.unmodifiableCollection(evenAddrs);
    SEG_FINDER_1 = Collections.unmodifiableCollection(oddAddrs);

    // Address list used while the grid is not split: both segments together.
    SEG_FINDER_ALL = F.concat(false, SEG_FINDER_0, SEG_FINDER_1);
}
/** Whether the created caches are put into the cache group {@code "testGroup"}; set at the start of each scenario. */
private boolean useCacheGrp;
/**
 * @param gridIdx Grid index.
 * @return Local discovery port assigned to the node with the given index.
 */
private int getDiscoPort(int gridIdx) {
    return gridIdx + DFLT_PORT;
}
/**
 * @param port Port to check.
 * @return {@code True} if the port lies in the default discovery port range (both bounds inclusive).
 */
private boolean isDiscoPort(int port) {
    if (port < DFLT_PORT)
        return false;

    return port <= DFLT_PORT + TcpDiscoverySpi.DFLT_PORT_RANGE;
}
/**
 * Blocks only discovery traffic that crosses the segment boundary.
 *
 * {@inheritDoc}
 */
@Override protected boolean isBlocked(int locPort, int rmtPort) {
    // Anything that is not discovery-to-discovery traffic is never blocked.
    if (!isDiscoPort(locPort) || !isDiscoPort(rmtPort))
        return false;

    return segment(locPort) != segment(rmtPort);
}
/**
 * @param discoPort Discovery port of a node.
 * @return Remainder of dividing the port offset from the default discovery port by two.
 */
private int segment(int discoPort) {
    int portOff = discoPort - DFLT_PORT;

    return portOff % 2;
}
/**
 * Reads the segment index from the data center attribute of the node.
 *
 * {@inheritDoc}
 */
@Override protected int segment(ClusterNode node) {
    Integer dcIdx = node.attribute(DC_NODE_ATTR);

    return dcIdx;
}
/**
 * Builds the node configuration: data center attribute, fixed discovery port, segment-aware IP finder,
 * activator attribute for the resolver node and the default memory policy size.
 *
 * {@inheritDoc}
 */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(gridName);

    int gridIdx = getTestIgniteInstanceIndex(gridName);

    // Index parity defines the data center the node belongs to.
    int dcIdx = gridIdx % 2;

    Map<String, Object> attrs = new HashMap<>(4);

    attrs.put(DC_NODE_ATTR, dcIdx);

    TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();

    discoSpi.setLocalPort(getDiscoPort(gridIdx));

    // While the grid is split a node only gets the addresses of its own segment.
    Collection<String> addrs;

    if (!segmented())
        addrs = SEG_FINDER_ALL;
    else if (dcIdx == 0)
        addrs = SEG_FINDER_0;
    else
        addrs = SEG_FINDER_1;

    discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(addrs));

    if (gridIdx == RESOLVER_GRID_IDX)
        attrs.put(ACTIVATOR_NODE_ATTR, "true");
    else if (gridIdx != CONFIGLESS_GRID_IDX)
        cfg.setActiveOnStart(false);

    cfg.setUserAttributes(attrs);

    long dfltPlcSize = (50L << 20) + (100L << 20) * CACHES_CNT / GRID_CNT;

    MemoryConfiguration memCfg = new MemoryConfiguration();

    memCfg.setDefaultMemoryPolicySize(dfltPlcSize);

    cfg.setMemoryConfiguration(memCfg);

    return cfg;
}
/**
 * Creates the configurations of all test caches. Every cache is partitioned, has no backups and gets its own
 * split-aware topology validator; the cache group is assigned only when {@code useCacheGrp} is set.
 *
 * @return Cache configurations.
 */
protected Collection<CacheConfiguration> getCacheConfigurations() {
    CacheConfiguration[] cfgs = new CacheConfiguration[CACHES_CNT];

    for (int i = 0; i < cfgs.length; i++) {
        CacheConfiguration cacheCfg = new CacheConfiguration(DEFAULT_CACHE_NAME);

        cfgs[i] = cacheCfg;

        cacheCfg.setName(testCacheName(i));
        cacheCfg.setCacheMode(PARTITIONED);
        cacheCfg.setWriteSynchronizationMode(PRIMARY_SYNC);
        cacheCfg.setBackups(0);
        cacheCfg.setTopologyValidator(new SplitAwareTopologyValidator());

        if (useCacheGrp)
            cacheCfg.setGroupName("testGroup");
    }

    return Arrays.asList(cfgs);
}
/**
 * Builds the name of a test cache.
 *
 * @param idx Cache index.
 * @return Cache name: {@code "test"} followed by the index.
 */
private String testCacheName(int idx) {
    return "test".concat(Integer.toString(idx));
}
/**
* Checks the MVCC precondition, runs the parent set-up and starts {@link #GRID_CNT} nodes.
*
* {@inheritDoc}
*/
@Override protected void beforeTest() throws Exception {
// The assumption does not hold when MVCC is forced; the message points to the related ticket.
Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-7952", MvccFeatureChecker.forcedMvcc());
super.beforeTest();
startGridsMultiThreaded(GRID_CNT);
}
/**
 * Runs the parent tear-down and stops all nodes. The nodes are stopped even when the parent tear-down fails,
 * so that a failure there does not leave running nodes behind for the next test.
 *
 * {@inheritDoc}
 */
@Override protected void afterTest() throws Exception {
    try {
        super.afterTest();
    }
    finally {
        stopAllGrids();
    }
}
/**
 * Stops the nodes with the given indexes, in the given order.
 *
 * @param grids Indexes of the nodes to stop.
 */
protected void stopGrids(int... grids) {
    for (int i = 0; i < grids.length; i++)
        stopGrid(grids[i]);
}
/**
 * Runs the topology split scenario with caches that are not assigned to a cache group.
 *
 * @throws Exception If failed.
 */
@Test
public void testTopologyValidator() throws Exception {
    final boolean withCacheGrp = false;

    testTopologyValidator0(withCacheGrp);
}
/**
 * Runs the topology split scenario with all caches assigned to one cache group.
 *
 * @throws Exception If failed.
 */
@Test
public void testTopologyValidatorWithCacheGroup() throws Exception {
    final boolean withCacheGrp = true;

    testTopologyValidator0(withCacheGrp);
}
/**
* Runs the topology split scenario: puts work on the full grid, fail after the split, work again on segment 0
* after the activator node has joined and left, and keep working while nodes of the repaired segment leave
* and join.
*
* @param useCacheGrp Use cache group.
* @throws Exception If failed.
*/
private void testTopologyValidator0(boolean useCacheGrp) throws Exception {
// Must be set before the cache configurations are built: getCacheConfigurations() reads this flag.
this.useCacheGrp = useCacheGrp;
IgniteEx grid = grid(0);
grid.getOrCreateCaches(getCacheConfigurations());
// Init grid index arrays: seg1 holds the odd node indexes, seg0 the even ones.
int[] seg1 = new int[GRID_CNT / 2];
for (int i = 0; i < seg1.length; ++i)
seg1[i] = i * 2 + 1;
int[] seg0 = new int[GRID_CNT - seg1.length];
for (int i = 0; i < seg0.length; ++i)
seg0[i] = i * 2;
// Tests that each node is able to do puts.
tryPut(seg0, seg1);
clearAll();
// Force segmentation.
splitAndWait();
// Puts are expected to fail with an exception on the split grid; reaching fail() means they did not.
try {
tryPut(seg0, seg1);
fail();
}
catch (Exception e) {
// No-op.
}
// Repair split by adding activator node in topology.
resolveSplit();
tryPut(seg0);
clearAll();
// Puts through the nodes of segment 1 are still expected to fail.
try {
tryPut(seg1);
fail();
}
catch (Exception e) {
// No-op.
}
stopGrids(seg1);
// Fix split by adding node from second DC.
unsplit();
startGrid(CONFIGLESS_GRID_IDX);
awaitPartitionMapExchange();
tryPut(seg0);
tryPut(CONFIGLESS_GRID_IDX);
clearAll();
// Force split by removing last node from second DC.
stopGrid(CONFIGLESS_GRID_IDX);
awaitPartitionMapExchange();
// Only nodes of one DC are left, so puts are expected to fail again.
try {
tryPut(seg0);
fail();
}
catch (Exception e) {
// No-op.
}
// Repair split with concurrent server node join race.
resolveSplitWithRace(CONFIGLESS_GRID_IDX);
// Repair split by adding activator node in topology.
resolveSplit();
tryPut(seg0);
clearAll();
// Removing one node from segmented DC, shouldn't reset repair state.
stopGrid(0);
awaitPartitionMapExchange();
for (int idx : seg0) {
// Node 0 has just been stopped.
if (idx == 0)
continue;
assertEquals("Expecting put count", CACHES_CNT, tryPut(idx));
}
// Node 0 is stopped, so the caches are cleared through node 2.
clearAll(2);
// Add node to segmented DC, shouldn't reset repair state.
startGrid(0);
awaitPartitionMapExchange();
assertEquals("Expecting put count", CACHES_CNT * seg0.length, tryPut(seg0));
}
/**
 * Clears every test cache through the given node.
 *
 * @param g Node index.
 */
private void clearAll(int g) {
    int cacheIdx = 0;

    while (cacheIdx < CACHES_CNT) {
        IgniteCache<Object, Object> cache = grid(g).cache(testCacheName(cacheIdx));

        cache.clear();

        cacheIdx++;
    }
}
/**
 * Clears every test cache through the node with index 0.
 */
private void clearAll() {
    final int firstGridIdx = 0;

    clearAll(firstGridIdx);
}
/**
* Resolves split by client node join: starts the node with index {@link #RESOLVER_GRID_IDX} as a client and stops
* it right away. The topology validator of this test switches to the repaired state only after the activator has
* left the topology, hence the stop.
*
* @throws Exception If failed.
*/
private void resolveSplit() throws Exception {
startClientGrid(RESOLVER_GRID_IDX);
stopGrid(RESOLVER_GRID_IDX);
}
/**
* Resolves split by client node join with server node join race simulation: a server node joins and leaves while
* the activator client is still in the topology.
*
* @param srvNode server node index to simulate join race
* @throws Exception If failed.
*/
private void resolveSplitWithRace(int srvNode) throws Exception {
startClientGrid(RESOLVER_GRID_IDX);
// The server node joins while the activator is still present.
startGrid(srvNode);
awaitPartitionMapExchange();
tryPut(srvNode);
clearAll();
stopGrid(srvNode);
awaitPartitionMapExchange();
// The put through node 0 is expected to fail with an exception; reaching fail() means it did not.
try {
tryPut(0);
fail();
}
catch (Exception e) {
// No-op.
}
stopGrid(RESOLVER_GRID_IDX);
}
/**
 * Puts one entry into every test cache through the given node. The key is the number of the first partition
 * the node is primary for.
 *
 * @param idx Grid to test.
 * @return Number of successful puts to caches.
 * @throws IgniteException If puts failed; every failure is attached as a suppressed exception.
 * @throws AssertionError If no primary partition was found for the node, if the local cache size is not 1
 *      after a successful put, or if a put succeeded after a previous put had failed.
 */
private int tryPut(int idx) {
    IgniteEx g = grid(idx);

    int putCnt = 0;

    IgniteException ex = null;

    for (int cnt = 0; cnt < CACHES_CNT; cnt++) {
        String cacheName = testCacheName(cnt);

        int key = -1;

        Affinity<Object> aff = g.affinity(cacheName);

        // Use the first partition owned by the node as the key.
        for (int k = 0; k < aff.partitions(); k++) {
            if (aff.isPrimary(g.cluster().localNode(), k)) {
                key = k;

                break;
            }
        }

        assertTrue("Failed to find affinity key [gridIdx=" + idx + ", cache=" + cacheName + ']',
            key != -1);

        IgniteCache<Object, Object> cache = g.cache(cacheName);

        try {
            cache.put(key, key);

            assertEquals(1, cache.localSize());

            if (ex != null)
                throw new AssertionError("Successful tryPut after failure [gridIdx=" + idx +
                    ", cacheName=" + cacheName + ']', ex);

            putCnt++;
        }
        catch (AssertionError e) {
            // A broken invariant is a test failure, not a failed put: it must not be swallowed below.
            throw e;
        }
        catch (Throwable t) {
            log.error("Failed to put entry [gridIdx=" + idx + ", cache=" + cacheName + ", key=" + key + ']', t);

            if (ex == null)
                ex = new IgniteException("Failed to put entry [node=" + g.name() + ']');

            ex.addSuppressed(t);
        }
    }

    if (ex != null)
        throw ex;

    return putCnt;
}
/**
 * Runs the single node put check for every node of the given index groups. The nodes must either all succeed or
 * all fail.
 *
 * @param grids Grids to test.
 * @return Total number of successful puts to caches.
 * @throws IgniteException If the puts failed on every node.
 * @throws AssertionError If the puts succeeded on some nodes and failed on others.
 */
private int tryPut(int[]... grids) {
    int total = 0;

    IgniteException failure = null;

    for (int[] grp : grids) {
        for (int gridIdx : grp) {
            int cnt;

            try {
                cnt = tryPut(gridIdx);
            }
            catch (Exception e) {
                if (failure == null)
                    failure = new IgniteException("Failed to put entry");

                failure.addSuppressed(e);

                if (total != 0) {
                    throw new AssertionError("Failure after successful tryPut [gridIdx=" + gridIdx +
                        ", successful puts = " + total + ']', failure);
                }

                continue;
            }

            // A node succeeded although an earlier node had failed.
            if (failure != null)
                throw new AssertionError("Successful tryPut after failure [gridIdx=" + gridIdx +
                    ", sucessful puts = " + cnt + ']', failure);

            total += cnt;
        }
    }

    if (failure != null)
        throw failure;

    return total;
}
/**
* Prevents cache from performing any operation if only nodes from single data center are left in topology.
*/
private static class SplitAwareTopologyValidator implements TopologyValidator {
/** Serial version UID. */
private static final long serialVersionUID = 0L;
/** Cache name, used in log messages; annotated for injection with {@code @CacheNameResource}. */
@CacheNameResource
private transient String cacheName;
/** Ignite instance, used to read the topology history; annotated for injection with {@code @IgniteInstanceResource}. */
@IgniteInstanceResource
private transient Ignite ignite;
/** Logger; annotated for injection with {@code @LoggerResource}. */
@LoggerResource
private transient IgniteLogger log;
/** Current validator state; {@code null} until it is initialized or set by the first validation. */
private transient State state;
/**
* Validates the topology and updates the validator state. A topology with server nodes from different data
* centers is always valid. A topology with server nodes of a single data center becomes valid only after an
* activator client node has joined and left again.
*
* {@inheritDoc}
*/
@Override public boolean validate(Collection<ClusterNode> nodes) {
initIfNeeded(nodes);
// Reject the topology on the first server node that has no data center attribute; the state is left untouched.
for (ClusterNode node : F.view(nodes, new IgnitePredicate<ClusterNode>() {
@Override public boolean apply(ClusterNode node) {
return !node.isClient() && node.attribute(DC_NODE_ATTR) == null;
}
})) {
log.error("Not valid server nodes are detected in topology: [cacheName=" + cacheName + ", node=" +
node + ']');
return false;
}
boolean segmented = segmented(nodes);
if (!segmented)
state = State.VALID; // Also clears possible BEFORE_REPAIRED and REPAIRED states.
else {
if (state == State.REPAIRED) // Any topology change in segmented grid in repaired mode is valid.
return true;
// Find discovery event node.
ClusterNode evtNode = evtNode(nodes);
// The node with the biggest order is the activator: wait until it leaves.
if (activator(evtNode))
state = State.BEFORE_REPARED;
else {
if (state == State.BEFORE_REPARED) {
boolean activatorLeft = true;
// Check if activator is no longer in topology.
for (ClusterNode node : nodes) {
if (node.isClient() && activator(node)) {
activatorLeft = false;
break;
}
}
if (activatorLeft) {
if (log.isInfoEnabled())
log.info("Grid segmentation is repaired: [cacheName=" + cacheName + ']');
state = State.REPAIRED; // Switch to REPAIRED state only when activator leaves.
} // Else stay in BEFORE_REPARED state.
}
else {
// Log the segmentation only on the transition from the valid state.
if (state == State.VALID) {
if (log.isInfoEnabled())
log.info("Grid segmentation is detected: [cacheName=" + cacheName + ']');
}
state = State.NOTVALID;
}
}
}
return state == State.VALID || state == State.REPAIRED;
}
/**
 * Checks whether all server nodes of the topology belong to the same data center.
 *
 * @param nodes Topology nodes.
 * @return {@code False} if two consecutive server nodes have different values of the data center attribute,
 *      {@code true} otherwise (including a topology without server nodes).
 */
private boolean segmented(Collection<ClusterNode> nodes) {
    ClusterNode prev = null;

    for (ClusterNode node : nodes) {
        if (node.isClient())
            continue;

        // Null-safe comparison: a server node without the attribute must not fail here with an NPE,
        // such nodes are reported and rejected by validate().
        if (prev != null && !Objects.equals(prev.attribute(DC_NODE_ATTR), node.attribute(DC_NODE_ATTR)))
            return false;

        prev = node;
    }

    return true;
}
/**
 * Checks whether the node is the activator: a client node that has the activator attribute.
 *
 * @param node Node.
 * @return {@code True} if this is marker node.
 */
private boolean activator(ClusterNode node) {
    if (!node.isClient())
        return false;

    return node.attribute(ACTIVATOR_NODE_ATTR) != null;
}
/**
 * Sets the initial validator state. Walks the topology history backwards from the version of the event node
 * and switches to the repaired state if an activator node is found before a non-segmented topology or the
 * end of the history is reached. Does nothing if the state is already set.
 *
 * @param nodes Topology nodes.
 */
private void initIfNeeded(Collection<ClusterNode> nodes) {
    if (state != null)
        return;

    for (long ver = evtNode(nodes).order(); ver > 0; ver--) {
        Collection<ClusterNode> hist = ignite.cluster().topology(ver);

        // Stop on reaching the history limit or a valid (non-segmented) topology.
        if (hist == null || !segmented(hist))
            return;

        for (ClusterNode histNode : hist) {
            if (activator(histNode)) {
                state = State.REPAIRED;

                return;
            }
        }
    }
}
/**
 * Finds the node with the biggest order (event topology version). Of several nodes with the same order the
 * first one wins.
 *
 * @param nodes Topology nodes.
 * @return Node with the biggest order, or {@code null} if the collection is empty.
 */
private ClusterNode evtNode(Collection<ClusterNode> nodes) {
    ClusterNode latest = null;

    for (ClusterNode candidate : nodes) {
        // Skip candidates that are not newer than the current best.
        if (latest != null && candidate.order() <= latest.order())
            continue;

        latest = candidate;
    }

    return latest;
}
/** Validator states. Only {@link #VALID} and {@link #REPAIRED} make {@code validate} return {@code true}. */
private enum State {
/** Topology is valid: server nodes from different data centers are present. */
VALID,
/** Topology is not valid: only server nodes of a single data center are present and the split is not repaired. */
NOTVALID,
/** Activator node has joined the segmented topology; the split counts as repaired only after it leaves. */
BEFORE_REPARED,
/** Segmented topology is repaired (valid): the activator node has joined and left. */
REPAIRED;
}
}
}