| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.ToLongFunction; |
| import java.util.stream.Collectors; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CacheMetrics; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.cache.CacheRebalanceMode; |
| import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.TestRecordingCommunicationSpi; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; |
| import org.apache.ignite.internal.processors.metric.MetricRegistry; |
| import org.apache.ignite.internal.processors.metric.impl.ObjectGauge; |
| import org.apache.ignite.internal.util.lang.GridAbsPredicate; |
| import org.apache.ignite.internal.util.typedef.PA; |
| import org.apache.ignite.internal.util.typedef.internal.A; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.internal.visor.VisorTaskArgument; |
| import org.apache.ignite.internal.visor.node.VisorNodeDataCollectorTask; |
| import org.apache.ignite.internal.visor.node.VisorNodeDataCollectorTaskArg; |
| import org.apache.ignite.internal.visor.node.VisorNodeDataCollectorTaskResult; |
| import org.apache.ignite.lang.IgniteBiPredicate; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.apache.ignite.spi.metric.IntMetric; |
| import org.apache.ignite.spi.metric.LongMetric; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL; |
| import static org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction.DFLT_PARTITION_COUNT; |
| import static org.apache.ignite.internal.processors.cache.CacheGroupMetricsImpl.CACHE_GROUP_METRICS_PREFIX; |
| import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; |
| import static org.apache.ignite.testframework.GridTestUtils.runAsync; |
| import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; |
| |
/**
 * Tests rebalance-related metrics of caches and cache groups.
 */
| public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { |
| /** */ |
| private static final String CACHE1 = "cache1"; |
| |
| /** */ |
| private static final String CACHE2 = "cache2"; |
| |
| /** */ |
| private static final String CACHE3 = "cache3"; |
| |
| /** */ |
| private static final String CACHE4 = "cache4"; |
| |
| /** */ |
| private static final String CACHE5 = "cache5"; |
| |
| /** */ |
| private static final long REBALANCE_DELAY = 5_000; |
| |
| /** */ |
| private static final String GROUP = "group1"; |
| |
| /** */ |
| private static final String GROUP2 = "group2"; |
| |
| /** */ |
| private static final int KEYS_COUNT = 10_000; |
| |
| /** Acceptable time inaccuracy for testRebalanceEstimateFinishTime() */ |
| public static final long ACCEPTABLE_TIME_INACCURACY = 25_000L; |
| |
| /** */ |
| private long rebalanceDelay; |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| super.afterTest(); |
| |
| stopAllGrids(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected long getTestTimeout() { |
| return 10 * 60 * 1000; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| CacheConfiguration cfg1 = new CacheConfiguration() |
| .setName(CACHE1) |
| .setGroupName(GROUP) |
| .setCacheMode(CacheMode.PARTITIONED) |
| .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) |
| .setRebalanceMode(CacheRebalanceMode.ASYNC) |
| .setRebalanceBatchSize(100) |
| .setStatisticsEnabled(true); |
| |
| CacheConfiguration cfg2 = new CacheConfiguration(cfg1) |
| .setName(CACHE2); |
| |
| CacheConfiguration cfg3 = new CacheConfiguration() |
| .setName(CACHE3) |
| .setCacheMode(CacheMode.PARTITIONED) |
| .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) |
| .setRebalanceMode(CacheRebalanceMode.ASYNC) |
| .setRebalanceBatchSize(100) |
| .setStatisticsEnabled(true) |
| .setRebalanceDelay(rebalanceDelay); |
| |
| CacheConfiguration cfg4 = new CacheConfiguration() |
| .setAffinity(new RendezvousAffinityFunction()) |
| .setRebalanceMode(CacheRebalanceMode.ASYNC) |
| .setName(CACHE4) |
| .setCacheMode(CacheMode.REPLICATED) |
| .setGroupName(GROUP2); |
| |
| CacheConfiguration cfg5 = new CacheConfiguration(cfg4) |
| .setName(CACHE5); |
| |
| cfg.setCacheConfiguration(cfg1, cfg2, cfg3, cfg4, cfg5); |
| |
| cfg.setIncludeEventTypes(EventType.EVTS_ALL); |
| |
| cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); |
| |
| return cfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| super.beforeTest(); |
| |
| rebalanceDelay = 0; |
| } |
| |
| /** |
| * Checks the correctness of {@link CacheMetrics#getRebalancingKeysRate}. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRebalance() throws Exception { |
| Ignite ignite = startGrid(0); |
| |
| IgniteCache<Object, Object> cache1 = ignite.cache(CACHE1); |
| IgniteCache<Object, Object> cache2 = ignite.cache(CACHE2); |
| |
| for (int i = 0; i < KEYS_COUNT; i++) { |
| cache1.put(i, CACHE1 + "-" + i); |
| |
| if (i % 2 == 0) |
| cache2.put(i, CACHE2 + "-" + i); |
| } |
| |
| ignite = startGrid(1); |
| |
| awaitPartitionMapExchange(true, true, null, true); |
| |
| CacheMetrics metrics1 = ignite.cache(CACHE1).localMetrics(); |
| CacheMetrics metrics2 = ignite.cache(CACHE2).localMetrics(); |
| |
| long rate1 = metrics1.getRebalancingKeysRate(); |
| long rate2 = metrics2.getRebalancingKeysRate(); |
| |
| assertTrue(rate1 > 0); |
| assertTrue(rate2 > 0); |
| assertTrue(rate1 > rate2); |
| |
| assertEquals(metrics1.getRebalancedKeys(), rate1); |
| assertEquals(metrics2.getRebalancedKeys(), rate2); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCacheGroupRebalance() throws Exception { |
| IgniteEx ignite0 = startGrid(0); |
| |
| List<String> cacheNames = Arrays.asList(CACHE4, CACHE5); |
| |
| int allKeysCount = 0; |
| |
| for (String cacheName : cacheNames) { |
| Map<Integer, Long> data = new Random().ints(KEYS_COUNT).distinct().boxed() |
| .collect(Collectors.toMap(i -> i, i -> (long)i)); |
| |
| ignite0.getOrCreateCache(cacheName).putAll(data); |
| |
| allKeysCount += data.size(); |
| } |
| |
| TestRecordingCommunicationSpi.spi(ignite0) |
| .blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { |
| @Override public boolean apply(ClusterNode node, Message msg) { |
| return (msg instanceof GridDhtPartitionSupplyMessage) && |
| CU.cacheId(GROUP2) == ((GridCacheGroupIdMessage)msg).groupId(); |
| } |
| }); |
| |
| IgniteEx ignite1 = startGrid(1); |
| |
| TestRecordingCommunicationSpi.spi(ignite0).waitForBlocked(); |
| |
| MetricRegistry mreg = ignite1.context().metric() |
| .registry(metricName(CACHE_GROUP_METRICS_PREFIX, GROUP2)); |
| |
| LongMetric startTime = mreg.findMetric("RebalancingStartTime"); |
| LongMetric lastCancelledTime = mreg.findMetric("RebalancingLastCancelledTime"); |
| LongMetric endTime = mreg.findMetric("RebalancingEndTime"); |
| LongMetric partitionsLeft = mreg.findMetric("RebalancingPartitionsLeft"); |
| IntMetric partitionsTotal = mreg.findMetric("RebalancingPartitionsTotal"); |
| LongMetric receivedKeys = mreg.findMetric("RebalancingReceivedKeys"); |
| LongMetric receivedBytes = mreg.findMetric("RebalancingReceivedBytes"); |
| |
| ObjectGauge<Map<UUID, Long>> fullReceivedKeys = mreg.findMetric("RebalancingFullReceivedKeys"); |
| ObjectGauge<Map<UUID, Long>> histReceivedKeys = mreg.findMetric("RebalancingHistReceivedKeys"); |
| ObjectGauge<Map<UUID, Long>> fullReceivedBytes = mreg.findMetric("RebalancingFullReceivedBytes"); |
| ObjectGauge<Map<UUID, Long>> histReceivedBytes = mreg.findMetric("RebalancingHistReceivedBytes"); |
| |
| assertEquals("During the start of the rebalancing, the number of partitions in the metric should be " + |
| "equal to the number of partitions in the cache group.", DFLT_PARTITION_COUNT, partitionsLeft.value()); |
| |
| assertEquals("The total number of partitions in the metric should be " + |
| "equal to the number of partitions in the cache group.", DFLT_PARTITION_COUNT, partitionsTotal.value()); |
| |
| long rebalancingStartTime = startTime.value(); |
| |
| assertNotSame("During rebalancing start, the start time metric must be determined.", |
| -1, startTime.value()); |
| |
| assertEquals("Rebalancing last cancelled time must be undefined.", -1, lastCancelledTime.value()); |
| |
| assertEquals("Before the rebalancing is completed, the end time metric must be undefined.", |
| -1, endTime.value()); |
| |
| ToLongFunction<Map<UUID, Long>> sumFunc = map -> map.values().stream().mapToLong(Long::longValue).sum(); |
| |
| String zeroReceivedKeysMsg = "Until a partition supply message has been delivered, keys cannot be received."; |
| assertEquals(zeroReceivedKeysMsg, 0, receivedKeys.value()); |
| assertEquals(zeroReceivedKeysMsg, 0, sumFunc.applyAsLong(fullReceivedKeys.value())); |
| assertEquals(zeroReceivedKeysMsg, 0, sumFunc.applyAsLong(histReceivedKeys.value())); |
| |
| String zeroReceivedBytesMsg = "Until a partition supply message has been delivered, bytes cannot be received."; |
| assertEquals(zeroReceivedBytesMsg, 0, receivedBytes.value()); |
| assertEquals(zeroReceivedBytesMsg, 0, sumFunc.applyAsLong(fullReceivedBytes.value())); |
| assertEquals(zeroReceivedBytesMsg, 0, sumFunc.applyAsLong(histReceivedBytes.value())); |
| |
| checkSuppliers( |
| Arrays.asList(ignite0.localNode().id()), |
| fullReceivedKeys, histReceivedKeys, fullReceivedBytes, histReceivedBytes |
| ); |
| |
| TestRecordingCommunicationSpi.spi(ignite0).stopBlock(); |
| |
| for (String cacheName : cacheNames) |
| ignite1.context().cache().internalCache(cacheName).preloader().rebalanceFuture().get(); |
| |
| assertEquals("After completion of rebalancing, there are no partitions of the cache group that are" + |
| " left to rebalance.", 0, partitionsLeft.value()); |
| |
| assertEquals("After completion of rebalancing, the total number of partitions in the metric should be" + |
| " equal to the number of partitions in the cache group.", DFLT_PARTITION_COUNT, partitionsTotal.value()); |
| |
| assertEquals("After the rebalancing is ended, the rebalancing start time must be equal to the start time " + |
| "measured immediately after the rebalancing start.", rebalancingStartTime, startTime.value()); |
| |
| assertEquals("Rebalancing last cancelled time must be undefined.", -1, lastCancelledTime.value()); |
| |
| waitForCondition(() -> endTime.value() != -1, 1000); |
| |
| assertTrue("Rebalancing end time must be determined and must be longer than the start time " + |
| "[RebalancingStartTime=" + rebalancingStartTime + ", RebalancingEndTime=" + endTime.value() + "].", |
| rebalancingStartTime < endTime.value()); |
| |
| String wrongReceivedKeyCntMsg = "The number of currently rebalanced keys for the whole cache group should " + |
| "be equal to the number of entries in the caches."; |
| assertEquals(wrongReceivedKeyCntMsg, allKeysCount, receivedKeys.value()); |
| assertEquals(wrongReceivedKeyCntMsg, allKeysCount, sumFunc.applyAsLong(fullReceivedKeys.value())); |
| assertEquals(0, sumFunc.applyAsLong(histReceivedKeys.value())); |
| |
| int estimateByteCnt = allKeysCount * (Integer.BYTES + Long.BYTES); |
| |
| String wrongReceivedByteCntMsg = "The number of currently rebalanced bytes of this cache group was expected " + |
| "more " + estimateByteCnt + " bytes."; |
| assertTrue(wrongReceivedByteCntMsg, receivedBytes.value() > estimateByteCnt); |
| assertTrue(wrongReceivedByteCntMsg, sumFunc.applyAsLong(fullReceivedBytes.value()) > estimateByteCnt); |
| assertEquals(0, sumFunc.applyAsLong(histReceivedBytes.value())); |
| |
| checkSuppliers( |
| Arrays.asList(ignite0.localNode().id()), |
| fullReceivedKeys, histReceivedKeys, fullReceivedBytes, histReceivedBytes |
| ); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRebalancingLastCancelledTime() throws Exception { |
| rebalanceDelay = REBALANCE_DELAY; // Used for trigger rebalance cancellation. |
| |
| IgniteEx ignite0 = startGrid(0); |
| |
| List<String> cacheNames = Arrays.asList(CACHE4, CACHE5); |
| |
| for (String cacheName : cacheNames) { |
| ignite0.getOrCreateCache(cacheName).putAll(new Random().ints(KEYS_COUNT).distinct().boxed() |
| .collect(Collectors.toMap(i -> i, i -> (long)i))); |
| } |
| |
| TestRecordingCommunicationSpi.spi(ignite0) |
| .blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { |
| @Override public boolean apply(ClusterNode node, Message msg) { |
| return (msg instanceof GridDhtPartitionSupplyMessage) && |
| ((GridCacheGroupIdMessage)msg).groupId() == CU.cacheId(GROUP2); |
| } |
| }); |
| |
| IgniteEx ignite1 = startGrid(1); |
| |
| TestRecordingCommunicationSpi.spi(ignite0).waitForBlocked(); |
| |
| MetricRegistry mreg = ignite1.context().metric().registry(metricName(CACHE_GROUP_METRICS_PREFIX, GROUP2)); |
| |
| LongMetric startTime = mreg.findMetric("RebalancingStartTime"); |
| LongMetric lastCancelledTime = mreg.findMetric("RebalancingLastCancelledTime"); |
| LongMetric endTime = mreg.findMetric("RebalancingEndTime"); |
| LongMetric partitionsLeft = mreg.findMetric("RebalancingPartitionsLeft"); |
| IntMetric partitionsTotal = mreg.findMetric("RebalancingPartitionsTotal"); |
| |
| assertEquals("During the start of the rebalancing, the number of partitions in the metric should be " + |
| "equal to the number of partitions in the cache group.", DFLT_PARTITION_COUNT, partitionsLeft.value()); |
| |
| assertEquals("The total number of partitions in the metric should be " + |
| "equal to the number of partitions in the cache group.", DFLT_PARTITION_COUNT, partitionsTotal.value()); |
| |
| long rebalancingStartTime = startTime.value(); |
| |
| assertNotSame("During rebalancing start, the start time metric must be determined.", |
| -1, startTime.value()); |
| |
| assertEquals("Rebalancing last cancelled time must be undefined.", -1, lastCancelledTime.value()); |
| |
| assertEquals("Before the rebalancing is completed, the end time metric must be undefined.", |
| -1, endTime.value()); |
| |
| IgniteInternalFuture chain = ignite1.context().cache().internalCache(CACHE5).preloader().rebalanceFuture() |
| .chain(f -> { |
| assertEquals("After the rebalancing is ended, the rebalancing start time must be equal to " + |
| "the start time measured immediately after the rebalancing start.", |
| rebalancingStartTime, startTime.value()); |
| |
| assertEquals("If the rebalancing has been cancelled, the end time must not be set.", |
| -1, endTime.value()); |
| |
| return null; |
| }); |
| |
| TestRecordingCommunicationSpi.spi(ignite0).stopBlock(false); |
| |
| chain.get(); |
| |
| assertNotSame("The rebalancing start time must not be equal to the previously measured start time, since" + |
| " the first rebalancing was cancelled and restarted.", rebalancingStartTime, startTime.value()); |
| |
| waitForCondition(() -> lastCancelledTime.value() != -1, 5000); |
| |
| assertTrue("The rebalancing last cancelled time must be greater than or equal to the start time of the " + |
| "cancelled rebalancing [RebalancingStartTime=" + rebalancingStartTime + ", rebalancingLastCancelledTime=" + |
| lastCancelledTime.value() + "].", rebalancingStartTime <= lastCancelledTime.value()); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRebalanceProgressUnderLoad() throws Exception { |
| Ignite ignite = startGrids(4); |
| |
| IgniteCache<Object, Object> cache1 = ignite.cache(CACHE1); |
| |
| Random r = new Random(); |
| |
| GridTestUtils.runAsync(new Runnable() { |
| @Override public void run() { |
| for (int i = 0; i < 100_000; i++) { |
| int next = r.nextInt(); |
| |
| cache1.put(next, CACHE1 + "-" + next); |
| } |
| } |
| }); |
| |
| IgniteEx ig = startGrid(4); |
| |
| GridTestUtils.runAsync(new Runnable() { |
| @Override public void run() { |
| for (int i = 0; i < 100_000; i++) { |
| int next = r.nextInt(); |
| |
| cache1.put(next, CACHE1 + "-" + next); |
| } |
| } |
| }); |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| |
| ig.events().localListen(new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| latch.countDown(); |
| |
| return false; |
| } |
| }, EventType.EVT_CACHE_REBALANCE_STOPPED); |
| |
| latch.await(); |
| |
| VisorNodeDataCollectorTaskArg taskArg = new VisorNodeDataCollectorTaskArg(); |
| taskArg.setCacheGroups(Collections.emptySet()); |
| |
| VisorTaskArgument<VisorNodeDataCollectorTaskArg> arg = new VisorTaskArgument<>( |
| Collections.singletonList(ignite.cluster().localNode().id()), |
| taskArg, |
| false |
| ); |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| VisorNodeDataCollectorTaskResult res = ignite.compute().execute(VisorNodeDataCollectorTask.class, arg); |
| |
| CacheMetrics snapshot = ig.cache(CACHE1).metrics(); |
| |
| return snapshot.getRebalancedKeys() > snapshot.getEstimatedRebalancingKeys() |
| && Double.compare(res.getRebalance().get(ignite.cluster().localNode().id()), 1.0) == 0 |
| && snapshot.getRebalancingPartitionsCount() == 0; |
| } |
| }, 5000); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRebalanceEstimateFinishTime() throws Exception { |
| System.setProperty(IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL, String.valueOf(10_000)); |
| |
| Ignite ig1 = startGrid(1); |
| |
| final int KEYS = 300_000; |
| |
| try (IgniteDataStreamer<Integer, String> st = ig1.dataStreamer(CACHE1)) { |
| for (int i = 0; i < KEYS; i++) |
| st.addData(i, CACHE1 + "-" + i); |
| } |
| |
| final Ignite ig2 = startGrid(2); |
| |
| boolean rebalancingStartTimeGot = waitForCondition(() -> ig2.cache(CACHE1).localMetrics().getRebalancingStartTime() != -1L, 5_000); |
| |
| assertTrue("Unable to resolve rebalancing start time.", rebalancingStartTimeGot); |
| |
| CacheMetrics metrics = ig2.cache(CACHE1).localMetrics(); |
| |
| long startTime = metrics.getRebalancingStartTime(); |
| long currTime = U.currentTimeMillis(); |
| |
| assertTrue("Invalid start time [startTime=" + startTime + ", currTime=" + currTime + ']', |
| startTime > 0L && (currTime - startTime) >= 0L && (currTime - startTime) <= 5000L); |
| |
| final CountDownLatch latch = new CountDownLatch(1); |
| |
| runAsync(() -> { |
| // Waiting 75% keys will be rebalanced. |
| int partKeys = KEYS / 2; |
| |
| final long keysLine = (long)partKeys / 4L; |
| |
| log.info("Wait until keys left will be less than: " + keysLine); |
| |
| while (true) { |
| CacheMetrics m = ig2.cache(CACHE1).localMetrics(); |
| |
| long keyLeft = m.getKeysToRebalanceLeft(); |
| |
| if (keyLeft < keysLine) { |
| latch.countDown(); |
| |
| break; |
| } |
| |
| log.info("Keys left: " + m.getKeysToRebalanceLeft()); |
| |
| try { |
| Thread.sleep(1_000); |
| } |
| catch (InterruptedException e) { |
| log.warning("Interrupt thread", e); |
| |
| Thread.currentThread().interrupt(); |
| } |
| } |
| }); |
| |
| assertTrue(latch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); |
| |
| boolean estimatedRebalancingFinishTimeGot = waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return ig2.cache(CACHE1).localMetrics().getEstimatedRebalancingFinishTime() != -1L; |
| } |
| }, 5_000L); |
| |
| assertTrue("Unable to resolve estimated rebalancing finish time.", estimatedRebalancingFinishTimeGot); |
| |
| long finishTime = ig2.cache(CACHE1).localMetrics().getEstimatedRebalancingFinishTime(); |
| |
| assertTrue("Not a positive estimation of rebalancing finish time: " + finishTime, |
| finishTime > 0L); |
| |
| currTime = U.currentTimeMillis(); |
| |
| long timePassed = currTime - startTime; |
| long timeLeft = finishTime - currTime; |
| |
| // TODO: finishRebalanceLatch gets countdown much earlier because of ForceRebalanceExchangeTask triggered by cache with delay |
| // assertTrue("Got timeout while waiting for rebalancing. Estimated left time: " + timeLeft, |
| // finishRebalanceLatch.await(timeLeft + 10_000L, TimeUnit.MILLISECONDS)); |
| |
| boolean allKeysRebalanced = waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return ig2.cache(CACHE1).localMetrics().getKeysToRebalanceLeft() == 0; |
| } |
| }, timeLeft + ACCEPTABLE_TIME_INACCURACY); |
| |
| assertTrue("Some keys aren't rebalanced.", allKeysRebalanced); |
| |
| log.info("[timePassed=" + timePassed + ", timeLeft=" + timeLeft + |
| ", Time to rebalance=" + (finishTime - startTime) + |
| ", startTime=" + startTime + ", finishTime=" + finishTime + ']' |
| ); |
| |
| System.clearProperty(IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL); |
| |
| currTime = U.currentTimeMillis(); |
| |
| log.info("Rebalance time: " + (currTime - startTime)); |
| |
| long diff = finishTime - currTime; |
| |
| assertTrue("Expected less than " + ACCEPTABLE_TIME_INACCURACY + ", but actual: " + diff, |
| Math.abs(diff) < ACCEPTABLE_TIME_INACCURACY); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRebalanceDelay() throws Exception { |
| rebalanceDelay = REBALANCE_DELAY; |
| |
| Ignite ig1 = startGrid(1); |
| |
| CacheConfiguration cfg3 = new CacheConfiguration() |
| .setName(CACHE3) |
| .setCacheMode(CacheMode.PARTITIONED) |
| .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) |
| .setRebalanceMode(CacheRebalanceMode.ASYNC) |
| .setRebalanceBatchSize(100) |
| .setStatisticsEnabled(true) |
| .setRebalanceDelay(REBALANCE_DELAY); |
| |
| final IgniteCache<Object, Object> cache = ig1.getOrCreateCache(cfg3); |
| |
| for (int i = 0; i < KEYS_COUNT; i++) |
| cache.put(i, CACHE3 + "-" + i); |
| |
| long beforeStartTime = U.currentTimeMillis(); |
| |
| startGrid(2); |
| startGrid(3); |
| |
| waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return cache.localMetrics().getRebalancingStartTime() != -1L; |
| } |
| }, 5_000); |
| |
| assert (cache.localMetrics().getRebalancingStartTime() < U.currentTimeMillis() + REBALANCE_DELAY); |
| assert (cache.localMetrics().getRebalancingStartTime() > beforeStartTime + REBALANCE_DELAY); |
| } |
| |
| /** |
| * Check suppliers in metrics. |
| * |
| * @param uuids Suppliers. |
| * @param gauges Metrics per supplier. |
| */ |
| private void checkSuppliers(Collection<UUID> uuids, ObjectGauge<Map<UUID, Long>>... gauges) { |
| A.notEmpty(gauges, "gauges"); |
| A.notEmpty(uuids, "uuids"); |
| |
| for (int i = 0; i < gauges.length; i++) { |
| Map<UUID, Long> val = gauges[i].value(); |
| |
| assertEquals("i=" + i, uuids.size(), val.size()); |
| |
| int fi = i; |
| uuids.forEach(uuid -> assertTrue(String.format("i=%s uuid=%s", fi, uuid), val.containsKey(uuid))); |
| } |
| } |
| } |