blob: ad253443d559bd78acddba977244cdf96c52c284 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
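/**
 * Tests rebalancing of partitioned and replicated caches configured with {@link CacheRebalanceMode#SYNC}
 * while nodes join and leave the topology, with and without concurrent data load.
 */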
public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
/**
 * Number of consecutive keys written to, and verified in, each cache by a single data generation pass.
 * NOTE(review): the value is produced by {@code SF.applyLB}; presumably the first argument scaled by the test
 * scale factor with the second argument as a lower bound - confirm against {@code GridTestUtils.SF}.
 */
private static final int TEST_SIZE = SF.applyLB(100_000, 10_000);
/**
 * Quiet period in milliseconds: {@link #awaitPartitionMessagesAbsent()} succeeds once no partition map message
 * has been recorded for this long.
 */
private static final long TOPOLOGY_STILLNESS_TIME = SF.applyLB(30_000, 5_000);
/** partitioned cache name. */
protected static final String CACHE_NAME_DHT_PARTITIONED = "cacheP";
/** partitioned cache 2 name. */
protected static final String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2";
/** replicated cache name. */
protected static final String CACHE_NAME_DHT_REPLICATED = "cacheR";
/** replicated cache 2 name. */
protected static final String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
/**
 * Stop flag polled by the background load threads of {@link #testLoadRebalancing()}; it is also raised by the
 * first worker thread of {@link #testComplexRebalancing()} when that worker completes.
 */
private volatile boolean concurrentStartFinished;
/**
 * Raised by the second worker thread of {@link #testComplexRebalancing()} after grids 3 and 4 have been started;
 * the first worker thread waits for it.
 */
private volatile boolean concurrentStartFinished2;
/**
 * Raised by the third worker thread of {@link #testComplexRebalancing()} after its data generation pass;
 * the first worker thread waits for it.
 */
private volatile boolean concurrentStartFinished3;
/**
 * Time in milliseconds at which a {@link GridDhtPartitionsSingleMessage} or {@link GridDhtPartitionsFullMessage}
 * was last sent through {@link CollectingCommunicationSpi}. It is also reset to the current time at the start of
 * {@link #awaitPartitionMessagesAbsent()}.
 */
private static volatile long lastPartMsgTime;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

    // When MVCC is forced, cap the default data region at 400 MB.
    if (MvccFeatureChecker.forcedMvcc()) {
        DataRegionConfiguration dfltRegion = new DataRegionConfiguration().setMaxSize(400L * 1024 * 1024);

        cfg.setDataStorageConfiguration(
            new DataStorageConfiguration().setDefaultDataRegionConfiguration(dfltRegion));
    }

    // Communication SPI that records when partition map messages are sent.
    TcpCommunicationSpi spi = new CollectingCommunicationSpi();

    spi.setTcpNoDelay(true);

    cfg.setCommunicationSpi(spi);

    // Partitioned cache: batch size 1, prefetch count 1.
    CacheConfiguration<Integer, Integer> partitioned =
        syncTxCache(CACHE_NAME_DHT_PARTITIONED, CacheMode.PARTITIONED);

    partitioned.setBackups(1);
    partitioned.setRebalanceBatchSize(1);
    partitioned.setRebalanceBatchesPrefetchCount(1);
    partitioned.setRebalanceOrder(2);

    // Partitioned cache with a rebalance delay.
    CacheConfiguration<Integer, Integer> partitionedDelayed =
        syncTxCache(CACHE_NAME_DHT_PARTITIONED_2, CacheMode.PARTITIONED);

    partitionedDelayed.setBackups(1);
    partitionedDelayed.setRebalanceOrder(2);
    partitionedDelayed.setRebalanceDelay(SF.applyLB(5000, 500));

    // Replicated cache: batch size 1, unlimited prefetch count.
    CacheConfiguration<Integer, Integer> replicated =
        syncTxCache(CACHE_NAME_DHT_REPLICATED, CacheMode.REPLICATED);

    replicated.setRebalanceBatchSize(1);
    replicated.setRebalanceBatchesPrefetchCount(Integer.MAX_VALUE);

    // Replicated cache with an explicit rebalance order.
    CacheConfiguration<Integer, Integer> replicatedOrdered =
        syncTxCache(CACHE_NAME_DHT_REPLICATED_2, CacheMode.REPLICATED);

    replicatedOrdered.setRebalanceOrder(4);

    cfg.setCacheConfiguration(partitioned, partitionedDelayed, replicated, replicatedOrdered);
    cfg.setRebalanceThreadPoolSize(3);

    return cfg;
}

/**
 * Creates the settings shared by every cache of this test: SYNC rebalance mode, TRANSACTIONAL atomicity and
 * a rendezvous affinity function with 32 partitions.
 *
 * @param name Cache name.
 * @param mode Cache mode.
 * @return New cache configuration.
 */
private CacheConfiguration<Integer, Integer> syncTxCache(String name, CacheMode mode) {
    CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);

    ccfg.setName(name);
    ccfg.setCacheMode(mode);
    ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
    ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    ccfg.setAffinity(new RendezvousAffinityFunction().setPartitions(32));

    return ccfg;
}
/**
 * Fills all four test caches with one data generation pass each.
 *
 * @param ignite Ignite.
 * @param from Start from key.
 * @param iter Iteration.
 */
protected void generateData(Ignite ignite, int from, int iter) {
    String[] cacheNames = {
        CACHE_NAME_DHT_PARTITIONED,
        CACHE_NAME_DHT_PARTITIONED_2,
        CACHE_NAME_DHT_REPLICATED,
        CACHE_NAME_DHT_REPLICATED_2
    };

    for (String cacheName : cacheNames)
        generateData(ignite, cacheName, from, iter);
}
/**
 * Streams {@code TEST_SIZE} consecutive keys, starting at {@code from}, into the given cache. The value stored
 * for key {@code k} is {@code k + name.hashCode() + iter}, so each cache and each iteration gets distinct values.
 *
 * @param ignite Ignite.
 * @param name Cache name.
 * @param from Start from key.
 * @param iter Iteration.
 */
protected void generateData(Ignite ignite, String name, int from, int iter) {
    try (IgniteDataStreamer<Integer, Integer> dataStreamer = ignite.dataStreamer(name)) {
        dataStreamer.allowOverwrite(true);

        for (int i = from; i < from + TEST_SIZE; i++) {
            // Count entries relative to 'from', so the reported percentage stays within 0..100 for any start key.
            int done = i - from + 1;

            if (done % (TEST_SIZE / 10) == 0)
                log.info("Prepared " + done * 100 / (TEST_SIZE) + "% entries. [count=" + TEST_SIZE +
                    ", iteration=" + iter + ", cache=" + name + "]");

            dataStreamer.addData(i, i + name.hashCode() + iter);
        }
    }
}
/**
 * Verifies the content of all four test caches using scan queries.
 *
 * @param ignite Ignite.
 * @param from Start from key.
 * @param iter Iteration.
 */
protected void checkData(Ignite ignite, int from, int iter) {
    String[] cacheNames = {
        CACHE_NAME_DHT_PARTITIONED,
        CACHE_NAME_DHT_PARTITIONED_2,
        CACHE_NAME_DHT_REPLICATED,
        CACHE_NAME_DHT_REPLICATED_2
    };

    for (String cacheName : cacheNames)
        checkData(ignite, cacheName, from, iter, true);
}
/**
 * Verifies that keys {@code [from, from + TEST_SIZE)} of the given cache hold the values written by
 * {@link #generateData(Ignite, String, int, int)} for iteration {@code iter}, i.e.
 * {@code key + name.hashCode() + iter}.
 *
 * @param ignite Ignite.
 * @param name Cache name.
 * @param from Start from key.
 * @param iter Iteration.
 * @param scan If true then "scan" query will be used instead of "get" in a loop. Should be "false" when run in
 * parallel with other operations. Otherwise should be "true", because it's much faster in such situations.
 */
protected void checkData(Ignite ignite, String name, int from, int iter, boolean scan) {
    IgniteCache<Integer, Integer> cache = ignite.cache(name);

    if (scan) {
        AtomicInteger cnt = new AtomicInteger();

        cache.query(new ScanQuery<Integer, Integer>((k, v) -> k >= from && k < from + TEST_SIZE)).forEach(entry -> {
            if (cnt.incrementAndGet() % (TEST_SIZE / 10) == 0)
                log.info("<" + name + "> Checked " + cnt.get() * 100 / TEST_SIZE + "% entries. [count=" +
                    TEST_SIZE + ", iteration=" + iter + ", cache=" + name + "]");

            assertEquals("Value does not match [key=" + entry.getKey() + ", cache=" + name + ']',
                entry.getKey() + name.hashCode() + iter, entry.getValue().intValue());
        });

        // Every key of the range must have been returned by the scan.
        assertEquals(TEST_SIZE, cnt.get());
    }
    else {
        for (int i = from; i < from + TEST_SIZE; i++) {
            // Count entries relative to 'from', so the reported percentage stays within 0..100 for any start key.
            int checked = i - from + 1;

            if (checked % (TEST_SIZE / 10) == 0)
                log.info("<" + name + "> Checked " + checked * 100 / (TEST_SIZE) + "% entries. [count=" +
                    TEST_SIZE + ", iteration=" + iter + ", cache=" + name + "]");

            Integer val = cache.get(i);

            // Report a lost entry as an assertion failure naming the key instead of a bare NPE on unboxing.
            assertTrue("Value is missing [key=" + i + ", cache=" + name + ']', val != null);

            // Expected value first, actual second, so that the failure message is not inverted.
            assertEquals("Value does not match [key=" + i + ", cache=" + name + ']',
                i + name.hashCode() + iter, val.intValue());
        }
    }
}
/**
 * {@inheritDoc}
 * <p>
 * After the parent setup this also calls {@link GridTestUtils#runGC()}, so that each test starts rebalancing
 * with a cleaned heap.
 */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
GridTestUtils.runGC(); // Clean heap before rebalancing.
}
/**
 * Loads data on a single node and then changes the topology four times: node 1 joins, node 0 leaves,
 * node 2 joins, node 2 leaves. After every change the test waits until rebalancing has finished and the
 * topology is quiet. Finally all data is verified on node 1.
 *
 * @throws Exception If failed.
 */
@Test
public void testSimpleRebalancing() throws Exception {
    IgniteKernal firstNode = (IgniteKernal)startGrid(0);

    generateData(firstNode, 0, 0);

    log.info("Preloading started.");

    final long startedAt = System.currentTimeMillis();

    startGrid(1);
    awaitRebalancedAndQuiet();

    stopGrid(0);
    awaitRebalancedAndQuiet();

    startGrid(2);
    awaitRebalancedAndQuiet();

    stopGrid(2);
    awaitRebalancedAndQuiet();

    long elapsedSec = (System.currentTimeMillis() - startedAt) / 1000;

    checkData(grid(1), 0, 0);

    log.info("Spend " + elapsedSec + " seconds to rebalance entries.");
}

/**
 * Waits for partition map exchange, asserts that every partition is OWNING and then waits until no partition
 * map messages are sent any more.
 *
 * @throws Exception If failed.
 */
private void awaitRebalancedAndQuiet() throws Exception {
    awaitPartitionMapExchange(true, true, null, true);

    checkPartitionMapExchangeFinished();

    awaitPartitionMessagesAbsent();
}
/**
 * Changes the topology (nodes 2, 3 and 4 join, node 2 leaves) while one background thread keeps overwriting
 * random keys of the partitioned cache and another one keeps verifying the cache content with "get" operations.
 * <p>
 * Failures of the background threads are collected and rethrown on the test thread, because an exception or
 * assertion error thrown inside a plain {@link Thread} does not fail the test by itself.
 *
 * @throws Exception If failed.
 */
@Test
public void testLoadRebalancing() throws Exception {
    final Ignite ignite = startGrid(0);

    startGrid(1);

    generateData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);

    log.info("Preloading started.");

    long start = System.currentTimeMillis();

    concurrentStartFinished = false;

    // First failure raised by a background thread.
    final AtomicReference<Throwable> err = new AtomicReference<>();

    Thread t1 = new Thread() {
        @Override public void run() {
            try {
                Random rdm = new Random();

                while (!concurrentStartFinished) {
                    for (int i = 0; i < TEST_SIZE; i++) {
                        if (i % (TEST_SIZE / 10) == 0)
                            log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");

                        int ii = rdm.nextInt(TEST_SIZE);

                        // Same value as generateData() writes for iteration 0, so concurrent checks stay valid.
                        ignite.cache(CACHE_NAME_DHT_PARTITIONED).put(ii, ii + CACHE_NAME_DHT_PARTITIONED.hashCode());
                    }
                }
            }
            catch (Throwable e) {
                err.compareAndSet(null, e);
            }
        }
    };

    Thread t2 = new Thread() {
        @Override public void run() {
            try {
                while (!concurrentStartFinished)
                    checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0, false);
            }
            catch (Throwable e) {
                err.compareAndSet(null, e);
            }
        }
    };

    t1.start();
    t2.start();

    try {
        startGrid(2);
        startGrid(3);

        stopGrid(2);

        startGrid(4);

        awaitPartitionMapExchange(true, true, null);

        concurrentStartFinished = true;

        checkSupplyContextMapIsEmpty();
    }
    finally {
        // Release and reap the workers even if a step above failed, otherwise they would keep running
        // against the cluster after the test has ended.
        concurrentStartFinished = true;

        t1.join();
        t2.join();
    }

    Throwable bgErr = err.get();

    if (bgErr != null)
        throw new AssertionError("Background load thread failed.", bgErr);

    long spend = (System.currentTimeMillis() - start) / 1000;

    info("Time to rebalance entries: " + spend);
}
/**
 * For every internal cache of every started node, reads the {@code scMap} field of the preloader's
 * {@code supplier} via reflection, waits up to 15 seconds for that map to become empty and asserts that it is.
 *
 * @throws Exception If failed.
 */
protected void checkSupplyContextMapIsEmpty() throws Exception {
    for (Ignite node : G.allGrids()) {
        IgniteEx nodeEx = (IgniteEx)node;

        for (GridCacheAdapter<?, ?> cache : nodeEx.context().cache().internalCaches()) {
            Object supplier = U.field(cache.preloader(), "supplier");

            final Map<?, ?> ctxMap = U.field(supplier, "scMap");

            // The result is deliberately ignored: the assertion below reports the map content on timeout.
            GridTestUtils.waitForCondition(() -> {
                synchronized (ctxMap) {
                    return ctxMap.isEmpty();
                }
            }, 15_000);

            synchronized (ctxMap) {
                assertTrue("Map is not empty [cache=" + cache.name() +
                    ", node=" + node.name() +
                    ", map=" + ctxMap + ']', ctxMap.isEmpty());
            }
        }
    }
}
/**
 * Asserts, for every cache with a rebalance mode other than {@code NONE} on every started node, that:
 * <ul>
 * <li>each local partition is in OWNING state and the local node belongs to its affinity nodes;</li>
 * <li>the partition map that every started node holds for this node has the same size as the local partition
 * list, contains only OWNING entries and includes every local partition.</li>
 * </ul>
 */
public static void checkPartitionMapExchangeFinished() {
    for (Ignite node : G.allGrids()) {
        IgniteKernal kernal = (IgniteKernal)node;

        for (IgniteCacheProxy<?, ?> cache : kernal.context().cache().jcaches()) {
            CacheConfiguration cfg = cache.context().config();

            // Caches without rebalancing are skipped.
            if (cfg.getRebalanceMode() == NONE)
                continue;

            GridDhtCacheAdapter<?, ?> locDht = dht(cache);

            GridDhtPartitionTopology locTop = locDht.topology();

            List<GridDhtLocalPartition> locParts = locTop.localPartitions();

            // Local view: everything this node holds must be owned and mapped to it by affinity.
            for (GridDhtLocalPartition part : locParts) {
                GridDhtPartitionState state = part.state();

                boolean owning = state == GridDhtPartitionState.OWNING;

                if (!owning)
                    printPartitionState(cache);

                assertTrue("Wrong local partition state part=" +
                    part.id() + ", should be OWNING [state=" + state +
                    "], node=" + kernal.name() + " cache=" + cache.getName(), owning);

                Collection<ClusterNode> affNodes =
                    kernal.affinity(cfg.getName()).mapPartitionToPrimaryAndBackups(part.id());

                assertTrue(affNodes.contains(kernal.localNode()));
            }

            // Cluster view: every node must see the same set of partitions for this node.
            for (Ignite other : G.allGrids()) {
                IgniteKernal otherKernal = (IgniteKernal)other;

                IgniteCacheProxy<?, ?> otherCache = otherKernal.context().cache().jcache(cfg.getName());

                GridDhtCacheAdapter<?, ?> otherDht = dht(otherCache);

                GridDhtPartitionTopology otherTop = otherDht.topology();

                GridDhtPartitionMap partMap = otherTop.partitionMap(true).get(kernal.localNodeId());

                assertEquals(partMap.size(), locParts.size());

                for (Map.Entry<?, ?> e : partMap.entrySet()) {
                    assertTrue("Wrong remote partition state part=" + e.getKey() +
                        ", should be OWNING [state=" + e.getValue() + "], node="
                        + other.name() + " cache=" + cache.getName(),
                        e.getValue() == GridDhtPartitionState.OWNING);
                }

                for (GridDhtLocalPartition part : locParts)
                    assertTrue(partMap.containsKey(part.id()));
            }
        }
    }

    log.info("checkPartitionMapExchangeFinished finished");
}
/**
 * Waits until no {@link GridDhtPartitionsSingleMessage} or {@link GridDhtPartitionsFullMessage} has been
 * recorded by {@link CollectingCommunicationSpi} for {@code TOPOLOGY_STILLNESS_TIME} milliseconds, and fails
 * if that does not happen within twice that time.
 *
 * @throws Exception If failed.
 */
protected void awaitPartitionMessagesAbsent() throws Exception {
    log.info("Checking GridDhtPartitions*Message absent (it will take up to " +
        TOPOLOGY_STILLNESS_TIME + " ms) ... ");

    // Only messages recorded from this moment on are taken into account.
    lastPartMsgTime = U.currentTimeMillis();

    // One stillness period for the topology to settle plus one full period of silence.
    long timeout = 2 * TOPOLOGY_STILLNESS_TIME;

    boolean quiet = GridTestUtils.waitForCondition(
        () -> lastPartMsgTime + TOPOLOGY_STILLNESS_TIME < U.currentTimeMillis(),
        timeout);

    assertTrue("Should not have partition Single or Full messages within bound " +
        TOPOLOGY_STILLNESS_TIME + " ms.", quiet);
}
/**
 * {@inheritDoc}
 * <p>
 * This test uses a timeout of ten minutes.
 */
@Override protected long getTestTimeout() {
    final long minuteMs = 60_000;

    return 10 * minuteMs;
}
/**
 * Runs three activities concurrently on top of a pre-loaded single node: one thread starts nodes 1 and 2 and
 * later creates a new partitioned cache, another thread starts nodes 3 and 4, and a third thread rewrites all
 * data with iteration 1. After verifying the data on node 4, nodes 1, 0, 2 and 3 are stopped one by one while
 * the data is rewritten with iteration 2 from node 3, and the final content is verified on node 4.
 * <p>
 * Failures of the background threads are collected and rethrown on the test thread, and the hand-off flags are
 * raised even on failure, so a broken worker fails the test instead of leaving another worker waiting until
 * the test timeout.
 *
 * @throws Exception If failed.
 */
@Test
public void testComplexRebalancing() throws Exception {
    final Ignite ignite = startGrid(0);

    generateData(ignite, 0, 0);

    log.info("Preloading started.");

    long start = System.currentTimeMillis();

    concurrentStartFinished = false;
    concurrentStartFinished2 = false;
    concurrentStartFinished3 = false;

    // First failure raised by any background thread.
    final AtomicReference<Throwable> err = new AtomicReference<>();

    Thread t1 = new Thread() {
        @Override public void run() {
            try {
                startGrid(1);
                startGrid(2);

                while (!concurrentStartFinished2)
                    U.sleep(10);

                awaitPartitionMapExchange();

                // New cache should start rebalancing.
                CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);

                cacheRCfg.setName(CACHE_NAME_DHT_PARTITIONED + "_NEW");
                cacheRCfg.setCacheMode(CacheMode.PARTITIONED);
                cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
                cacheRCfg.setRebalanceBatchesPrefetchCount(1);

                grid(0).getOrCreateCache(cacheRCfg);

                while (!concurrentStartFinished3)
                    U.sleep(10);

                concurrentStartFinished = true;
            }
            catch (Throwable e) {
                err.compareAndSet(null, e);
            }
        }
    };

    Thread t2 = new Thread() {
        @Override public void run() {
            try {
                startGrid(3);
                startGrid(4);
            }
            catch (Throwable e) {
                err.compareAndSet(null, e);
            }
            finally {
                // Raised even on failure, otherwise t1 would wait for this flag forever.
                concurrentStartFinished2 = true;
            }
        }
    };

    Thread t3 = new Thread() {
        @Override public void run() {
            try {
                generateData(ignite, 0, 1);
            }
            catch (Throwable e) {
                err.compareAndSet(null, e);
            }
            finally {
                // Raised even on failure, otherwise t1 would wait for this flag forever.
                concurrentStartFinished3 = true;
            }
        }
    };

    t1.start();
    t2.start(); // Should cancel t1 rebalancing.
    t3.start();

    t1.join();
    t2.join();
    t3.join();

    if (err.get() != null)
        throw new AssertionError("Background thread failed during concurrent start.", err.get());

    awaitPartitionMapExchange(true, true, null);

    checkSupplyContextMapIsEmpty();

    checkData(grid(4), 0, 1);

    final Ignite ignite3 = grid(3);

    Thread t4 = new Thread() {
        @Override public void run() {
            try {
                generateData(ignite3, 0, 2);
            }
            catch (Throwable e) {
                err.compareAndSet(null, e);
            }
        }
    };

    t4.start();

    try {
        stopGrid(1);

        awaitPartitionMapExchange(true, true, null);

        checkSupplyContextMapIsEmpty();

        stopGrid(0);

        awaitPartitionMapExchange(true, true, null);

        checkSupplyContextMapIsEmpty();

        stopGrid(2);

        awaitPartitionMapExchange(true, true, null);

        checkPartitionMapExchangeFinished();

        awaitPartitionMessagesAbsent();

        checkSupplyContextMapIsEmpty();
    }
    finally {
        // Never leave the loader running past this point, even when a step above failed.
        t4.join();
    }

    if (err.get() != null)
        throw new AssertionError("Background data generation failed.", err.get());

    stopGrid(3);

    awaitPartitionMapExchange();

    checkSupplyContextMapIsEmpty();

    long spend = (System.currentTimeMillis() - start) / 1000;

    checkData(grid(4), 0, 2);

    log.info("Spend " + spend + " seconds to rebalance entries.");
}
/**
 * {@inheritDoc}
 * <p>
 * All grids are stopped after every test, even if the parent cleanup throws, so that nodes started by one test
 * never remain running for the next one.
 */
@Override protected void afterTest() throws Exception {
    try {
        super.afterTest();
    }
    finally {
        stopAllGrids();
    }
}
/**
 * Communication SPI that stores, in {@code lastPartMsgTime}, the time at which a
 * {@link GridDhtPartitionsSingleMessage} or {@link GridDhtPartitionsFullMessage} was last passed to
 * {@link #sendMessage(ClusterNode, Message, IgniteInClosure)}. Every message is forwarded to the parent SPI.
 */
private static class CollectingCommunicationSpi extends TcpCommunicationSpi {
    /** Logger, annotated for injection with {@link LoggerResource}. */
    @LoggerResource
    private IgniteLogger log;

    /** {@inheritDoc} */
    @Override public void sendMessage(final ClusterNode node, final Message msg,
        final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
        final Object payload = ((GridIoMessage)msg).message();

        boolean partMapMsg = payload instanceof GridDhtPartitionsSingleMessage
            || payload instanceof GridDhtPartitionsFullMessage;

        if (partMapMsg) {
            lastPartMsgTime = U.currentTimeMillis();

            log.info("Last seen time of GridDhtPartitionsSingleMessage or GridDhtPartitionsFullMessage updated " +
                "[lastPartMsgTime=" + lastPartMsgTime +
                ", node=" + node.id() + ']');
        }

        super.sendMessage(node, msg, ackC);
    }
}
/**
 * {@inheritDoc}
 * <p>
 * This test doubles the timeout returned by the parent class.
 */
@Override protected long getPartitionMapExchangeTimeout() {
    long dfltTimeout = super.getPartitionMapExchangeTimeout();

    return dfltTimeout * 2;
}
}