| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.cache; |
| |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import javax.cache.processor.EntryProcessor; |
| import javax.cache.processor.EntryProcessorException; |
| import javax.cache.processor.MutableEntry; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.cache.affinity.Affinity; |
| import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; |
| import org.apache.ignite.cluster.ClusterState; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.DataRegionConfiguration; |
| import org.apache.ignite.configuration.DataStorageConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.TestRecordingCommunicationSpi; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi; |
| |
| /** |
| * The tets demonstrate the synchronization of shifting between topology versions on clusters during rebalance. |
| * Two nodes join toopology simultaneously, and the rebalancing topology on the entire cluster will be switched |
| * only when both rebalancings for the nodes finish. |
| */ |
| public class ReplicationCacheConsistencyOnUnstableTopologyTest extends GridCommonAbstractTest { |
| /** |
| * Cache mode. |
| */ |
| private CacheMode cacheMode; |
| |
| /** |
| * Cache write synchronization mode. |
| */ |
| private CacheWriteSynchronizationMode writeSynchronizationMode; |
| |
| /** |
| * True if the cache read operation can execute on backup replicas. |
| */ |
| private boolean readFromBackup; |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| return super.getConfiguration(igniteInstanceName) |
| .setConsistentId(igniteInstanceName) |
| .setCommunicationSpi(new TestRecordingCommunicationSpi()) |
| .setDataStorageConfiguration(new DataStorageConfiguration() |
| .setDefaultDataRegionConfiguration(new DataRegionConfiguration() |
| .setMaxSize(100L * 1024 * 1024) |
| .setPersistenceEnabled(true))) |
| .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME) |
| .setAffinity(new RendezvousAffinityFunction(false, 3)) |
| .setCacheMode(cacheMode) |
| .setBackups(2) |
| .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) |
| .setWriteSynchronizationMode(writeSynchronizationMode) |
| .setReadFromBackup(readFromBackup)); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override protected void beforeTest() throws Exception { |
| super.beforeTest(); |
| |
| cleanPersistenceDir(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override protected void afterTest() throws Exception { |
| stopAllGrids(); |
| |
| cleanPersistenceDir(); |
| |
| super.afterTest(); |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| @Test |
| public void testReplicatedFullSync() throws Exception { |
| process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.FULL_SYNC, false); |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| @Test |
| public void testReplicatedFullSyncReadFromBackup() throws Exception { |
| process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.FULL_SYNC, true); |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| @Test |
| public void testReplicatedPrimarySync() throws Exception { |
| process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.PRIMARY_SYNC, false); |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| @Test |
| public void testReplicatedPrimarySyncReadFromBackup() throws Exception { |
| process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.PRIMARY_SYNC, true); |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| @Test |
| public void testReplicatedFullAsync() throws Exception { |
| process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.FULL_ASYNC, false); |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| @Test |
| public void testReplicatedFullAsyncReadFromBackup() throws Exception { |
| process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.FULL_ASYNC, true); |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| @Test |
| public void testPartitionedFullSync() throws Exception { |
| process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.FULL_SYNC, false); |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| @Test |
| public void testPartitionedFullSyncReadFromBackup() throws Exception { |
| process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.FULL_SYNC, true); |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| @Test |
| public void testPartitionedPrimarySync() throws Exception { |
| process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.PRIMARY_SYNC, false); |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| @Test |
| public void testPartitionedPrimarySyncReadFromBackup() throws Exception { |
| process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.PRIMARY_SYNC, true); |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| @Test |
| public void testPartitionedFullAsync() throws Exception { |
| process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.FULL_ASYNC, false); |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| @Test |
| public void testPartitionedFullAsyncReadFromBackup() throws Exception { |
| process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.FULL_ASYNC, true); |
| } |
| |
| /** |
| * Executes a test scenario. |
| * |
| * @param cacheMode Cache mode. |
| * @param writeSynchronizationMode Cache write synchronization mode. |
| * @param readFromBackup True if the cache read operation can execute on backup replicas. |
| * @throws Exception If fail. |
| */ |
| private void process( |
| CacheMode cacheMode, |
| CacheWriteSynchronizationMode writeSynchronizationMode, |
| boolean readFromBackup |
| ) throws Exception { |
| this.cacheMode = cacheMode; |
| this.writeSynchronizationMode = writeSynchronizationMode; |
| this.readFromBackup = readFromBackup; |
| |
| IgniteEx ignite = startGrids(3); |
| |
| ignite.cluster().state(ClusterState.ACTIVE); |
| |
| awaitPartitionMapExchange(); |
| |
| assertEquals(0, ignite.cache(DEFAULT_CACHE_NAME).size()); |
| |
| IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME); |
| |
| streamer.allowOverwrite(false); |
| |
| for (int i = 0; i < 20; i++) { |
| streamer.addData(i, i); |
| } |
| |
| streamer.flush(); |
| |
| ignite(1).close(); |
| |
| for (int i = 20; i < 40; i++) { |
| streamer.addData(i, i); |
| } |
| |
| streamer.flush(); |
| |
| ignite(2).close(); |
| |
| for (int i = 40; i < 60; i++) { |
| streamer.addData(i, i); |
| } |
| |
| streamer.close(); |
| |
| spi(ignite).blockMessages((node, message) -> { |
| if (message instanceof GridDhtPartitionSupplyMessage && testNodeName(2).equals(node.consistentId())) { |
| GridDhtPartitionSupplyMessage supplyMsg = ((GridDhtPartitionSupplyMessage)message); |
| |
| return supplyMsg.groupId() == CU.cacheId(DEFAULT_CACHE_NAME); |
| } |
| |
| return false; |
| }); |
| |
| startGrid(1); |
| startGrid(2); |
| |
| AffinityTopologyVersion rebTopVer = getRebalancedTopVer(ignite); |
| |
| assertEquals(rebTopVer, getRebalancedTopVer(ignite(1))); |
| assertEquals(rebTopVer, getRebalancedTopVer(ignite(2))); |
| |
| HashSet<Integer> keysToUpdate = new HashSet<>(9); |
| |
| // The keys were loaded on the nodes. |
| keysToUpdate.add(partitionKeys(0, 0, 20)); |
| keysToUpdate.add(partitionKeys(1, 0, 20)); |
| keysToUpdate.add(partitionKeys(2, 0, 20)); |
| |
| //The keys were loaded on topology without one node. |
| keysToUpdate.add(partitionKeys(0, 20, 40)); |
| keysToUpdate.add(partitionKeys(1, 20, 40)); |
| keysToUpdate.add(partitionKeys(2, 20, 40)); |
| |
| // The keys were loaded on topology without two nodes. |
| keysToUpdate.add(partitionKeys(0, 40, 60)); |
| keysToUpdate.add(partitionKeys(1, 40, 60)); |
| keysToUpdate.add(partitionKeys(2, 40, 60)); |
| |
| for (Integer key : keysToUpdate) { |
| info("Intention to invike [key: " + key + |
| " part: " + ignite.affinity(DEFAULT_CACHE_NAME).partition(key) + |
| " primary: " + ignite.affinity(DEFAULT_CACHE_NAME).mapKeyToNode(key) + ']'); |
| } |
| |
| HashMap<Integer, EntryProcessor<Integer, Integer, Void>> invokes = new HashMap<>(keysToUpdate.size()); |
| |
| for (Integer key : keysToUpdate) { |
| invokes.put(key, new TestEntryProcessor(100)); |
| } |
| |
| checkTopology(3); |
| |
| ignite.<Integer, Integer>cache(DEFAULT_CACHE_NAME).invokeAll(invokes); |
| |
| spi(ignite).stopBlock(); |
| |
| awaitPartitionMapExchange(); |
| |
| rebTopVer = getRebalancedTopVer(ignite); |
| |
| assertEquals(rebTopVer, getRebalancedTopVer(ignite(1))); |
| assertEquals(rebTopVer, getRebalancedTopVer(ignite(2))); |
| |
| assertPartitionsSame(idleVerify(ignite, DEFAULT_CACHE_NAME)); |
| } |
| |
| /** |
| * Finds a partition key. |
| * |
| * @param part Partiton. |
| * @param from Left search bound. |
| * @param to Right search bound. |
| * @return A keyu. |
| */ |
| protected Integer partitionKeys(int part, int from, int to) { |
| Affinity<Integer> aff = ignite(0).affinity(DEFAULT_CACHE_NAME); |
| |
| for (int k = from; k < to; k++) { |
| if (aff.partition(k) == part) { |
| return k; |
| } |
| } |
| |
| throw new AssertionError("Key was not found [pat=" + part + ", from=" + from + ", to=" + to + ']'); |
| } |
| |
| /** |
| * Gets rebalance topology version for the Ignite instance. |
| * |
| * @param instance Ignite instance. |
| * @return Topologyy version. |
| */ |
| private static AffinityTopologyVersion getRebalancedTopVer(IgniteEx instance) { |
| return ((GridDhtPartitionTopologyImpl)instance.context().cache() |
| .cache(DEFAULT_CACHE_NAME).context().topology()).getRebalancedTopVer(); |
| } |
| |
| /** |
| * The entry processor is intended to update a value when the previous one exists. |
| */ |
| private static class TestEntryProcessor implements EntryProcessor<Integer, Integer, Void> { |
| /** Ignite instance. */ |
| @IgniteInstanceResource |
| Ignite ignite; |
| |
| /** Value to update. */ |
| private final Integer val; |
| |
| /** |
| * The constructor. |
| * |
| * @param val Value to update. |
| */ |
| public TestEntryProcessor(Integer val) { |
| this.val = val; |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override public Void process( |
| MutableEntry<Integer, Integer> mutableEntry, |
| Object... objects |
| ) throws EntryProcessorException { |
| log.info("Updating entry [from=" + mutableEntry.getValue() + ", to=" + val + ']'); |
| |
| if (!mutableEntry.exists()) |
| return null; |
| |
| Integer entryVal = mutableEntry.getValue(); |
| |
| if (entryVal == null) |
| return null; |
| |
| mutableEntry.setValue(val); |
| |
| log.info("Updated entry [from=" + entryVal + ", to=" + val + ']'); |
| |
| return null; |
| } |
| } |
| } |