| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import javax.cache.processor.EntryProcessor; |
| import javax.cache.processor.MutableEntry; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteServices; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.IgniteTransactions; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.affinity.Affinity; |
| import org.apache.ignite.cache.affinity.AffinityFunction; |
| import org.apache.ignite.cache.affinity.AffinityFunctionContext; |
| import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.DataRegionConfiguration; |
| import org.apache.ignite.configuration.DataStorageConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.internal.DiscoverySpiTestListener; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.IgniteNodeAttributes; |
| import org.apache.ignite.internal.TestRecordingCommunicationSpi; |
| import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; |
| import org.apache.ignite.internal.cluster.NodeOrderComparator; |
| import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; |
| import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; |
| import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.lang.GridAbsPredicate; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.internal.util.typedef.PA; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiPredicate; |
| import org.apache.ignite.lang.IgniteClosure; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.services.Service; |
| import org.apache.ignite.services.ServiceContext; |
| import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.transactions.Transaction; |
| import org.apache.ignite.transactions.TransactionConcurrency; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.jetbrains.annotations.Nullable; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; |
| import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.internal.TestRecordingCommunicationSpi.blockSingleExhangeMessage; |
| import static org.apache.ignite.internal.processors.cache.ExchangeContext.IGNITE_EXCHANGE_COMPATIBILITY_VER_1; |
| import static org.apache.ignite.testframework.GridTestUtils.runAsync; |
| import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; |
| |
| /** |
| * |
| */ |
| public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { |
| /** */ |
| private boolean forceSrvMode; |
| |
| /** */ |
| private static final String CACHE_NAME1 = "testCache1"; |
| |
| /** */ |
| private static final String CACHE_NAME2 = "testCache2"; |
| |
| /** */ |
| private IgniteClosure<String, CacheConfiguration[]> cacheC; |
| |
| /** */ |
| private IgnitePredicate<ClusterNode> cacheNodeFilter; |
| |
| /** */ |
| private IgniteClosure<String, TestRecordingCommunicationSpi> spiC; |
| |
| /** */ |
| private IgniteClosure<String, Boolean> clientC; |
| |
| /** Expected ideal affinity assignments. */ |
| private Map<Long, Map<Integer, List<List<ClusterNode>>>> idealAff = new HashMap<>(); |
| |
| /** */ |
| private boolean skipCheckOrder; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| TestRecordingCommunicationSpi commSpi; |
| |
| if (spiC != null) |
| commSpi = spiC.apply(igniteInstanceName); |
| else |
| commSpi = new TestRecordingCommunicationSpi(); |
| |
| commSpi.setSharedMemoryPort(-1); |
| |
| cfg.setCommunicationSpi(commSpi); |
| |
| TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); |
| |
| discoSpi.setForceServerMode(forceSrvMode); |
| discoSpi.setNetworkTimeout(60_000); |
| |
| cfg.setClientFailureDetectionTimeout(100000); |
| |
| CacheConfiguration[] ccfg; |
| |
| if (cacheC != null) |
| ccfg = cacheC.apply(igniteInstanceName); |
| else |
| ccfg = new CacheConfiguration[]{cacheConfiguration()}; |
| |
| if (ccfg != null) |
| cfg.setCacheConfiguration(ccfg); |
| |
| if (clientC != null) { |
| cfg.setClientMode(clientC.apply(igniteInstanceName)); |
| |
| discoSpi.setJoinTimeout(30_000); |
| } |
| |
| DataStorageConfiguration cfg1 = new DataStorageConfiguration(); |
| |
| cfg1.setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(512L * 1024 * 1024)); |
| |
| cfg.setDataStorageConfiguration(cfg1); |
| |
| return cfg; |
| } |
| |
| /** |
| * @return Cache configuration. |
| */ |
| private CacheConfiguration cacheConfiguration() { |
| CacheConfiguration ccfg = new CacheConfiguration(); |
| |
| ccfg.setName(CACHE_NAME1); |
| ccfg.setNodeFilter(cacheNodeFilter); |
| ccfg.setAffinity(affinityFunction(null)); |
| ccfg.setWriteSynchronizationMode(FULL_SYNC); |
| ccfg.setBackups(0); |
| |
| return ccfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| super.beforeTest(); |
| |
| cleanPersistenceDir(); |
| } |
| |
| /** |
| * @param parts Number of partitions. |
| * @return Affinity function. |
| */ |
| protected AffinityFunction affinityFunction(@Nullable Integer parts) { |
| return new RendezvousAffinityFunction(false, |
| parts == null ? RendezvousAffinityFunction.DFLT_PARTITION_COUNT : parts); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| try { |
| checkCaches(); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * Checks that new joined primary is not assigned immediately. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayedAffinityCalculation() throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| checkAffinity(1, topVer(1, 0), true); |
| |
| GridCacheContext cctx = ((IgniteKernal)ignite0).context().cache().internalCache(CACHE_NAME1).context(); |
| |
| AffinityFunction func = cctx.config().getAffinity(); |
| |
| AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl( |
| new ArrayList<>(ignite0.cluster().nodes()), |
| null, |
| null, |
| topVer(1, 0), |
| cctx.config().getBackups()); |
| |
| List<List<ClusterNode>> calcAff1_0 = func.assignPartitions(ctx); |
| |
| startServer(1, 2); |
| |
| ctx = new GridAffinityFunctionContextImpl( |
| new ArrayList<>(ignite0.cluster().nodes()), |
| calcAff1_0, |
| null, |
| topVer(1, 0), |
| cctx.config().getBackups()); |
| |
| List<List<ClusterNode>> calcAff2_0 = func.assignPartitions(ctx); |
| |
| checkAffinity(2, topVer(2, 0), false); |
| |
| List<List<ClusterNode>> aff2_0 = affinity(ignite0, topVer(2, 0), CACHE_NAME1); |
| |
| for (int p = 0; p < calcAff1_0.size(); p++) { |
| List<ClusterNode> a1 = calcAff1_0.get(p); |
| List<ClusterNode> a2 = calcAff2_0.get(p); |
| |
| List<ClusterNode> a = aff2_0.get(p); |
| |
| // Primary did not change. |
| assertEquals(a1.get(0), a.get(0)); |
| |
| // New primary is backup. |
| if (!a1.get(0).equals(a2.get(0))) |
| assertTrue(a.contains(a2.get(0))); |
| } |
| |
| checkAffinity(2, topVer(2, 1), true); |
| |
| List<List<ClusterNode>> aff2_1 = affinity(ignite0, topVer(2, 1), CACHE_NAME1); |
| |
| assertEquals(calcAff2_0, aff2_1); |
| } |
| |
| /** |
| * Simple test, node join. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAffinitySimpleSequentialStart() throws Exception { |
| startServer(0, 1); |
| |
| startServer(1, 2); |
| |
| checkAffinity(2, topVer(2, 0), false); |
| |
| checkAffinity(2, topVer(2, 1), true); |
| |
| startServer(2, 3); |
| |
| checkAffinity(3, topVer(3, 0), false); |
| |
| checkAffinity(3, topVer(3, 1), true); |
| |
| awaitPartitionMapExchange(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAffinitySimpleSequentialStartNoCacheOnCoordinator() throws Exception { |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String igniteInstanceName) { |
| if (igniteInstanceName.equals(getTestIgniteInstanceName(0))) |
| return null; |
| |
| return new CacheConfiguration[]{cacheConfiguration()}; |
| } |
| }; |
| |
| cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0))); |
| |
| testAffinitySimpleSequentialStart(); |
| |
| assertNull(((IgniteKernal)ignite(0)).context().cache().internalCache(CACHE_NAME1)); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAffinitySimpleNoCacheOnCoordinator1() throws Exception { |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String igniteInstanceName) { |
| if (igniteInstanceName.equals(getTestIgniteInstanceName(1))) |
| return null; |
| |
| return new CacheConfiguration[]{cacheConfiguration()}; |
| } |
| }; |
| |
| cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1))); |
| |
| startServer(0, 1); |
| |
| startServer(1, 2); |
| |
| checkAffinity(2, topVer(2, 1), true); |
| |
| startServer(2, 3); |
| |
| startServer(3, 4); |
| |
| Map<String, List<List<ClusterNode>>> aff = checkAffinity(4, topVer(4, 1), true); |
| |
| stopGrid(0); // Kill coordinator, now coordinator node1 without cache. |
| |
| boolean primaryChanged = calculateAffinity(5, false, aff); |
| |
| checkAffinity(3, topVer(5, 0), !primaryChanged); |
| |
| if (primaryChanged) |
| checkAffinity(3, topVer(5, 1), true); |
| |
| assertNull(((IgniteKernal)ignite(1)).context().cache().internalCache(CACHE_NAME1)); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAffinitySimpleNoCacheOnCoordinator2() throws Exception { |
| System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, "true"); |
| |
| try { |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String igniteInstanceName) { |
| if (igniteInstanceName.equals(getTestIgniteInstanceName(1)) || |
| igniteInstanceName.equals(getTestIgniteInstanceName(2))) |
| return null; |
| |
| return new CacheConfiguration[]{cacheConfiguration()}; |
| } |
| }; |
| |
| cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2))); |
| |
| startServer(0, 1); |
| startServer(1, 2); |
| startServer(2, 3); |
| startServer(3, 4); |
| |
| for (int i = 0; i < 4; i++) { |
| TestRecordingCommunicationSpi spi = |
| (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); |
| |
| // Prevent exchange finish while node0 or node1 is coordinator. |
| spi.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(0).name()); |
| spi.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(1).name()); |
| } |
| |
| stopGrid(0); |
| stopGrid(1); |
| |
| calculateAffinity(5); |
| calculateAffinity(6); |
| |
| checkAffinity(2, topVer(6, 0), true); |
| |
| assertNull(((IgniteKernal)ignite(2)).context().cache().internalCache(CACHE_NAME1)); |
| assertNotNull(((IgniteKernal)ignite(3)).context().cache().internalCache(CACHE_NAME1)); |
| |
| assertNotNull(ignite(2).cache(CACHE_NAME1)); |
| |
| checkAffinity(2, topVer(6, 0), true); |
| |
| startServer(4, 7); |
| |
| checkAffinity(3, topVer(7, 0), false); |
| |
| checkAffinity(3, topVer(7, 1), true); |
| } |
| finally { |
| System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCreateCloseClientCacheOnCoordinator1() throws Exception { |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String igniteInstanceName) { |
| return null; |
| } |
| }; |
| |
| cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0))); |
| |
| Ignite ignite0 = startServer(0, 1); |
| |
| ignite0.createCache(cacheConfiguration()); |
| |
| ignite0.cache(CACHE_NAME1); |
| |
| ignite0.cache(CACHE_NAME1).close(); |
| |
| startServer(1, 2); |
| |
| startServer(2, 3); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCreateCloseClientCacheOnCoordinator2() throws Exception { |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String igniteInstanceName) { |
| if (igniteInstanceName.equals(getTestIgniteInstanceName(0))) |
| return null; |
| |
| return new CacheConfiguration[]{cacheConfiguration()}; |
| } |
| }; |
| |
| cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0))); |
| |
| Ignite ignite0 = startServer(0, 1); |
| |
| int topVer = 1; |
| |
| int nodes = 1; |
| |
| for (int i = 0; i < 3; i++) { |
| log.info("Iteration [iter=" + i + ", topVer=" + topVer + ']'); |
| |
| topVer++; |
| |
| startServer(nodes++, topVer); |
| |
| checkAffinity(nodes, topVer(topVer, 1), true); |
| |
| ignite0.cache(CACHE_NAME1); |
| |
| checkAffinity(nodes, topVer(topVer, 1), true); |
| |
| topVer++; |
| |
| startServer(nodes++, topVer); |
| |
| checkAffinity(nodes, topVer(topVer, 1), true); |
| |
| ignite0.cache(CACHE_NAME1).close(); |
| |
| checkAffinity(nodes, topVer(topVer, 1), true); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCacheDestroyAndCreate1() throws Exception { |
| cacheDestroyAndCreate(true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCacheDestroyAndCreate2() throws Exception { |
| cacheDestroyAndCreate(false); |
| } |
| |
| /** |
| * @param cacheOnCrd If {@code false} does not create cache on coordinator. |
| * @throws Exception If failed. |
| */ |
| private void cacheDestroyAndCreate(boolean cacheOnCrd) throws Exception { |
| if (!cacheOnCrd) |
| cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0))); |
| |
| startServer(0, 1); |
| |
| startServer(1, 2); |
| |
| startServer(2, 3); |
| |
| checkAffinity(3, topVer(3, 1), true); |
| |
| startClient(3, 4); |
| |
| checkAffinity(4, topVer(4, 0), true); |
| |
| CacheConfiguration ccfg = cacheConfiguration(); |
| ccfg.setName(CACHE_NAME2); |
| |
| ignite(1).createCache(ccfg); |
| |
| calculateAffinity(4); |
| |
| checkAffinity(4, topVer(4, 1), true); |
| |
| ignite(1).destroyCache(CACHE_NAME2); |
| |
| idealAff.get(4L).remove(CU.cacheId(CACHE_NAME2)); |
| |
| ccfg = cacheConfiguration(); |
| ccfg.setName(CACHE_NAME2); |
| ccfg.setAffinity(affinityFunction(10)); |
| |
| ignite(1).createCache(ccfg); |
| |
| calculateAffinity(4); |
| |
| checkAffinity(4, topVer(4, 3), true); |
| |
| checkCaches(); |
| |
| ignite(1).destroyCache(CACHE_NAME2); |
| |
| idealAff.get(4L).remove(CU.cacheId(CACHE_NAME2)); |
| |
| ccfg = cacheConfiguration(); |
| ccfg.setName(CACHE_NAME2); |
| ccfg.setAffinity(affinityFunction(20)); |
| |
| ignite(1).createCache(ccfg); |
| |
| calculateAffinity(4); |
| |
| checkAffinity(4, topVer(4, 5), true); |
| } |
| |
| /** |
| * Simple test, node leaves. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAffinitySimpleNodeLeave1() throws Exception { |
| affinitySimpleNodeLeave(2); |
| } |
| |
| /** |
| * Simple test, node leaves. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAffinitySimpleNodeLeave2() throws Exception { |
| affinitySimpleNodeLeave(4); |
| } |
| |
| /** |
| * @param cnt Count of server nodes. |
| * @throws Exception If failed. |
| */ |
| private void affinitySimpleNodeLeave(int cnt) throws Exception { |
| int topVer = 1; |
| |
| startServer(topVer - 1, topVer++); |
| |
| for (int i = 0; i < cnt - 1; i++, topVer++) { |
| startServer(topVer - 1, topVer); |
| |
| checkAffinity(topVer, topVer(topVer, 0), false); |
| |
| checkAffinity(topVer, topVer(topVer, 1), true); |
| } |
| |
| stopNode(1, topVer); |
| |
| checkAffinity(cnt - 1, topVer(topVer, 0), true); |
| |
| checkNoExchange(cnt - 1, topVer(topVer, 1)); |
| |
| awaitPartitionMapExchange(); |
| } |
| |
| /** |
| * Simple test, node leaves. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAffinitySimpleNodeLeaveClientAffinity() throws Exception { |
| startServer(0, 1); |
| |
| startServer(1, 2); |
| |
| checkAffinity(2, topVer(2, 1), true); |
| |
| startClient(2, 3); |
| |
| checkAffinity(3, topVer(3, 0), true); |
| |
| stopNode(1, 4); |
| |
| checkAffinity(2, topVer(4, 0), true); |
| |
| awaitPartitionMapExchange(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testNodeLeaveExchangeWaitAffinityMessage() throws Exception { |
| System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, "true"); |
| |
| try { |
| Ignite ignite0 = startServer(0, 1); |
| |
| startServer(1, 2); |
| |
| startServer(2, 3); |
| |
| checkAffinity(3, topVer(3, 1), true); |
| |
| checkOrderCounters(3, topVer(3, 1)); |
| |
| startClient(3, 4); |
| |
| checkAffinity(4, topVer(4, 0), true); |
| |
| DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); |
| |
| ((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr); |
| |
| lsnr.blockCustomEvent(CacheAffinityChangeMessage.class); |
| |
| stopGrid(1); |
| |
| List<IgniteInternalFuture<?>> futs = affFutures(3, topVer(5, 0)); |
| |
| U.sleep(1000); |
| |
| for (IgniteInternalFuture<?> fut : futs) |
| assertFalse(fut.isDone()); |
| |
| lsnr.stopBlockCustomEvents(); |
| |
| checkAffinity(3, topVer(5, 0), false); |
| |
| checkOrderCounters(3, topVer(5, 0)); |
| } |
| finally { |
| System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1); |
| } |
| } |
| |
| /** |
| * Simple test, client node joins/leaves. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAffinitySimpleClientNodeEvents1() throws Exception { |
| affinitySimpleClientNodeEvents(1); |
| } |
| |
| /** |
| * Simple test, client node joins/leaves. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAffinitySimpleClientNodeEvents2() throws Exception { |
| affinitySimpleClientNodeEvents(3); |
| } |
| |
| /** |
| * Simple test, client node joins/leaves. |
| * |
| * @param srvs Number of server nodes. |
| * @throws Exception If failed. |
| */ |
| private void affinitySimpleClientNodeEvents(int srvs) throws Exception { |
| long topVer = 0; |
| |
| for (int i = 0; i < srvs; i++) |
| startServer(i, ++topVer); |
| |
| if (srvs == 1) |
| checkAffinity(srvs, topVer(srvs, 0), true); |
| else |
| checkAffinity(srvs, topVer(srvs, 1), true); |
| |
| startClient(srvs, ++topVer); |
| |
| checkAffinity(srvs + 1, topVer(srvs + 1, 0), true); |
| |
| stopNode(srvs, ++topVer); |
| |
| checkAffinity(srvs, topVer(srvs + 2, 0), true); |
| } |
| |
| /** |
| * Wait for rebalance, 2 nodes join. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentMultipleJoin1() throws Exception { |
| delayAssignmentMultipleJoin(2); |
| } |
| |
| /** |
| * Wait for rebalance, 4 nodes join. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentMultipleJoin2() throws Exception { |
| delayAssignmentMultipleJoin(4); |
| } |
| |
| /** |
| * @param joinCnt Number of joining nodes. |
| * @throws Exception If failed. |
| */ |
| private void delayAssignmentMultipleJoin(int joinCnt) throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| TestRecordingCommunicationSpi spi = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); |
| |
| blockSupplySend(spi, CACHE_NAME1); |
| |
| int majorVer = 1; |
| |
| for (int i = 0; i < joinCnt; i++) { |
| majorVer++; |
| |
| startServer(i + 1, majorVer); |
| |
| checkAffinity(majorVer, topVer(majorVer, 0), false); |
| } |
| |
| List<IgniteInternalFuture<?>> futs = affFutures(majorVer, topVer(majorVer, 1)); |
| |
| U.sleep(1000); |
| |
| for (IgniteInternalFuture<?> fut : futs) |
| assertFalse(fut.isDone()); |
| |
| spi.stopBlock(); |
| |
| checkAffinity(majorVer, topVer(majorVer, 1), true); |
| |
| for (IgniteInternalFuture<?> fut : futs) |
| assertTrue(fut.isDone()); |
| |
| awaitPartitionMapExchange(); |
| } |
| |
| /** |
| * Wait for rebalance, client node joins. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentClientJoin() throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| TestRecordingCommunicationSpi spi = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); |
| |
| blockSupplySend(spi, CACHE_NAME1); |
| |
| startServer(1, 2); |
| |
| startClient(2, 3); |
| |
| checkAffinity(3, topVer(3, 0), false); |
| |
| spi.stopBlock(); |
| |
| checkAffinity(3, topVer(3, 1), true); |
| } |
| |
| /** |
| * Wait for rebalance, client node leaves. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentClientLeave() throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| startClient(1, 2); |
| |
| checkAffinity(2, topVer(2, 0), true); |
| |
| TestRecordingCommunicationSpi spi = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); |
| |
| blockSupplySend(spi, CACHE_NAME1); |
| |
| startServer(2, 3); |
| |
| checkAffinity(3, topVer(3, 0), false); |
| |
| stopNode(1, 4); |
| |
| checkAffinity(2, topVer(4, 0), false); |
| |
| spi.stopBlock(); |
| |
| checkAffinity(2, topVer(4, 1), true); |
| } |
| |
| /** |
| * Wait for rebalance, client cache is started. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentClientCacheStart() throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| TestRecordingCommunicationSpi spi = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); |
| |
| blockSupplySend(spi, CACHE_NAME1); |
| |
| startServer(1, 2); |
| |
| startServer(2, 3); |
| |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String nodeName) { |
| return null; |
| } |
| }; |
| |
| Ignite client = startClient(3, 4); |
| |
| checkAffinity(4, topVer(4, 0), false); |
| |
| assertNotNull(client.cache(CACHE_NAME1)); |
| |
| checkAffinity(4, topVer(4, 0), false); |
| |
| spi.stopBlock(); |
| |
| checkAffinity(4, topVer(4, 1), true); |
| } |
| |
| /** |
| * Wait for rebalance, cache is started. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentCacheStart() throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| TestRecordingCommunicationSpi spi = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); |
| |
| blockSupplySend(spi, CACHE_NAME1); |
| |
| startServer(1, 2); |
| |
| startServer(2, 3); |
| |
| checkAffinity(3, topVer(3, 0), false); |
| |
| CacheConfiguration ccfg = cacheConfiguration(); |
| |
| ccfg.setName(CACHE_NAME2); |
| |
| ignite0.createCache(ccfg); |
| |
| calculateAffinity(3); |
| |
| checkAffinity(3, topVer(3, 1), false); |
| |
| spi.stopBlock(); |
| |
| checkAffinity(3, topVer(3, 2), true); |
| } |
| |
| /** |
| * Wait for rebalance, cache is destroyed. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentCacheDestroy() throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| CacheConfiguration ccfg = cacheConfiguration(); |
| |
| ccfg.setName(CACHE_NAME2); |
| |
| ignite0.createCache(ccfg); |
| |
| TestRecordingCommunicationSpi spi = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); |
| |
| blockSupplySend(spi, CACHE_NAME2); |
| |
| startServer(1, 2); |
| |
| startServer(2, 3); |
| |
| checkAffinity(3, topVer(3, 0), false); |
| |
| ignite0.destroyCache(CACHE_NAME2); |
| |
| checkAffinity(3, topVer(3, 1), false); |
| |
| checkAffinity(3, topVer(3, 2), true); |
| |
| spi.stopBlock(); |
| } |
| |
| /** |
| * Simple test, stop random node. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAffinitySimpleStopRandomNode() throws Exception { |
| //fail("IGNITE-GG-12292"); |
| |
| final int ITERATIONS = 3; |
| |
| for (int iter = 0; iter < 3; iter++) { |
| log.info("Iteration: " + iter); |
| |
| final int NODES = 5; |
| |
| for (int i = 0; i < NODES; i++) |
| startServer(i, i + 1); |
| |
| int majorVer = NODES; |
| |
| checkAffinity(majorVer, topVer(majorVer, 1), true); |
| |
| Set<Integer> stopOrder = new HashSet<>(); |
| |
| while (stopOrder.size() != NODES - 1) |
| stopOrder.add(ThreadLocalRandom.current().nextInt(NODES)); |
| |
| int nodes = NODES; |
| |
| for (Integer idx : stopOrder) { |
| log.info("Stop node: " + idx); |
| |
| majorVer++; |
| |
| stopNode(idx, majorVer); |
| |
| checkAffinity(--nodes, topVer(majorVer, 0), false); |
| |
| awaitPartitionMapExchange(); |
| } |
| |
| if (iter < ITERATIONS - 1) { |
| stopAllGrids(); |
| |
| idealAff.clear(); |
| } |
| } |
| } |
| |
| /** |
| * Wait for rebalance, coordinator leaves, 2 nodes. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentCoordinatorLeave1() throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| TestRecordingCommunicationSpi spi = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); |
| |
| blockSupplySend(spi, CACHE_NAME1); |
| |
| startServer(1, 2); |
| |
| stopNode(0, 3); |
| |
| checkAffinity(1, topVer(3, 0), true); |
| |
| checkNoExchange(1, topVer(3, 1)); |
| |
| awaitPartitionMapExchange(); |
| } |
| |
| /** |
| * Wait for rebalance, coordinator leaves, 3 nodes. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentCoordinatorLeave2() throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| Ignite ignite1 = startServer(1, 2); |
| |
| checkAffinity(2, topVer(2, 1), true); |
| |
| TestRecordingCommunicationSpi spi0 = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); |
| TestRecordingCommunicationSpi spi1 = |
| (TestRecordingCommunicationSpi)ignite1.configuration().getCommunicationSpi(); |
| |
| blockSupplySend(spi0, CACHE_NAME1); |
| blockSupplySend(spi1, CACHE_NAME1); |
| |
| startServer(2, 3); |
| |
| stopNode(0, 4); |
| |
| checkAffinity(2, topVer(4, 0), false); |
| |
| spi1.stopBlock(); |
| |
| checkAffinity(2, topVer(4, 1), true); |
| } |
| |
| /** |
| * Checks LAA absent on owner left. |
| */ |
| @Test |
| public void testSinglePartitionCacheOwnerLeft() throws Exception { |
| testSinglePartitionCacheNodeLeft(true); |
| } |
| |
| /** |
| * Checks LAA absent on non owner left. |
| */ |
| @Test |
| public void testSinglePartitionCacheNonOwnerLeft() throws Exception { |
| testSinglePartitionCacheNodeLeft(false); |
| } |
| |
| /** |
| * Since we have only 1 partition, at each node left it will be lost (no rebalance needed) or it will be still |
| * located at second node (thanks to special affinity) (no rebalance neeeded). So, LAA should never happen. |
| * |
| * @param ownerLeft Kill owner flag. |
| */ |
| private void testSinglePartitionCacheNodeLeft(boolean ownerLeft) throws Exception { |
| String cacheName = "single-partitioned"; |
| |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String igniteInstanceName) { |
| CacheConfiguration ccfg = new CacheConfiguration(); |
| |
| AffinityFunction aff; |
| |
| ccfg.setName(cacheName); |
| ccfg.setWriteSynchronizationMode(FULL_SYNC); |
| ccfg.setBackups(0); |
| |
| aff = ownerLeft ? affinityFunction(1) : new MapSinglePartitionToSecondNodeAffinityFunction(); |
| |
| ccfg.setAffinity(aff); |
| |
| return new CacheConfiguration[] {ccfg}; |
| } |
| }; |
| |
| int top = 0; |
| int nodes = 0; |
| |
| startServer(nodes++, ++top); |
| |
| checkAffinity(nodes, topVer(top, 0), true); |
| |
| checkNoExchange(nodes, topVer(top, 1)); // Checks LAA is absent on initial topology. |
| |
| for (int i = 0; i < 10; i++) |
| startServer(nodes++, ++top); |
| |
| awaitPartitionMapExchange(); |
| |
| Ignite primary = primaryNode(0, cacheName); |
| |
| boolean laaOnJoin = primary.cluster().localNode().order() != 1; |
| |
| boolean leftHappen = false; |
| |
| while (nodes > 1) { |
| Map<String, List<List<ClusterNode>>> aff = |
| checkAffinity(nodes, topVer(top, leftHappen ? 0 : (laaOnJoin ? 1 : 0)), true); |
| |
| ClusterNode owner = aff.get(cacheName).get(/*part*/0).get(/*primary*/0); |
| |
| Ignite actualOwner = primaryNode(0, cacheName); |
| |
| assertEquals(actualOwner.cluster().localNode().order(), owner.order()); |
| |
| for (Ignite node : G.allGrids()) { |
| ClusterNode locNode = node.cluster().localNode(); |
| |
| boolean equals = locNode.order() == owner.order(); |
| |
| if (equals == ownerLeft) { |
| if (!ownerLeft) |
| assertNotSame(locNode.order(), 2); |
| |
| grid(locNode).close(); |
| |
| calculateAffinity(++top); |
| |
| leftHappen = true; |
| |
| break; |
| } |
| } |
| |
| checkAffinity(--nodes, topVer(top, 0), true); |
| |
| checkNoExchange(nodes, topVer(top, 1)); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class MapSinglePartitionToSecondNodeAffinityFunction extends RendezvousAffinityFunction { |
| /** |
| * Default constructor. |
| */ |
| public MapSinglePartitionToSecondNodeAffinityFunction() { |
| super(false, 1); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { |
| for (ClusterNode node : affCtx.currentTopologySnapshot() ) { |
| // Always aims to map to second started node to avoid rebalance. |
| if (node.order() == 2 || affCtx.currentTopologySnapshot().size() == 1) |
| return Collections.singletonList(Collections.singletonList(node)); |
| } |
| |
| fail("Should not happen."); |
| |
| return null; |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBlockedFinishMsg1() throws Exception { |
| doTestCoordLeaveBlockedFinishExchangeMessage(4, 3, false, 2); |
| } |
| |
| /** |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBlockedFinishMsg2() throws Exception { |
| doTestCoordLeaveBlockedFinishExchangeMessage(4, 3, false); |
| } |
| |
| /** |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBlockedFinishMsg3() throws Exception { |
| doTestCoordLeaveBlockedFinishExchangeMessage(4, 3, false, 1); |
| } |
| |
| /** |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBlockedFinishMsg4() throws Exception { |
| doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false); |
| } |
| |
| /** |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBlockedFinishMsg5() throws Exception { |
| doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false, 1); |
| } |
| |
| /** |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBlockedFinishMsg6() throws Exception { |
| doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false, 2); |
| } |
| |
| /** |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBlockedFinishMsg7() throws Exception { |
| doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false, 2, 4); |
| } |
| |
| /** |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBlockedFinishMsg8() throws Exception { |
| doTestCoordLeaveBlockedFinishExchangeMessage(6, 3, false, 2, 4); |
| } |
| |
| /** |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBlockedFinishMsg9() throws Exception { |
| doTestCoordLeaveBlockedFinishExchangeMessage(5, 1, false, 4); |
| } |
| |
| /** |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBlockedFinishMsgForClient() throws Exception { |
| doTestCoordLeaveBlockedFinishExchangeMessage(5, 1, true, 4); |
| } |
| |
| /** |
| * Coordinator leaves without sending all {@link GridDhtPartitionsFullMessage} messages, |
| * exchange must be completed. |
| * |
| * @param cnt Number of nodes. |
| * @param stopId Node to stop. |
| * @param lastClient {@code True} if last started node is client. |
| * @param blockedIds Nodes not receiving exchange finish message. |
| * @throws Exception If failed. |
| */ |
| private void doTestCoordLeaveBlockedFinishExchangeMessage(int cnt, |
| int stopId, |
| boolean lastClient, |
| int... blockedIds |
| ) throws Exception { |
| int ord = 1; |
| |
| for (int i = 0; i < cnt; i++) { |
| if (i == cnt - 1 && lastClient) |
| startClient(ord - 1, ord++); |
| else |
| startServer(ord - 1, ord++); |
| } |
| |
| awaitPartitionMapExchange(); |
| |
| TestRecordingCommunicationSpi spi0 = TestRecordingCommunicationSpi.spi(grid(0)); |
| |
| final Set<String> blocked = new HashSet<>(); |
| |
| for (int id : blockedIds) { |
| String name = grid(id).name(); |
| |
| blocked.add(name); |
| } |
| |
| spi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { |
| @Override public boolean apply(ClusterNode node, Message msg) { |
| return blocked.contains(node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME)) |
| && (msg instanceof GridDhtPartitionsFullMessage) |
| && (((GridDhtPartitionsFullMessage)msg).exchangeId() != null); |
| } |
| }); |
| |
| AffinityTopologyVersion currentTop = ignite(0).context().cache().context().exchange().readyAffinityVersion(); |
| |
| checkAffinity(cnt, currentTop, true); |
| |
| stopNode(stopId, ord); |
| |
| AffinityTopologyVersion topVer = topVer(ord, 0); |
| |
| List<IgniteInternalFuture<?>> futs = new ArrayList<>(cnt); |
| |
| List<Ignite> grids = G.allGrids(); |
| |
| for (Ignite ignite : grids) |
| futs.add(affinityReadyFuture(topVer, ignite)); |
| |
| assertEquals(futs.size(), grids.size()); |
| |
| for (int i = 0; i < futs.size(); i++) { |
| final IgniteInternalFuture<?> fut = futs.get(i); |
| |
| Ignite ignite = grids.get(i); |
| |
| if (!blocked.contains(ignite.name())) { |
| waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return fut.isDone(); |
| } |
| }, 5000); |
| |
| assertTrue(ignite.name(), fut.isDone()); |
| } |
| else |
| assertFalse(ignite.name(), fut.isDone()); |
| } |
| |
| ord++; |
| |
| stopNode(0, ord); // Triggers exchange completion from new coordinator. |
| |
| checkAffinity(cnt - 2, topVer(ord - 1, 0), true, false); |
| |
| checkAffinity(cnt - 2, topVer(ord, 0), true); |
| |
| awaitPartitionMapExchange(); |
| } |
| |
| /** |
| * Assignment is delayed, coordinator leaves, nodes must complete exchange with same assignments. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCoordinatorLeaveAfterNodeLeavesDelayAssignment() throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| startServer(1, 2); |
| |
| Ignite ignite2 = startServer(2, 3); |
| |
| Ignite ignite3 = startServer(3, 4); |
| |
| // Wait for topVer=(4,1) |
| awaitPartitionMapExchange(); |
| |
| TestRecordingCommunicationSpi spi0 = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(), spi2, spi3; |
| |
| // Prevent exchange completion. |
| spi0.blockMessages(GridDhtPartitionsFullMessage.class, ignite2.name()); |
| |
| // Block rebalance. |
| blockSupplySend(spi0, CACHE_NAME1); |
| blockSupplySend((spi2 = TestRecordingCommunicationSpi.spi(ignite2)), CACHE_NAME1); |
| blockSupplySend((spi3 = TestRecordingCommunicationSpi.spi(ignite3)), CACHE_NAME1); |
| |
| stopNode(1, 5); |
| |
| AffinityTopologyVersion topVer = topVer(5, 0); |
| |
| IgniteInternalFuture<?> fut0 = affinityReadyFuture(topVer, ignite0); |
| IgniteInternalFuture<?> fut2 = affinityReadyFuture(topVer, ignite2); |
| IgniteInternalFuture<?> fut3 = affinityReadyFuture(topVer, ignite3); |
| |
| U.sleep(1_000); |
| |
| assertTrue(fut0.isDone()); |
| assertFalse(fut2.isDone()); |
| assertTrue(fut3.isDone()); |
| |
| // Finish rebalance on ignite3. |
| spi2.stopBlock(true); |
| |
| stopNode(0, 6); |
| |
| spi3.stopBlock(true); |
| |
| checkAffinity(2, topVer, false); |
| } |
| |
| /** |
| * Coordinator leaves during node leave exchange. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testNodeLeftExchangeCoordinatorLeave1() throws Exception { |
| nodeLeftExchangeCoordinatorLeave(3); |
| } |
| |
| /** |
| * Coordinator leaves during node leave exchange. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testNodeLeftExchangeCoordinatorLeave2() throws Exception { |
| nodeLeftExchangeCoordinatorLeave(5); |
| } |
| |
| /** |
| * @param nodes Number of nodes. |
| * @throws Exception If failed. |
| */ |
| private void nodeLeftExchangeCoordinatorLeave(int nodes) throws Exception { |
| System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, "true"); |
| |
| try { |
| assert nodes > 2 : nodes; |
| |
| long topVer = 0; |
| |
| for (int i = 0; i < nodes; i++) |
| startServer(i, ++topVer); |
| |
| Ignite ignite1 = grid(1); |
| |
| checkAffinity(nodes, topVer(nodes, 1), true); |
| |
| TestRecordingCommunicationSpi spi1 = |
| (TestRecordingCommunicationSpi)ignite1.configuration().getCommunicationSpi(); |
| |
| // Prevent exchange finish while node0 is coordinator. |
| spi1.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(0).name()); |
| |
| stopNode(2, ++topVer); // New exchange started. |
| |
| stopGrid(0); // Stop coordinator while exchange in progress. |
| |
| Map<String, List<List<ClusterNode>>> aff = checkAffinity(nodes - 2, topVer(topVer, 0), false); |
| |
| topVer++; |
| |
| boolean primaryChanged = calculateAffinity(nodes + 2, false, aff); |
| |
| checkAffinity(nodes - 2, topVer(topVer, 0), !primaryChanged); |
| |
| if (primaryChanged) |
| checkAffinity(nodes - 2, topVer(topVer, 1), true); |
| |
| awaitPartitionMapExchange(); |
| } |
| finally { |
| System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testJoinExchangeBecomeCoordinator() throws Exception { |
| long topVer = 0; |
| |
| final int NODES = 3; |
| |
| for (int i = 0; i < NODES; i++) |
| startServer(i, ++topVer); |
| |
| checkAffinity(NODES, topVer(topVer, 1), true); |
| |
| AtomicBoolean joined = new AtomicBoolean(); |
| |
| for (int i = 0; i < NODES; i++) { |
| TestRecordingCommunicationSpi spi = |
| (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); |
| |
| spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { |
| @Override public boolean apply(ClusterNode node, Message msg) { |
| if (msg.getClass().equals(GridDhtPartitionsSingleMessage.class) && |
| ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null) |
| joined.set(true); // Join exchange started. |
| |
| return msg.getClass().equals(GridDhtPartitionsSingleMessage.class) || |
| msg.getClass().equals(GridDhtPartitionsFullMessage.class); |
| } |
| }); |
| } |
| |
| IgniteInternalFuture<?> stopFut = runAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| for (int j = 1; j < NODES; j++) { |
| TestRecordingCommunicationSpi spi = |
| (TestRecordingCommunicationSpi)ignite(j).configuration().getCommunicationSpi(); |
| |
| spi.waitForBlocked(); |
| } |
| |
| for (int i = 0; i < NODES; i++) |
| stopGrid(getTestIgniteInstanceName(i), false, false); |
| |
| return null; |
| } |
| }, "stop-thread"); |
| |
| Ignite node = startGrid(NODES); |
| |
| assertEquals(NODES + 1, node.cluster().localNode().order()); |
| |
| stopFut.get(); |
| |
| for (int i = 0; i < NODES + 1; i++) |
| calculateAffinity(++topVer); |
| |
| checkAffinity(1, topVer(topVer, 0), true); |
| |
| for (int i = 0; i < NODES; i++) |
| startServer(i, ++topVer); |
| |
| checkAffinity(NODES + 1, topVer(topVer, 1), true); |
| } |
| |
| /** |
| * Wait for rebalance, send affinity change message, but affinity already changed |
| * (new nodes joined: server + client). Checks that tere is no race that could lead to |
| * unexpected partition map exchange on the client node. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentAffinityChangedUnexpectedPME() throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| for (int i = 0; i < 1024; i++) |
| ignite0.cache(CACHE_NAME1).put(i, i); |
| |
| DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); |
| |
| ((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr); |
| |
| TestRecordingCommunicationSpi commSpi0 = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); |
| |
| // Starting a new client node should not lead to a rebalance obviously. |
| // So, it is expected that data distribution is ideal (ideal assignment). |
| startClient(1, 2); |
| |
| checkAffinity(2, topVer(2, 0), true); |
| |
| // Block late affinity assignment. (*) |
| lsnr.blockCustomEvent(CacheAffinityChangeMessage.class); |
| |
| // Starting a new server node triggers data rebalancing. |
| // [3, 0] - is not ideal (expected) |
| startServer(2, 3); |
| |
| checkAffinity(3, topVer(3, 0), false); |
| |
| // Wait for sending late affinity assignment message (1) from the coordinator node. |
| // This message will be blocked (*) |
| lsnr.waitCustomEvent(); |
| |
| // Block rebalance messages. |
| blockSupplySend(commSpi0, CACHE_NAME1); |
| |
| // Starting a new server node means that the late affinity assignment message (1) should be skipped. |
| startServer(3, 4); |
| |
| TestRecordingCommunicationSpi clientSpi = new TestRecordingCommunicationSpi(); |
| clientSpi.blockMessages(blockSingleExhangeMessage()); |
| spiC = igniteInstanceName -> clientSpi; |
| |
| IgniteInternalFuture<?> startClientFut = runAsync(() -> { |
| startClient(4, 5); |
| }); |
| |
| clientSpi.waitForBlocked(); |
| |
| // Unblock the late affinity assignment message (1). |
| lsnr.stopBlockCustomEvents(); |
| |
| clientSpi.stopBlock(); |
| |
| startClientFut.get(15_000); |
| |
| // [5, 0] - is not ideal (expected) |
| checkAffinity(5, topVer(5, 0), false); |
| |
| // Rebalance is blocked at this moment, so [5, 1] is not ready. |
| checkNoExchange(5, topVer(5, 1)); |
| |
| // Unblock rebalancing. |
| // The late affinity assignments message (2) should be fired after all. |
| commSpi0.stopBlock(); |
| |
| // [5, 1] should be ideal |
| checkAffinity(5, topVer(5, 1), true); |
| |
| // The following output demonstrates the issue. |
| // The coordinator node and client initiate PME on the same toplogy version, |
| // but it relies to different custom messages. |
| // client: |
| // Started exchange init [ |
| // topVer=AffinityTopologyVersion [topVer=5, minorTopVer=1], |
| // crd=false, |
| // evt=DISCOVERY_CUSTOM_EVT, evtNode=00ac9434-fd34-4aae-95d3-ceb477700000, |
| // customEvt=CacheAffinityChangeMessage [ |
| // id=3ccc8984181-ea41279c-71cb-4b8c-8b48-1dee1baa6fe0, <<< (1) |
| // topVer=AffinityTopologyVersion [topVer=3, minorTopVer=0], ...] <<< !!! |
| // coordinator: |
| // Started exchange init |
| // [topVer=AffinityTopologyVersion [topVer=5, minorTopVer=1], |
| // crd=true, |
| // evt=DISCOVERY_CUSTOM_EVT, evtNode=00ac9434-fd34-4aae-95d3-ceb477700000, |
| // customEvt=CacheAffinityChangeMessage [ |
| // id=d2ec8984181-ea41279c-71cb-4b8c-8b48-1dee1baa6fe0, <<< (2) |
| // topVer=AffinityTopologyVersion [topVer=4, minorTopVer=0], ...] <<< !!! |
| awaitPartitionMapExchange(true, true, null, false); |
| |
| assertPartitionsSame(idleVerify(grid(0), CACHE_NAME1)); |
| } |
| |
| /** |
| * Wait for rebalance, send affinity change message, but affinity already changed (new node joined). |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentAffinityChanged() throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| for (int i = 0; i < 1024; i++) |
| ignite0.cache(CACHE_NAME1).put(i, i); |
| |
| DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); |
| |
| ((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr); |
| |
| TestRecordingCommunicationSpi commSpi0 = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); |
| |
| startClient(1, 2); |
| |
| checkAffinity(2, topVer(2, 0), true); |
| |
| lsnr.blockCustomEvent(CacheAffinityChangeMessage.class); |
| |
| startServer(2, 3); |
| |
| checkAffinity(3, topVer(3, 0), false); |
| |
| lsnr.waitCustomEvent(); |
| |
| blockSupplySend(commSpi0, CACHE_NAME1); |
| |
| startServer(3, 4); |
| |
| lsnr.stopBlockCustomEvents(); |
| |
| checkAffinity(4, topVer(4, 0), false); |
| |
| checkNoExchange(4, topVer(4, 1)); |
| |
| commSpi0.stopBlock(); |
| |
| checkAffinity(4, topVer(4, 1), true); |
| |
| awaitPartitionMapExchange(true, true, null, false); |
| |
| assertPartitionsSame(idleVerify(grid(0), CACHE_NAME1)); |
| } |
| |
| /** |
| * Wait for rebalance, send affinity change message, but affinity already changed (new node joined). |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentAffinityChanged2() throws Exception { |
| System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, "true"); |
| |
| try { |
| Ignite ignite0 = startServer(0, 1); |
| |
| DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); |
| |
| ((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr); |
| |
| TestRecordingCommunicationSpi commSpi0 = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); |
| |
| startClient(1, 2); |
| |
| checkAffinity(2, topVer(2, 0), true); |
| |
| startServer(2, 3); |
| |
| checkAffinity(3, topVer(3, 1), false); |
| |
| lsnr.blockCustomEvent(CacheAffinityChangeMessage.class); |
| |
| stopNode(2, 4); |
| |
| lsnr.waitCustomEvent(); |
| |
| blockSupplySend(commSpi0, CACHE_NAME1); |
| |
| final IgniteInternalFuture<?> startedFuture = multithreadedAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| startServer(3, 5); |
| |
| return null; |
| } |
| }, 1, "server-starter"); |
| |
| Thread.sleep(2_000); |
| |
| lsnr.stopBlockCustomEvents(); |
| |
| boolean started = waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return startedFuture.isDone(); |
| } |
| }, 10_000); |
| |
| if (!started) |
| startedFuture.cancel(); |
| |
| assertTrue(started); |
| |
| checkAffinity(3, topVer(5, 0), false); |
| |
| checkNoExchange(3, topVer(5, 1)); |
| |
| commSpi0.stopBlock(); |
| |
| checkAffinity(3, topVer(5, 1), true); |
| |
| long nodeJoinTopVer = grid(3).context().discovery().localJoinEvent().topologyVersion(); |
| |
| assertEquals(5, nodeJoinTopVer); |
| |
| List<GridDhtPartitionsExchangeFuture> exFutures = grid(3).context().cache().context().exchange().exchangeFutures(); |
| |
| for (GridDhtPartitionsExchangeFuture f : exFutures) { |
| //Shouldn't contains staled futures. |
| assertTrue(f.initialVersion().topologyVersion() >= nodeJoinTopVer); |
| } |
| } |
| finally { |
| System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1); |
| } |
| } |
| |
| /** |
| * Wait for rebalance, cache is destroyed and created again. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDelayAssignmentCacheDestroyCreate() throws Exception { |
| Ignite ignite0 = startServer(0, 1); |
| |
| CacheConfiguration ccfg = cacheConfiguration(); |
| |
| ccfg.setName(CACHE_NAME2); |
| |
| ignite0.createCache(ccfg); |
| |
| DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); |
| |
| ((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr); |
| |
| TestRecordingCommunicationSpi spi = |
| (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); |
| |
| blockSupplySend(spi, CACHE_NAME2); |
| |
| lsnr.blockCustomEvent(CacheAffinityChangeMessage.class); |
| |
| startServer(1, 2); |
| |
| startGrid(3); |
| |
| checkAffinity(3, topVer(3, 0), false); |
| |
| spi.stopBlock(); |
| |
| lsnr.waitCustomEvent(); |
| |
| ignite0.destroyCache(CACHE_NAME2); |
| |
| ccfg = cacheConfiguration(); |
| ccfg.setName(CACHE_NAME2); |
| ccfg.setAffinity(affinityFunction(10)); |
| |
| ignite0.createCache(ccfg); |
| |
| lsnr.stopBlockCustomEvents(); |
| |
| checkAffinity(3, topVer(3, 1), false); |
| checkAffinity(3, topVer(3, 2), false); |
| |
| idealAff.get(2L).remove(CU.cacheId(CACHE_NAME2)); |
| |
| calculateAffinity(3); |
| |
| checkAffinity(3, topVer(3, 3), true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testClientCacheStartClose() throws Exception { |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String igniteInstanceName) { |
| if (igniteInstanceName.equals(getTestIgniteInstanceName(1))) |
| return null; |
| |
| return new CacheConfiguration[]{cacheConfiguration()}; |
| } |
| }; |
| |
| startServer(0, 1); |
| |
| Ignite client = startClient(1, 2); |
| |
| checkAffinity(2, topVer(2, 0), true); |
| |
| IgniteCache cache = client.cache(CACHE_NAME1); |
| |
| checkAffinity(2, topVer(2, 0), true); |
| |
| cache.close(); |
| |
| checkAffinity(2, topVer(2, 0), true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCacheStartDestroy() throws Exception { |
| startGridsMultiThreaded(3, false); |
| |
| for (int i = 0; i < 3; i++) |
| calculateAffinity(i + 1); |
| |
| checkAffinity(3, topVer(3, 1), true); |
| |
| Ignite client = startClient(3, 4); |
| |
| checkAffinity(4, topVer(4, 0), true); |
| |
| CacheConfiguration ccfg = cacheConfiguration(); |
| |
| ccfg.setName(CACHE_NAME2); |
| |
| ignite(0).createCache(ccfg); |
| |
| calculateAffinity(4); |
| |
| checkAffinity(4, topVer(4, 1), true); |
| |
| client.cache(CACHE_NAME2); |
| |
| checkAffinity(4, topVer(4, 1), true); |
| |
| client.destroyCache(CACHE_NAME2); |
| |
| checkAffinity(4, topVer(4, 2), true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testInitCacheReceivedOnJoin() throws Exception { |
| cacheC = s -> null; |
| |
| startServer(0, 1); |
| |
| startServer(1, 2); |
| |
| checkAffinity(2, topVer(2, 1), true); |
| |
| cacheC = s -> new CacheConfiguration[]{cacheConfiguration()}; |
| |
| startServer(2, 3); |
| |
| checkAffinity(3, topVer(3, 0), false); |
| |
| checkAffinity(3, topVer(3, 1), true); |
| |
| cacheC = s -> { |
| CacheConfiguration ccfg = cacheConfiguration(); |
| |
| ccfg.setName(CACHE_NAME2); |
| |
| return new CacheConfiguration[]{ccfg}; |
| }; |
| |
| startClient(3, 4); |
| |
| checkAffinity(4, topVer(4, 0), true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testClientStartFirst1() throws Exception { |
| clientStartFirst(1); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testClientStartFirst2() throws Exception { |
| clientStartFirst(3); |
| } |
| |
| /** |
| * @param clients Number of client nodes. |
| * @throws Exception If failed. |
| */ |
| private void clientStartFirst(int clients) throws Exception { |
| forceSrvMode = true; |
| |
| int topVer = 0; |
| |
| for (int i = 0; i < clients; i++) |
| startClient(topVer, ++topVer); |
| |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String nodeName) { |
| return null; |
| } |
| }; |
| |
| startServer(topVer, ++topVer); |
| |
| checkAffinity(topVer, topVer(topVer, 0), true); |
| |
| startServer(topVer, ++topVer); |
| |
| checkAffinity(topVer, topVer(topVer, 0), false); |
| |
| checkAffinity(topVer, topVer(topVer, 1), true); |
| |
| stopNode(clients, ++topVer); |
| |
| checkAffinity(clients + 1, topVer(topVer, 0), true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRandomOperations() throws Exception { |
| forceSrvMode = true; |
| |
| final int MAX_SRVS = 10; |
| final int MAX_CLIENTS = 10; |
| final int MAX_CACHES = 15; |
| |
| List<String> srvs = new ArrayList<>(); |
| List<String> clients = new ArrayList<>(); |
| |
| int srvIdx = 0; |
| int clientIdx = 0; |
| int cacheIdx = 0; |
| |
| List<String> caches = new ArrayList<>(); |
| |
| long seed = System.currentTimeMillis(); |
| |
| Random rnd = new Random(seed); |
| |
| log.info("Random seed: " + seed); |
| |
| long topVer = 0; |
| |
| for (int i = 0; i < 100; i++) { |
| int op = i == 0 ? 0 : rnd.nextInt(7); |
| |
| log.info("Iteration [iter=" + i + ", op=" + op + ']'); |
| |
| switch (op) { |
| case 0: { |
| if (srvs.size() < MAX_SRVS) { |
| srvIdx++; |
| |
| String srvName = "server-" + srvIdx; |
| |
| log.info("Start server: " + srvName); |
| |
| if (rnd.nextBoolean()) { |
| cacheIdx++; |
| |
| String cacheName = "join-cache-" + cacheIdx; |
| |
| log.info("Cache for joining node: " + cacheName); |
| |
| cacheClosure(rnd, caches, cacheName, srvs, srvIdx); |
| } |
| else |
| cacheClosure(rnd, caches, DEFAULT_CACHE_NAME, srvs, srvIdx); |
| |
| startNode(srvName, ++topVer, false); |
| |
| srvs.add(srvName); |
| } |
| else |
| log.info("Skip start server."); |
| |
| break; |
| } |
| |
| case 1: { |
| if (srvs.size() > 1) { |
| String srvName = srvs.get(rnd.nextInt(srvs.size())); |
| |
| log.info("Stop server: " + srvName); |
| |
| stopNode(srvName, ++topVer); |
| |
| srvs.remove(srvName); |
| } |
| else |
| log.info("Skip stop server."); |
| |
| break; |
| } |
| |
| case 2: { |
| if (clients.size() < MAX_CLIENTS) { |
| clientIdx++; |
| |
| String clientName = "client-" + clientIdx; |
| |
| log.info("Start client: " + clientName); |
| |
| if (rnd.nextBoolean()) { |
| cacheIdx++; |
| |
| String cacheName = "join-cache-" + cacheIdx; |
| |
| log.info("Cache for joining node: " + cacheName); |
| |
| cacheClosure(rnd, caches, cacheName, srvs, srvIdx); |
| } |
| else |
| cacheClosure(rnd, caches, DEFAULT_CACHE_NAME, srvs, srvIdx); |
| |
| startNode(clientName, ++topVer, true); |
| |
| clients.add(clientName); |
| } |
| else |
| log.info("Skip start client."); |
| |
| break; |
| } |
| |
| case 3: { |
| if (clients.size() > 1) { |
| String clientName = clients.get(rnd.nextInt(clients.size())); |
| |
| log.info("Stop client: " + clientName); |
| |
| stopNode(clientName, ++topVer); |
| |
| clients.remove(clientName); |
| } |
| else |
| log.info("Skip stop client."); |
| |
| break; |
| } |
| |
| case 4: { |
| if (!caches.isEmpty()) { |
| String cacheName = caches.get(rnd.nextInt(caches.size())); |
| |
| Ignite node = randomNode(rnd, srvs, clients); |
| |
| log.info("Destroy cache [cache=" + cacheName + ", node=" + node.name() + ']'); |
| |
| node.destroyCache(cacheName); |
| |
| caches.remove(cacheName); |
| } |
| else |
| log.info("Skip destroy cache."); |
| |
| break; |
| } |
| |
| case 5: { |
| if (caches.size() < MAX_CACHES) { |
| cacheIdx++; |
| |
| String cacheName = "cache-" + cacheIdx; |
| |
| Ignite node = randomNode(rnd, srvs, clients); |
| |
| log.info("Create cache [cache=" + cacheName + ", node=" + node.name() + ']'); |
| |
| node.createCache(randomCacheConfiguration(rnd, cacheName, srvs, srvIdx)); |
| |
| calculateAffinity(topVer); |
| |
| caches.add(cacheName); |
| } |
| else |
| log.info("Skip create cache."); |
| |
| break; |
| } |
| |
| case 6: { |
| if (!caches.isEmpty()) { |
| for (int j = 0; j < 3; j++) { |
| String cacheName = caches.get(rnd.nextInt(caches.size())); |
| |
| for (int k = 0; k < 3; k++) { |
| Ignite node = randomNode(rnd, srvs, clients); |
| |
| log.info("Get/closes cache [cache=" + cacheName + ", node=" + node.name() + ']'); |
| |
| node.cache(cacheName).close(); |
| } |
| } |
| } |
| else |
| log.info("Skip get/close cache."); |
| |
| break; |
| } |
| |
| default: |
| fail(); |
| } |
| |
| IgniteKernal node = (IgniteKernal)grid(srvs.get(0)); |
| |
| checkAffinity(srvs.size() + clients.size(), |
| node.context().cache().context().exchange().readyAffinityVersion(), |
| false); |
| } |
| |
| srvIdx++; |
| |
| String srvName = "server-" + srvIdx; |
| |
| log.info("Start server: " + srvName); |
| |
| cacheClosure(rnd, caches, DEFAULT_CACHE_NAME, srvs, srvIdx); |
| |
| startNode(srvName, ++topVer, false); |
| |
| srvs.add(srvName); |
| |
| checkAffinity(srvs.size() + clients.size(), topVer(topVer, 1), true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testConcurrentStartStaticCaches() throws Exception { |
| concurrentStartStaticCaches(false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testConcurrentStartStaticCachesWithClientNodes() throws Exception { |
| concurrentStartStaticCaches(true); |
| } |
| |
| /** |
| * @param withClients If {@code true} also starts client nodes. |
| * @throws Exception If failed. |
| */ |
| private void concurrentStartStaticCaches(boolean withClients) throws Exception { |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String igniteInstanceName) { |
| int caches = getTestIgniteInstanceIndex(igniteInstanceName) + 1; |
| |
| CacheConfiguration[] ccfgs = new CacheConfiguration[caches]; |
| |
| for (int i = 0; i < caches; i++) { |
| CacheConfiguration ccfg = cacheConfiguration(); |
| |
| ccfg.setName("cache-" + i); |
| |
| ccfgs[i] = ccfg; |
| } |
| |
| return ccfgs; |
| } |
| }; |
| |
| if (withClients) { |
| clientC = new IgniteClosure<String, Boolean>() { |
| @Override public Boolean apply(String igniteInstanceName) { |
| int idx = getTestIgniteInstanceIndex(igniteInstanceName); |
| |
| return idx % 3 == 2; |
| } |
| }; |
| } |
| |
| int ITERATIONS = 3; |
| |
| int NODES = withClients ? 8 : 5; |
| |
| for (int i = 0; i < ITERATIONS; i++) { |
| log.info("Iteration: " + i); |
| |
| TestRecordingCommunicationSpi[] testSpis = new TestRecordingCommunicationSpi[NODES]; |
| |
| for (int j = 0; j < NODES; j++) { |
| testSpis[j] = new TestRecordingCommunicationSpi(); |
| |
| testSpis[j].blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage); |
| } |
| |
| //Ensure exchanges merge. |
| spiC = igniteInstanceName -> testSpis[getTestIgniteInstanceIndex(igniteInstanceName)]; |
| |
| runAsync(() -> { |
| try { |
| for (int j = 1; j < NODES; j++) |
| testSpis[j].waitForBlocked(); |
| } |
| catch (InterruptedException e) { |
| log.error("Thread interrupted.", e); |
| } |
| |
| for (TestRecordingCommunicationSpi testSpi : testSpis) |
| testSpi.stopBlock(); |
| }); |
| |
| startGridsMultiThreaded(NODES); |
| |
| for (int t = 0; t < NODES; t++) |
| calculateAffinity(t + 1, true, null); |
| |
| if (withClients) { |
| skipCheckOrder = true; |
| |
| checkAffinity(NODES, topVer(NODES, 0), false); |
| } |
| else |
| checkAffinity(NODES, topVer(NODES, 1), true); |
| |
| if (i < ITERATIONS - 1) { |
| checkCaches(); |
| |
| awaitPartitionMapExchange(); |
| |
| stopAllGrids(); |
| |
| idealAff.clear(); |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testServiceReassign() throws Exception { |
| skipCheckOrder = true; |
| |
| Ignite ignite0 = startServer(0, 1); |
| |
| IgniteServices svcs = ignite0.services(); |
| |
| for (int i = 0; i < 10; i++) |
| svcs.deployKeyAffinitySingleton("service-" + i, new TestServiceImpl(i), CACHE_NAME1, i); |
| |
| startServer(1, 2); |
| |
| startServer(2, 3); |
| |
| Map<String, List<List<ClusterNode>>> assignments = checkAffinity(3, topVer(3, 1), true); |
| |
| checkServicesDeploy(ignite(0), assignments.get(CACHE_NAME1)); |
| |
| stopGrid(0); |
| |
| boolean primaryChanged = calculateAffinity(4, false, assignments); |
| |
| assignments = checkAffinity(2, topVer(4, 0), !primaryChanged); |
| |
| if (primaryChanged) |
| checkAffinity(2, topVer(4, 1), true); |
| |
| checkServicesDeploy(ignite(1), assignments.get(CACHE_NAME1)); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testNoForceKeysRequests() throws Exception { |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String s) { |
| return null; |
| } |
| }; |
| |
| final AtomicBoolean fail = new AtomicBoolean(); |
| |
| spiC = new IgniteClosure<String, TestRecordingCommunicationSpi>() { |
| @Override public TestRecordingCommunicationSpi apply(String s) { |
| TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi(); |
| |
| spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { |
| @Override public boolean apply(ClusterNode node, Message msg) { |
| if (msg instanceof GridDhtForceKeysRequest || msg instanceof GridDhtForceKeysResponse) { |
| fail.set(true); |
| |
| U.dumpStack(log, "Unexpected message: " + msg); |
| } |
| |
| return false; |
| } |
| }); |
| |
| return spi; |
| } |
| }; |
| |
| final int SRVS = 3; |
| |
| for (int i = 0; i < SRVS; i++) |
| startGrid(i); |
| |
| startClientGrid(SRVS); |
| |
| final List<CacheConfiguration> ccfgs = new ArrayList<>(); |
| |
| ccfgs.add(cacheConfiguration("tc1", TRANSACTIONAL, 0)); |
| ccfgs.add(cacheConfiguration("tc2", TRANSACTIONAL, 1)); |
| ccfgs.add(cacheConfiguration("tc3", TRANSACTIONAL, 2)); |
| |
| for (CacheConfiguration ccfg : ccfgs) |
| ignite(0).createCache(ccfg); |
| |
| final int NODES = SRVS + 1; |
| |
| final AtomicInteger nodeIdx = new AtomicInteger(); |
| |
| final long stopTime = System.currentTimeMillis() + 60_000; |
| |
| IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| int idx = nodeIdx.getAndIncrement(); |
| |
| Ignite node = grid(idx); |
| |
| List<IgniteCache<Object, Object>> caches = new ArrayList<>(); |
| |
| for (CacheConfiguration ccfg : ccfgs) |
| caches.add(node.cache(ccfg.getName())); |
| |
| while (!fail.get() && System.currentTimeMillis() < stopTime) { |
| for (IgniteCache<Object, Object> cache : caches) |
| cacheOperations(cache); |
| } |
| |
| return null; |
| } |
| }, NODES, "update-thread"); |
| |
| IgniteInternalFuture<?> srvRestartFut = runAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| while (!fail.get() && System.currentTimeMillis() < stopTime) { |
| Ignite node = startGrid(NODES); |
| |
| List<IgniteCache<Object, Object>> caches = new ArrayList<>(); |
| |
| for (CacheConfiguration ccfg : ccfgs) |
| caches.add(node.cache(ccfg.getName())); |
| |
| for (int i = 0; i < 2; i++) { |
| for (IgniteCache<Object, Object> cache : caches) |
| cacheOperations(cache); |
| } |
| |
| U.sleep(500); |
| |
| stopGrid(NODES); |
| |
| U.sleep(500); |
| } |
| |
| return null; |
| } |
| }, "srv-restart"); |
| |
| srvRestartFut.get(); |
| updateFut.get(); |
| |
| assertFalse("Unexpected messages.", fail.get()); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testStreamer1() throws Exception { |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String s) { |
| return null; |
| } |
| }; |
| |
| startServer(0, 1); |
| |
| cacheC = null; |
| cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0))); |
| |
| startServer(1, 2); |
| |
| IgniteDataStreamer<Object, Object> streamer = ignite(0).dataStreamer(CACHE_NAME1); |
| |
| streamer.addData(1, 1); |
| streamer.flush(); |
| } |
| |
| /** |
| * @param cache Cache |
| */ |
| private void cacheOperations(IgniteCache<Object, Object> cache) { |
| ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| final int KEYS = 10_000; |
| |
| try { |
| cache.get(rnd.nextInt(KEYS)); |
| |
| cache.put(rnd.nextInt(KEYS), rnd.nextInt(10)); |
| |
| cache.getAndPut(rnd.nextInt(KEYS), rnd.nextInt(10)); |
| |
| cache.remove(rnd.nextInt(KEYS)); |
| |
| cache.getAndRemove(rnd.nextInt(KEYS)); |
| |
| cache.remove(rnd.nextInt(KEYS), rnd.nextInt(10)); |
| |
| cache.putIfAbsent(rnd.nextInt(KEYS), rnd.nextInt(10)); |
| |
| cache.replace(rnd.nextInt(KEYS), rnd.nextInt(10)); |
| |
| cache.replace(rnd.nextInt(KEYS), rnd.nextInt(10), rnd.nextInt(10)); |
| |
| cache.invoke(rnd.nextInt(KEYS), new TestEntryProcessor(rnd.nextInt(10))); |
| |
| if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL) { |
| IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); |
| |
| for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { |
| for (TransactionIsolation isolation : TransactionIsolation.values()) { |
| try (Transaction tx = txs.txStart(concurrency, isolation)) { |
| Integer key = rnd.nextInt(KEYS); |
| |
| cache.getAndPut(key, rnd.nextInt(10)); |
| |
| cache.invoke(key + 1, new TestEntryProcessor(rnd.nextInt(10))); |
| |
| cache.get(key + 2); |
| |
| tx.commit(); |
| } |
| } |
| } |
| } |
| } |
| catch (Exception e) { |
| log.info("Cache operation failed: " + e); |
| } |
| } |
| |
| /** |
| * @param name Cache name. |
| * @param atomicityMode Cache atomicity mode. |
| * @param backups Number of backups. |
| * @return Cache configuration. |
| */ |
| private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode, int backups) { |
| CacheConfiguration ccfg = cacheConfiguration(); |
| |
| ccfg.setName(name); |
| ccfg.setAtomicityMode(atomicityMode); |
| ccfg.setBackups(backups); |
| |
| return ccfg; |
| } |
| |
| /** |
| * @param ignite Node. |
| * @param affinity Affinity. |
| * @throws Exception If failed. |
| */ |
| private void checkServicesDeploy(Ignite ignite, final List<List<ClusterNode>> affinity) throws Exception { |
| Affinity<Object> aff = ignite.affinity(CACHE_NAME1); |
| |
| for (int i = 0; i < 10; i++) { |
| final int part = aff.partition(i); |
| |
| final String srvcName = "service-" + i; |
| |
| final ClusterNode srvcNode = affinity.get(part).get(0); |
| |
| boolean wait = waitForCondition(new PA() { |
| @Override public boolean apply() { |
| TestService srvc = grid(srvcNode).services().service(srvcName); |
| |
| if (srvc == null) |
| return false; |
| |
| assertEquals(srvcNode, srvc.serviceNode()); |
| |
| return true; |
| } |
| }, 5000); |
| |
| assertTrue(wait); |
| } |
| } |
| |
| /** |
| * @param rnd Random generator. |
| * @param srvs Server. |
| * @param clients Clients. |
| * @return Random node. |
| */ |
| private Ignite randomNode(Random rnd, List<String> srvs, List<String> clients) { |
| String name = null; |
| |
| if (rnd.nextBoolean()) { |
| if (!clients.isEmpty()) |
| name = clients.get(rnd.nextInt(clients.size())); |
| } |
| |
| if (name == null) |
| name = srvs.get(rnd.nextInt(srvs.size())); |
| |
| Ignite node = grid(name); |
| |
| assert node != null; |
| |
| return node; |
| } |
| |
| /** |
| * @param rnd Random generator. |
| * @param caches Caches list. |
| * @param cacheName Cache name. |
| * @param srvs Server nodes. |
| * @param srvIdx Current servers index. |
| */ |
| private void cacheClosure(Random rnd, List<String> caches, String cacheName, List<String> srvs, int srvIdx) { |
| if (!DEFAULT_CACHE_NAME.equals(cacheName)) { |
| final CacheConfiguration ccfg = randomCacheConfiguration(rnd, cacheName, srvs, srvIdx); |
| |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String s) { |
| return new CacheConfiguration[]{ccfg}; |
| } |
| }; |
| |
| caches.add(cacheName); |
| } |
| else { |
| cacheC = new IgniteClosure<String, CacheConfiguration[]>() { |
| @Override public CacheConfiguration[] apply(String s) { |
| return null; |
| } |
| }; |
| } |
| } |
| |
| /** |
| * @param rnd Random generator. |
| * @param name Cache name. |
| * @param srvs Server nodes. |
| * @param srvIdx Current servers index. |
| * @return Cache configuration. |
| */ |
| private CacheConfiguration randomCacheConfiguration(Random rnd, String name, List<String> srvs, int srvIdx) { |
| CacheConfiguration ccfg = cacheConfiguration(); |
| |
| ccfg.setAtomicityMode(rnd.nextBoolean() ? TRANSACTIONAL : ATOMIC); |
| ccfg.setBackups(rnd.nextInt(10)); |
| ccfg.setRebalanceMode(rnd.nextBoolean() ? SYNC : ASYNC); |
| ccfg.setAffinity(affinityFunction(rnd.nextInt(2048) + 10)); |
| |
| if (rnd.nextBoolean()) { |
| Set<String> exclude = new HashSet<>(); |
| |
| for (int i = 0; i < 10; i++) { |
| if (i % 2 == 0 && !srvs.isEmpty()) |
| exclude.add(srvs.get(rnd.nextInt(srvs.size()))); |
| else |
| exclude.add("server-" + (srvIdx + rnd.nextInt(10))); |
| } |
| |
| ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(exclude)); |
| } |
| |
| ccfg.setName(name); |
| |
| return ccfg; |
| } |
| |
| /** |
| * @param node Node. |
| * @param topVer Topology version. |
| * @param cache Cache name. |
| * @return Affinity assignments. |
| */ |
| private List<List<ClusterNode>> affinity(Ignite node, AffinityTopologyVersion topVer, String cache) { |
| GridCacheContext cctx = ((IgniteKernal)node).context().cache().internalCache(cache).context(); |
| |
| return cctx.affinity().assignments(topVer); |
| } |
| |
| /** |
| * @param spi SPI. |
| * @param cacheName Cache name. |
| */ |
| private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) { |
| final int grpId = groupIdForCache(spi.ignite(), cacheName); |
| |
| spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { |
| @Override public boolean apply(ClusterNode node, Message msg) { |
| if (!msg.getClass().equals(GridDhtPartitionSupplyMessage.class)) |
| return false; |
| |
| return ((GridDhtPartitionSupplyMessage)msg).groupId() == grpId; |
| } |
| }); |
| } |
| |
| /** |
| * @param expNodes Expected nodes number. |
| * @param topVer Topology version. |
| * @return Affinity futures. |
| */ |
| private List<IgniteInternalFuture<?>> affFutures(int expNodes, AffinityTopologyVersion topVer) { |
| List<Ignite> nodes = G.allGrids(); |
| |
| assertEquals(expNodes, nodes.size()); |
| |
| List<IgniteInternalFuture<?>> futs = new ArrayList<>(nodes.size()); |
| |
| for (Ignite node : nodes) { |
| IgniteInternalFuture<?> |
| fut = ((IgniteKernal)node).context().cache().context().exchange().affinityReadyFuture(topVer); |
| |
| futs.add(fut); |
| } |
| |
| return futs; |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @param node Node. |
| * @return Exchange future. |
| */ |
| private IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion topVer, Ignite node) { |
| IgniteInternalFuture<?> fut = ((IgniteKernal)node).context().cache().context().exchange(). |
| affinityReadyFuture(topVer); |
| |
| return fut != null ? fut : new GridFinishedFuture<>(); |
| } |
| |
| /** |
| * @param major Major version. |
| * @param minor Minor version. |
| * @return Topology version. |
| */ |
| private static AffinityTopologyVersion topVer(long major, int minor) { |
| return new AffinityTopologyVersion(major, minor); |
| } |
| |
| /** |
| * |
| */ |
| private void checkCaches() { |
| List<Ignite> nodes = G.allGrids(); |
| |
| assertFalse(nodes.isEmpty()); |
| |
| for (Ignite node : nodes) { |
| Collection<String> cacheNames = node.cacheNames(); |
| |
| assertFalse(cacheNames.isEmpty()); |
| |
| for (String cacheName : cacheNames) { |
| try { |
| IgniteCache<Object, Object> cache = node.cache(cacheName); |
| |
| assertNotNull(cache); |
| |
| Long val = System.currentTimeMillis(); |
| |
| ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| for (int i = 0; i < 100; i++) { |
| int key = rnd.nextInt(100_000); |
| |
| cache.put(key, val); |
| |
| assertEquals(val, cache.get(key)); |
| |
| cache.remove(key); |
| |
| assertNull(cache.get(key)); |
| } |
| } |
| catch (Exception e) { |
| assertTrue("Unexpected error: " + e, X.hasCause(e, ClusterTopologyServerNotFoundException.class)); |
| |
| Affinity<Object> aff = node.affinity(cacheName); |
| |
| assert aff.partitions() > 0; |
| |
| for (int p = 0; p > aff.partitions(); p++) { |
| Collection<ClusterNode> partNodes = aff.mapPartitionToPrimaryAndBackups(p); |
| |
| assertTrue(partNodes.isEmpty()); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param expNode Expected nodes number. |
| * @param topVer Topology version. |
| * @throws Exception If failed. |
| */ |
| private void checkNoExchange(int expNode, AffinityTopologyVersion topVer) throws Exception { |
| List<IgniteInternalFuture<?>> futs = affFutures(expNode, topVer); |
| |
| U.sleep(1000); |
| |
| for (IgniteInternalFuture<?> fut : futs) |
| assertFalse(fut.isDone()); |
| } |
| |
| /** |
| * @param expNodes Expected nodes number. |
| * @param topVer Topology version. |
| * @throws Exception If failed. |
| */ |
| private void checkOrderCounters(int expNodes, AffinityTopologyVersion topVer) throws Exception { |
| List<Ignite> nodes = G.allGrids(); |
| |
| Long order = null; |
| |
| for (Ignite node : nodes) { |
| IgniteKernal node0 = (IgniteKernal)node; |
| |
| if (node0.configuration().isClientMode()) |
| continue; |
| |
| IgniteInternalFuture<?> fut = node0.context().cache().context().exchange().affinityReadyFuture(topVer); |
| |
| if (fut != null) |
| fut.get(); |
| |
| AtomicLong orderCntr = GridTestUtils.getFieldValue(node0.context().cache().context().versions(), "order"); |
| |
| log.info("Order [node=" + node0.name() + ", order=" + orderCntr.get() + ']'); |
| |
| if (order == null) |
| order = orderCntr.get(); |
| else |
| assertEquals(order, (Long)orderCntr.get()); |
| } |
| |
| assertEquals(expNodes, nodes.size()); |
| } |
| |
| /** |
| * @param expNodes Expected nodes number. |
| * @param topVer Topology version. |
| * @param expIdeal If {@code true} expect ideal affinity assignment. |
| * @throws Exception If failed. |
| * @return Affinity assignments. |
| */ |
| private Map<String, List<List<ClusterNode>>> checkAffinity(int expNodes, |
| AffinityTopologyVersion topVer, |
| boolean expIdeal) throws Exception { |
| return checkAffinity(expNodes, topVer, expIdeal, true); |
| } |
| |
| /** |
| * @param expNodes Expected nodes number. |
| * @param topVer Topology version. |
| * @param expIdeal If {@code true} expect ideal affinity assignment. |
| * @param checkPublicApi {@code True} to check {@link Affinity} API. |
| * @throws Exception If failed. |
| * @return Affinity assignments. |
| */ |
| private Map<String, List<List<ClusterNode>>> checkAffinity(int expNodes, |
| AffinityTopologyVersion topVer, |
| boolean expIdeal, |
| boolean checkPublicApi |
| ) throws Exception { |
| boolean compatibility = IgniteSystemProperties.getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1); |
| |
| List<Ignite> nodes = G.allGrids(); |
| |
| Map<String, List<List<ClusterNode>>> aff = new HashMap<>(); |
| |
| GridDhtPartitionsExchangeFuture exchFut = null; |
| |
| for (Ignite node : nodes) { |
| log.info("Check affinity [node=" + node.name() + ", topVer=" + topVer + ", expIdeal=" + expIdeal + ']'); |
| |
| IgniteKernal node0 = (IgniteKernal)node; |
| |
| IgniteInternalFuture<?> fut = node0.context().cache().context().exchange().affinityReadyFuture(topVer); |
| |
| if (fut != null) |
| fut.get(); |
| |
| if (!compatibility) { |
| List<GridDhtPartitionsExchangeFuture> exchFuts = |
| ((IgniteEx)node).context().cache().context().exchange().exchangeFutures(); |
| |
| for (GridDhtPartitionsExchangeFuture f : exchFuts) { |
| if (f.exchangeDone() && !f.isMerged() && f.topologyVersion().equals(topVer)) { |
| if (exchFut != null) // Compare with previous node. |
| assertEquals(f.rebalanced(), exchFut.rebalanced()); // Check homogeneity. |
| |
| assertNotSame(exchFut, f); |
| |
| exchFut = f; |
| |
| break; |
| } |
| } |
| |
| assertNotNull(exchFut); |
| } |
| |
| for (GridCacheContext cctx : node0.context().cache().context().cacheContexts()) { |
| if (cctx.startTopologyVersion().compareTo(topVer) > 0) |
| continue; |
| |
| List<List<ClusterNode>> aff1 = aff.get(cctx.name()); |
| List<List<ClusterNode>> aff2 = cctx.affinity().assignments(topVer); |
| |
| if (aff1 == null) |
| aff.put(cctx.name(), aff2); |
| else |
| assertAffinity(aff1, aff2, node, cctx.name(), topVer); |
| |
| if (expIdeal) { |
| if (!compatibility) |
| assertEquals( |
| "Rebalance state not as expected [node=" + node.name() + ", top=" + topVer + "]", |
| true, |
| exchFut.rebalanced()); |
| |
| List<List<ClusterNode>> ideal = idealAssignment(topVer, cctx.cacheId()); |
| |
| assertAffinity(ideal, aff2, node, cctx.name(), topVer); |
| |
| if (checkPublicApi) { |
| Affinity<Object> cacheAff = node.affinity(cctx.name()); |
| |
| for (int i = 0; i < 10; i++) { |
| int part = cacheAff.partition(i); |
| |
| List<ClusterNode> partNodes = ideal.get(part); |
| |
| if (partNodes.isEmpty()) { |
| try { |
| cacheAff.mapKeyToNode(i); |
| |
| fail(); |
| } |
| catch (IgniteException ignore) { |
| // No-op. |
| } |
| } |
| else { |
| ClusterNode primary = cacheAff.mapKeyToNode(i); |
| |
| assertEquals(primary, partNodes.get(0)); |
| } |
| } |
| |
| for (int p = 0; p < ideal.size(); p++) { |
| List<ClusterNode> exp = ideal.get(p); |
| Collection<ClusterNode> partNodes = cacheAff.mapPartitionToPrimaryAndBackups(p); |
| |
| assertEqualsCollections(exp, partNodes); |
| } |
| } |
| } |
| } |
| } |
| |
| assertEquals(expNodes, nodes.size()); |
| |
| if (!skipCheckOrder) |
| checkOrderCounters(expNodes, topVer); |
| |
| return aff; |
| } |
| |
| /** |
| * @param aff1 Affinity 1. |
| * @param aff2 Affinity 2. |
| * @param node Node. |
| * @param cacheName Cache name. |
| * @param topVer Topology version. |
| */ |
| private void assertAffinity(List<List<ClusterNode>> aff1, |
| List<List<ClusterNode>> aff2, |
| Ignite node, |
| String cacheName, |
| AffinityTopologyVersion topVer) { |
| assertEquals(aff1.size(), aff2.size()); |
| |
| if (!aff1.equals(aff2)) { |
| for (int i = 0; i < aff1.size(); i++) { |
| Collection<UUID> n1 = new ArrayList<>(F.nodeIds(aff1.get(i))); |
| Collection<UUID> n2 = new ArrayList<>(F.nodeIds(aff2.get(i))); |
| |
| assertEquals("Wrong affinity [node=" + node.name() + |
| ", topVer=" + topVer + |
| ", cache=" + cacheName + |
| ", part=" + i + ']', |
| n1, n2); |
| } |
| |
| fail(); |
| } |
| } |
| |
| /** |
| * @param idx Node index. |
| * @param topVer New topology version. |
| * @return Started node. |
| * @throws Exception If failed. |
| */ |
| private Ignite startClient(int idx, long topVer) throws Exception { |
| Ignite ignite = startClientGrid(idx); |
| |
| assertTrue(ignite.configuration().isClientMode()); |
| |
| calculateAffinity(topVer); |
| |
| return ignite; |
| } |
| |
| /** |
| * @param idx Node index. |
| * @param topVer New topology version. |
| * @throws Exception If failed. |
| * @return Started node. |
| */ |
| private Ignite startServer(int idx, long topVer) throws Exception { |
| Ignite node = startGrid(idx); |
| |
| assertFalse(node.configuration().isClientMode()); |
| |
| calculateAffinity(topVer); |
| |
| return node; |
| } |
| |
| /** |
| * @param name Node name. |
| * @param topVer Topology version. |
| * @param client Client flag. |
| * @throws Exception If failed. |
| */ |
| private void startNode(String name, long topVer, boolean client) throws Exception { |
| if (client) |
| startClientGrid(name); |
| else |
| startGrid(name); |
| |
| calculateAffinity(topVer); |
| } |
| |
| /** |
| * @param name Node name. |
| * @param topVer Topology version. |
| * @throws Exception If failed. |
| */ |
| private void stopNode(String name, long topVer) throws Exception { |
| stopGrid(name); |
| |
| calculateAffinity(topVer); |
| } |
| |
| /** |
| * @param idx Node index. |
| * @param topVer New topology version. |
| * @throws Exception If failed. |
| */ |
| private void stopNode(int idx, long topVer) throws Exception { |
| stopNode(getTestIgniteInstanceName(idx), topVer); |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @param cacheId Cache ID. |
| * @return Ideal assignment. |
| */ |
| private List<List<ClusterNode>> idealAssignment(AffinityTopologyVersion topVer, Integer cacheId) { |
| Map<Integer, List<List<ClusterNode>>> assignments = idealAff.get(topVer.topologyVersion()); |
| |
| assert assignments != null : "No assignments [topVer=" + topVer + ", cache=" + cacheId + ']'; |
| |
| List<List<ClusterNode>> cacheAssignments = assignments.get(cacheId); |
| |
| assert cacheAssignments != null : "No cache assignments [topVer=" + topVer + ", cache=" + cacheId + ']'; |
| |
| return cacheAssignments; |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @throws Exception If failed. |
| */ |
| private void calculateAffinity(long topVer) throws Exception { |
| calculateAffinity(topVer, false, null); |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @param filterByRcvd If {@code true} filters caches by 'receivedFrom' property. |
| * @param cur Optional current affinity. |
| * @throws Exception If failed. |
| * @return {@code True} if some primary node changed comparing to given affinity. |
| */ |
| private boolean calculateAffinity(long topVer, |
| boolean filterByRcvd, |
| @Nullable Map<String, List<List<ClusterNode>>> cur) throws Exception { |
| List<Ignite> all = G.allGrids(); |
| |
| IgniteKernal ignite = (IgniteKernal)Collections.min(all, new Comparator<Ignite>() { |
| @Override public int compare(Ignite n1, Ignite n2) { |
| return Long.compare(n1.cluster().localNode().order(), n2.cluster().localNode().order()); |
| } |
| }); |
| |
| assert !all.isEmpty(); |
| |
| Map<Integer, List<List<ClusterNode>>> assignments = idealAff.get(topVer); |
| |
| if (assignments == null) |
| idealAff.put(topVer, assignments = new HashMap<>()); |
| |
| GridKernalContext ctx = ignite.context(); |
| |
| GridCacheSharedContext cctx = ctx.cache().context(); |
| |
| AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer); |
| |
| cctx.discovery().topologyFuture(topVer).get(); |
| |
| List<GridDhtPartitionsExchangeFuture> futs = cctx.exchange().exchangeFutures(); |
| |
| DiscoveryEvent evt = null; |
| |
| long stopTime = System.currentTimeMillis() + 10_000; |
| |
| boolean primaryChanged = false; |
| |
| do { |
| for (int i = futs.size() - 1; i >= 0; i--) { |
| GridDhtPartitionsExchangeFuture fut = futs.get(i); |
| |
| if (fut.initialVersion().equals(topVer0)) { |
| evt = fut.firstEvent(); |
| |
| break; |
| } |
| } |
| |
| if (evt == null) { |
| U.sleep(500); |
| |
| futs = cctx.exchange().exchangeFutures(); |
| } |
| else |
| break; |
| } while (System.currentTimeMillis() < stopTime); |
| |
| assertNotNull("Failed to find exchange future:", evt); |
| |
| Collection<ClusterNode> allNodes = ctx.discovery().serverNodes(topVer0); |
| |
| for (DynamicCacheDescriptor cacheDesc : ctx.cache().cacheDescriptors().values()) { |
| if (assignments.get(cacheDesc.cacheId()) != null) |
| continue; |
| |
| if (filterByRcvd && cacheDesc.receivedFrom() != null && |
| ctx.discovery().node(topVer0, cacheDesc.receivedFrom()) == null) |
| continue; |
| |
| AffinityFunction func = cacheDesc.cacheConfiguration().getAffinity(); |
| |
| func = cctx.cache().clone(func); |
| |
| cctx.kernalContext().resource().injectGeneric(func); |
| |
| List<ClusterNode> affNodes = new ArrayList<>(); |
| |
| IgnitePredicate<ClusterNode> filter = cacheDesc.cacheConfiguration().getNodeFilter(); |
| |
| for (ClusterNode n : allNodes) { |
| if (!n.isClient() && (filter == null || filter.apply(n))) |
| affNodes.add(n); |
| } |
| |
| Collections.sort(affNodes, NodeOrderComparator.getInstance()); |
| |
| AffinityFunctionContext affCtx = new GridAffinityFunctionContextImpl( |
| affNodes, |
| previousAssignment(topVer, cacheDesc.cacheId()), |
| evt, |
| topVer0, |
| cacheDesc.cacheConfiguration().getBackups()); |
| |
| List<List<ClusterNode>> assignment = func.assignPartitions(affCtx); |
| |
| if (cur != null) { |
| List<List<ClusterNode>> prev = cur.get(cacheDesc.cacheConfiguration().getName()); |
| |
| assertEquals(prev.size(), assignment.size()); |
| |
| if (!primaryChanged) { |
| for (int p = 0; p < prev.size(); p++) { |
| List<ClusterNode> nodes0 = prev.get(p); |
| List<ClusterNode> nodes1 = assignment.get(p); |
| |
| if (!nodes0.isEmpty() && !nodes1.isEmpty()) { |
| ClusterNode p0 = nodes0.get(0); |
| ClusterNode p1 = nodes1.get(0); |
| |
| if (allNodes.contains(p0) && !p0.equals(p1)) { |
| primaryChanged = true; |
| |
| log.info("Primary changed [cache=" + cacheDesc.cacheConfiguration().getName() + |
| ", part=" + p + |
| ", prev=" + F.nodeIds(nodes0) + |
| ", new=" + F.nodeIds(nodes1) + ']'); |
| |
| break; |
| } |
| } |
| } |
| } |
| } |
| |
| assignments.put(cacheDesc.cacheId(), assignment); |
| } |
| |
| return primaryChanged; |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @param cacheId Cache ID. |
| * @return Previous assignment. |
| */ |
| @Nullable private List<List<ClusterNode>> previousAssignment(long topVer, Integer cacheId) { |
| if (topVer == 1) |
| return null; |
| |
| Map<Integer, List<List<ClusterNode>>> assignments = idealAff.get(topVer - 1); |
| |
| assertNotNull(assignments); |
| |
| return assignments.get(cacheId); |
| } |
| |
| /** |
| * |
| */ |
| interface TestService { |
| /** |
| * @return Node. |
| */ |
| ClusterNode serviceNode(); |
| } |
| |
| /** |
| * |
| */ |
| private static class TestServiceImpl implements Service, TestService { |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** */ |
| private int key; |
| |
| /** |
| * @param key Key. |
| */ |
| public TestServiceImpl(int key) { |
| this.key = key; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void cancel(ServiceContext ctx) { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void init(ServiceContext ctx) throws Exception { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void execute(ServiceContext ctx) throws Exception { |
| ignite.log().info("Execute service [key=" + key + ", node=" + ignite.name() + ']'); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ClusterNode serviceNode() { |
| return ignite.cluster().localNode(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| static class TestEntryProcessor implements EntryProcessor<Object, Object, Object> { |
| /** */ |
| private Object val; |
| |
| /** |
| * @param val Value. |
| */ |
| public TestEntryProcessor(Object val) { |
| this.val = val; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object process(MutableEntry<Object, Object> e, Object... args) { |
| e.setValue(val); |
| |
| return null; |
| } |
| } |
| } |