| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.rebalancing; |
| |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.cache.CacheRebalanceMode; |
| import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; |
| import org.apache.ignite.cache.query.ScanQuery; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.DataRegionConfiguration; |
| import org.apache.ignite.configuration.DataStorageConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.managers.communication.GridIoMessage; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; |
| import org.apache.ignite.internal.util.lang.GridAbsPredicateX; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.internal.util.typedef.PA; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.apache.ignite.resources.LoggerResource; |
| import org.apache.ignite.spi.IgniteSpiException; |
| import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.GridTestUtils.SF; |
| import org.apache.ignite.testframework.MvccFeatureChecker; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.junit.Test; |
| import static org.apache.ignite.cache.CacheRebalanceMode.NONE; |
| |
| /** |
| * |
| */ |
| public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { |
| /** */ |
| private static final int TEST_SIZE = SF.applyLB(100_000, 10_000); |
| |
| /** */ |
| private static final long TOPOLOGY_STILLNESS_TIME = SF.applyLB(30_000, 5_000); |
| |
| /** partitioned cache name. */ |
| protected static final String CACHE_NAME_DHT_PARTITIONED = "cacheP"; |
| |
| /** partitioned cache 2 name. */ |
| protected static final String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2"; |
| |
| /** replicated cache name. */ |
| protected static final String CACHE_NAME_DHT_REPLICATED = "cacheR"; |
| |
| /** replicated cache 2 name. */ |
| protected static final String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2"; |
| |
| /** */ |
| private volatile boolean concurrentStartFinished; |
| |
| /** */ |
| private volatile boolean concurrentStartFinished2; |
| |
| /** */ |
| private volatile boolean concurrentStartFinished3; |
| |
| /** |
| * Time in milliseconds of last received {@link GridDhtPartitionsSingleMessage} |
| * or {@link GridDhtPartitionsFullMessage} using {@link CollectingCommunicationSpi}. |
| */ |
| private static volatile long lastPartMsgTime; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration iCfg = super.getConfiguration(igniteInstanceName); |
| |
| if (MvccFeatureChecker.forcedMvcc()) { |
| iCfg.setDataStorageConfiguration(new DataStorageConfiguration() |
| .setDefaultDataRegionConfiguration( |
| new DataRegionConfiguration().setMaxSize(400L * 1024 * 1024) |
| )); |
| } |
| |
| TcpCommunicationSpi commSpi = new CollectingCommunicationSpi(); |
| commSpi.setTcpNoDelay(true); |
| |
| iCfg.setCommunicationSpi(commSpi); |
| |
| CacheConfiguration<Integer, Integer> cachePCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); |
| |
| cachePCfg.setName(CACHE_NAME_DHT_PARTITIONED); |
| cachePCfg.setCacheMode(CacheMode.PARTITIONED); |
| cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC); |
| cachePCfg.setBackups(1); |
| cachePCfg.setRebalanceBatchSize(1); |
| cachePCfg.setRebalanceBatchesPrefetchCount(1); |
| cachePCfg.setRebalanceOrder(2); |
| cachePCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); |
| cachePCfg.setAffinity(new RendezvousAffinityFunction().setPartitions(32)); |
| |
| CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>(DEFAULT_CACHE_NAME); |
| |
| cachePCfg2.setName(CACHE_NAME_DHT_PARTITIONED_2); |
| cachePCfg2.setCacheMode(CacheMode.PARTITIONED); |
| cachePCfg2.setRebalanceMode(CacheRebalanceMode.SYNC); |
| cachePCfg2.setBackups(1); |
| cachePCfg2.setRebalanceOrder(2); |
| cachePCfg2.setRebalanceDelay(SF.applyLB(5000, 500)); |
| cachePCfg2.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); |
| cachePCfg2.setAffinity(new RendezvousAffinityFunction().setPartitions(32)); |
| |
| CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); |
| |
| cacheRCfg.setName(CACHE_NAME_DHT_REPLICATED); |
| cacheRCfg.setCacheMode(CacheMode.REPLICATED); |
| cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC); |
| cacheRCfg.setRebalanceBatchSize(1); |
| cacheRCfg.setRebalanceBatchesPrefetchCount(Integer.MAX_VALUE); |
| cacheRCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); |
| cacheRCfg.setAffinity(new RendezvousAffinityFunction().setPartitions(32)); |
| |
| CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>(DEFAULT_CACHE_NAME); |
| |
| cacheRCfg2.setName(CACHE_NAME_DHT_REPLICATED_2); |
| cacheRCfg2.setCacheMode(CacheMode.REPLICATED); |
| cacheRCfg2.setRebalanceMode(CacheRebalanceMode.SYNC); |
| cacheRCfg2.setRebalanceOrder(4); |
| cacheRCfg2.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); |
| cacheRCfg2.setAffinity(new RendezvousAffinityFunction().setPartitions(32)); |
| |
| iCfg.setCacheConfiguration(cachePCfg, cachePCfg2, cacheRCfg, cacheRCfg2); |
| |
| iCfg.setRebalanceThreadPoolSize(3); |
| |
| return iCfg; |
| } |
| |
| /** |
| * @param ignite Ignite. |
| * @param from Start from key. |
| * @param iter Iteration. |
| */ |
| protected void generateData(Ignite ignite, int from, int iter) { |
| generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter); |
| generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter); |
| generateData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter); |
| generateData(ignite, CACHE_NAME_DHT_REPLICATED_2, from, iter); |
| } |
| |
| /** |
| * @param ignite Ignite. |
| * @param name Cache name. |
| * @param from Start from key. |
| * @param iter Iteration. |
| */ |
| protected void generateData(Ignite ignite, String name, int from, int iter) { |
| try (IgniteDataStreamer<Integer, Integer> dataStreamer = ignite.dataStreamer(name)) { |
| dataStreamer.allowOverwrite(true); |
| |
| for (int i = from; i < from + TEST_SIZE; i++) { |
| if ((i + 1) % (TEST_SIZE / 10) == 0) |
| log.info("Prepared " + (i + 1) * 100 / (TEST_SIZE) + "% entries. [count=" + TEST_SIZE + |
| ", iteration=" + iter + ", cache=" + name + "]"); |
| |
| dataStreamer.addData(i, i + name.hashCode() + iter); |
| } |
| } |
| } |
| |
| /** |
| * @param ignite Ignite. |
| * @param from Start from key. |
| * @param iter Iteration. |
| */ |
| protected void checkData(Ignite ignite, int from, int iter) { |
| checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter, true); |
| checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter, true); |
| checkData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter, true); |
| checkData(ignite, CACHE_NAME_DHT_REPLICATED_2, from, iter, true); |
| } |
| |
| /** |
| * @param ignite Ignite. |
| * @param name Cache name. |
| * @param from Start from key. |
| * @param iter Iteration. |
| * @param scan If true then "scan" query will be used instead of "get" in a loop. Should be "false" when run in |
| * parallel with other operations. Otherwise should be "true", because it's much faster in such situations. |
| */ |
| protected void checkData(Ignite ignite, String name, int from, int iter, boolean scan) { |
| IgniteCache<Integer, Integer> cache = ignite.cache(name); |
| |
| if (scan) { |
| AtomicInteger cnt = new AtomicInteger(); |
| |
| cache.query(new ScanQuery<Integer, Integer>((k, v) -> k >= from && k < from + TEST_SIZE)).forEach(entry -> { |
| if (cnt.incrementAndGet() % (TEST_SIZE / 10) == 0) |
| log.info("<" + name + "> Checked " + cnt.get() * 100 / TEST_SIZE + "% entries. [count=" + |
| TEST_SIZE + ", iteration=" + iter + ", cache=" + name + "]"); |
| |
| assertEquals("Value does not match [key=" + entry.getKey() + ", cache=" + name + ']', |
| entry.getKey() + name.hashCode() + iter, entry.getValue().intValue()); |
| }); |
| |
| assertEquals(TEST_SIZE, cnt.get()); |
| } |
| else { |
| for (int i = from; i < from + TEST_SIZE; i++) { |
| if ((i + 1) % (TEST_SIZE / 10) == 0) |
| log.info("<" + name + "> Checked " + (i + 1) * 100 / (TEST_SIZE) + "% entries. [count=" + |
| TEST_SIZE + ", iteration=" + iter + ", cache=" + name + "]"); |
| |
| assertEquals("Value does not match [key=" + i + ", cache=" + name + ']', |
| cache.get(i).intValue(), i + name.hashCode() + iter); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| super.beforeTest(); |
| |
| GridTestUtils.runGC(); // Clean heap before rebalancing. |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testSimpleRebalancing() throws Exception { |
| IgniteKernal ignite = (IgniteKernal)startGrid(0); |
| |
| generateData(ignite, 0, 0); |
| |
| log.info("Preloading started."); |
| |
| long start = System.currentTimeMillis(); |
| |
| startGrid(1); |
| |
| awaitPartitionMapExchange(true, true, null, true); |
| |
| checkPartitionMapExchangeFinished(); |
| |
| awaitPartitionMessagesAbsent(); |
| |
| stopGrid(0); |
| |
| awaitPartitionMapExchange(true, true, null, true); |
| |
| checkPartitionMapExchangeFinished(); |
| |
| awaitPartitionMessagesAbsent(); |
| |
| startGrid(2); |
| |
| awaitPartitionMapExchange(true, true, null, true); |
| |
| checkPartitionMapExchangeFinished(); |
| |
| awaitPartitionMessagesAbsent(); |
| |
| stopGrid(2); |
| |
| awaitPartitionMapExchange(true, true, null, true); |
| |
| checkPartitionMapExchangeFinished(); |
| |
| awaitPartitionMessagesAbsent(); |
| |
| long spend = (System.currentTimeMillis() - start) / 1000; |
| |
| checkData(grid(1), 0, 0); |
| |
| log.info("Spend " + spend + " seconds to rebalance entries."); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLoadRebalancing() throws Exception { |
| final Ignite ignite = startGrid(0); |
| |
| startGrid(1); |
| |
| generateData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0); |
| |
| log.info("Preloading started."); |
| |
| long start = System.currentTimeMillis(); |
| |
| concurrentStartFinished = false; |
| |
| Thread t1 = new Thread() { |
| @Override public void run() { |
| Random rdm = new Random(); |
| |
| while (!concurrentStartFinished) { |
| for (int i = 0; i < TEST_SIZE; i++) { |
| if (i % (TEST_SIZE / 10) == 0) |
| log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); |
| |
| int ii = rdm.nextInt(TEST_SIZE); |
| |
| ignite.cache(CACHE_NAME_DHT_PARTITIONED).put(ii, ii + CACHE_NAME_DHT_PARTITIONED.hashCode()); |
| } |
| } |
| } |
| }; |
| |
| Thread t2 = new Thread() { |
| @Override public void run() { |
| while (!concurrentStartFinished) |
| checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0, false); |
| } |
| }; |
| |
| t1.start(); |
| t2.start(); |
| |
| startGrid(2); |
| startGrid(3); |
| |
| stopGrid(2); |
| |
| startGrid(4); |
| |
| awaitPartitionMapExchange(true, true, null); |
| |
| concurrentStartFinished = true; |
| |
| checkSupplyContextMapIsEmpty(); |
| |
| t1.join(); |
| t2.join(); |
| |
| long spend = (System.currentTimeMillis() - start) / 1000; |
| |
| info("Time to rebalance entries: " + spend); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| protected void checkSupplyContextMapIsEmpty() throws Exception { |
| for (Ignite g : G.allGrids()) { |
| for (GridCacheAdapter c : ((IgniteEx)g).context().cache().internalCaches()) { |
| Object supplier = U.field(c.preloader(), "supplier"); |
| |
| final Map map = U.field(supplier, "scMap"); |
| |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| synchronized (map) { |
| return map.isEmpty(); |
| } |
| } |
| }, 15_000); |
| |
| synchronized (map) { |
| assertTrue("Map is not empty [cache=" + c.name() + |
| ", node=" + g.name() + |
| ", map=" + map + ']', map.isEmpty()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| public static void checkPartitionMapExchangeFinished() { |
| for (Ignite g : G.allGrids()) { |
| IgniteKernal g0 = (IgniteKernal)g; |
| |
| for (IgniteCacheProxy<?, ?> c : g0.context().cache().jcaches()) { |
| CacheConfiguration cfg = c.context().config(); |
| |
| if (cfg.getRebalanceMode() != NONE) { |
| GridDhtCacheAdapter<?, ?> dht = dht(c); |
| |
| GridDhtPartitionTopology top = dht.topology(); |
| |
| List<GridDhtLocalPartition> locs = top.localPartitions(); |
| |
| for (GridDhtLocalPartition loc : locs) { |
| GridDhtPartitionState actl = loc.state(); |
| |
| boolean res = GridDhtPartitionState.OWNING.equals(actl); |
| |
| if (!res) |
| printPartitionState(c); |
| |
| assertTrue("Wrong local partition state part=" + |
| loc.id() + ", should be OWNING [state=" + actl + |
| "], node=" + g0.name() + " cache=" + c.getName(), res); |
| |
| Collection<ClusterNode> affNodes = |
| g0.affinity(cfg.getName()).mapPartitionToPrimaryAndBackups(loc.id()); |
| |
| assertTrue(affNodes.contains(g0.localNode())); |
| } |
| |
| for (Ignite remote : G.allGrids()) { |
| IgniteKernal remote0 = (IgniteKernal)remote; |
| |
| IgniteCacheProxy<?, ?> remoteC = remote0.context().cache().jcache(cfg.getName()); |
| |
| GridDhtCacheAdapter<?, ?> remoteDht = dht(remoteC); |
| |
| GridDhtPartitionTopology remoteTop = remoteDht.topology(); |
| |
| GridDhtPartitionMap pMap = remoteTop.partitionMap(true).get(((IgniteKernal)g).localNodeId()); |
| |
| assertEquals(pMap.size(), locs.size()); |
| |
| for (Map.Entry entry : pMap.entrySet()) { |
| assertTrue("Wrong remote partition state part=" + entry.getKey() + |
| ", should be OWNING [state=" + entry.getValue() + "], node=" |
| + remote.name() + " cache=" + c.getName(), |
| entry.getValue() == GridDhtPartitionState.OWNING); |
| } |
| |
| for (GridDhtLocalPartition loc : locs) |
| assertTrue(pMap.containsKey(loc.id())); |
| } |
| } |
| } |
| } |
| |
| log.info("checkPartitionMapExchangeFinished finished"); |
| } |
| |
| /** |
| * Method checks for {@link GridDhtPartitionsSingleMessage} or {@link GridDhtPartitionsFullMessage} |
| * not received within {@code TOPOLOGY_STILLNESS_TIME} bound. |
| * |
| * @throws Exception If failed. |
| */ |
| protected void awaitPartitionMessagesAbsent() throws Exception { |
| log.info("Checking GridDhtPartitions*Message absent (it will take up to " + |
| TOPOLOGY_STILLNESS_TIME + " ms) ... "); |
| |
| // Start waiting new messages from current point of time. |
| lastPartMsgTime = U.currentTimeMillis(); |
| |
| assertTrue("Should not have partition Single or Full messages within bound " + |
| TOPOLOGY_STILLNESS_TIME + " ms.", |
| GridTestUtils.waitForCondition( |
| new GridAbsPredicateX() { |
| @Override public boolean applyx() { |
| return lastPartMsgTime + TOPOLOGY_STILLNESS_TIME < U.currentTimeMillis(); |
| } |
| }, |
| 2 * TOPOLOGY_STILLNESS_TIME // 30 sec to gain stable topology and 30 sec of silence. |
| ) |
| ); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected long getTestTimeout() { |
| return 10 * 60_000; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testComplexRebalancing() throws Exception { |
| final Ignite ignite = startGrid(0); |
| |
| generateData(ignite, 0, 0); |
| |
| log.info("Preloading started."); |
| |
| long start = System.currentTimeMillis(); |
| |
| concurrentStartFinished = false; |
| concurrentStartFinished2 = false; |
| concurrentStartFinished3 = false; |
| |
| Thread t1 = new Thread() { |
| @Override public void run() { |
| try { |
| startGrid(1); |
| startGrid(2); |
| |
| while (!concurrentStartFinished2) |
| U.sleep(10); |
| |
| awaitPartitionMapExchange(); |
| |
| //New cache should start rebalancing. |
| CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); |
| |
| cacheRCfg.setName(CACHE_NAME_DHT_PARTITIONED + "_NEW"); |
| cacheRCfg.setCacheMode(CacheMode.PARTITIONED); |
| cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC); |
| cacheRCfg.setRebalanceBatchesPrefetchCount(1); |
| |
| grid(0).getOrCreateCache(cacheRCfg); |
| |
| while (!concurrentStartFinished3) |
| U.sleep(10); |
| |
| concurrentStartFinished = true; |
| } |
| catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| }; |
| |
| Thread t2 = new Thread() { |
| @Override public void run() { |
| try { |
| startGrid(3); |
| startGrid(4); |
| |
| concurrentStartFinished2 = true; |
| } |
| catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| }; |
| |
| Thread t3 = new Thread() { |
| @Override public void run() { |
| generateData(ignite, 0, 1); |
| |
| concurrentStartFinished3 = true; |
| } |
| }; |
| |
| t1.start(); |
| t2.start(); // Should cancel t1 rebalancing. |
| t3.start(); |
| |
| t1.join(); |
| t2.join(); |
| t3.join(); |
| |
| awaitPartitionMapExchange(true, true, null); |
| |
| checkSupplyContextMapIsEmpty(); |
| |
| checkData(grid(4), 0, 1); |
| |
| final Ignite ignite3 = grid(3); |
| |
| Thread t4 = new Thread() { |
| @Override public void run() { |
| generateData(ignite3, 0, 2); |
| |
| } |
| }; |
| |
| t4.start(); |
| |
| stopGrid(1); |
| |
| awaitPartitionMapExchange(true, true, null); |
| |
| checkSupplyContextMapIsEmpty(); |
| |
| stopGrid(0); |
| |
| awaitPartitionMapExchange(true, true, null); |
| |
| checkSupplyContextMapIsEmpty(); |
| |
| stopGrid(2); |
| |
| awaitPartitionMapExchange(true, true, null); |
| |
| checkPartitionMapExchangeFinished(); |
| |
| awaitPartitionMessagesAbsent(); |
| |
| checkSupplyContextMapIsEmpty(); |
| |
| t4.join(); |
| |
| stopGrid(3); |
| |
| awaitPartitionMapExchange(); |
| |
| checkSupplyContextMapIsEmpty(); |
| |
| long spend = (System.currentTimeMillis() - start) / 1000; |
| |
| checkData(grid(4), 0, 2); |
| |
| log.info("Spend " + spend + " seconds to rebalance entries."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| super.afterTest(); |
| |
| stopAllGrids(); |
| } |
| |
| /** |
| * |
| */ |
| private static class CollectingCommunicationSpi extends TcpCommunicationSpi { |
| /** */ |
| @LoggerResource |
| private IgniteLogger log; |
| |
| /** {@inheritDoc} */ |
| @Override public void sendMessage(final ClusterNode node, final Message msg, |
| final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { |
| final Object msg0 = ((GridIoMessage)msg).message(); |
| |
| if (msg0 instanceof GridDhtPartitionsSingleMessage || |
| msg0 instanceof GridDhtPartitionsFullMessage) { |
| lastPartMsgTime = U.currentTimeMillis(); |
| |
| log.info("Last seen time of GridDhtPartitionsSingleMessage or GridDhtPartitionsFullMessage updated " + |
| "[lastPartMsgTime=" + lastPartMsgTime + |
| ", node=" + node.id() + ']'); |
| } |
| |
| super.sendMessage(node, msg, ackC); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected long getPartitionMapExchangeTimeout() { |
| return super.getPartitionMapExchangeTimeout() * 2; |
| } |
| } |