| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.cache.CachePeekMode; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.DeploymentMode; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.NearCacheConfiguration; |
| import org.apache.ignite.internal.binary.BinaryMarshaller; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; |
| import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; |
| import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS; |
| import static org.apache.ignite.configuration.DeploymentMode.SHARED; |
| |
| /** |
| * Cache + Deployment test. |
| */ |
| public class GridCacheDeploymentSelfTest extends GridCommonAbstractTest { |
| /** IP finder. */ |
| private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); |
| |
| /** Name for Ignite instance without cache. */ |
| protected static final String IGNITE_INSTANCE_NAME = "grid-no-cache"; |
| |
| /** First test task name. */ |
| protected static final String TEST_TASK_1 = "org.apache.ignite.tests.p2p.CacheDeploymentTestTask1"; |
| |
| /** Second test task name. */ |
| protected static final String TEST_TASK_2 = "org.apache.ignite.tests.p2p.CacheDeploymentTestTask2"; |
| |
| /** Third test task name. */ |
| protected static final String TEST_TASK_3 = "org.apache.ignite.tests.p2p.CacheDeploymentTestTask3"; |
| |
| /** Test value 1. */ |
| protected static final String TEST_KEY = "org.apache.ignite.tests.p2p.CacheDeploymentTestKey"; |
| |
| /** Test value 1. */ |
| protected static final String TEST_VALUE_1 = "org.apache.ignite.tests.p2p.CacheDeploymentTestValue"; |
| |
| /** Test value 2. */ |
| protected static final String TEST_VALUE_2 = "org.apache.ignite.tests.p2p.CacheDeploymentTestValue2"; |
| |
| /** */ |
| protected DeploymentMode depMode; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| cfg.setDeploymentMode(depMode); |
| |
| if (IGNITE_INSTANCE_NAME.equals(igniteInstanceName)) |
| cfg.setCacheConfiguration(); |
| else |
| cfg.setCacheConfiguration(cacheConfiguration()); |
| |
| TcpDiscoverySpi disco = new TcpDiscoverySpi(); |
| |
| disco.setIpFinder(IP_FINDER); |
| |
| cfg.setDiscoverySpi(disco); |
| |
| cfg.setConnectorConfiguration(null); |
| |
| return cfg; |
| } |
| |
| /** |
| * @return Cache configuration. |
| * @throws Exception In case of error. |
| */ |
| protected CacheConfiguration cacheConfiguration() throws Exception { |
| CacheConfiguration cfg = defaultCacheConfiguration(); |
| |
| cfg.setCacheMode(PARTITIONED); |
| cfg.setWriteSynchronizationMode(FULL_SYNC); |
| cfg.setRebalanceMode(SYNC); |
| cfg.setAtomicityMode(TRANSACTIONAL); |
| cfg.setNearConfiguration(new NearCacheConfiguration()); |
| cfg.setBackups(1); |
| |
| return cfg; |
| } |
| |
| /** |
| * Checks whether a cache should be undeployed in SHARED or CONTINUOUS modes. |
| * |
| * @param g Ignite node. |
| * @return {@code true} if the cache has to be undeployed, {@code false} otherwise. |
| */ |
| protected boolean isCacheUndeployed(Ignite g) { |
| return !(g.configuration().getMarshaller() instanceof BinaryMarshaller); |
| } |
| |
| /** @throws Exception If failed. */ |
| @SuppressWarnings("unchecked") |
| public void testDeployment() throws Exception { |
| try { |
| depMode = CONTINUOUS; |
| |
| Ignite g1 = startGrid(1); |
| Ignite g2 = startGrid(2); |
| |
| Ignite g0 = startGrid(IGNITE_INSTANCE_NAME); |
| |
| ClassLoader ldr = getExternalClassLoader(); |
| |
| Class cls = ldr.loadClass(TEST_TASK_1); |
| |
| g0.compute().execute(cls, g1.cluster().localNode()); |
| |
| cls = ldr.loadClass(TEST_TASK_2); |
| |
| g0.compute().execute(cls, g2.cluster().localNode()); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** @throws Exception If failed. */ |
| @SuppressWarnings("unchecked") |
| public void testDeployment2() throws Exception { |
| try { |
| depMode = CONTINUOUS; |
| |
| Ignite g1 = startGrid(1); |
| Ignite g2 = startGrid(2); |
| |
| Ignite g0 = startGrid(IGNITE_INSTANCE_NAME); |
| |
| ClassLoader ldr = getExternalClassLoader(); |
| |
| Class cls = ldr.loadClass(TEST_TASK_3); |
| |
| String key = ""; |
| |
| for (int i = 0; i < 1000; i++) { |
| key = "1" + i; |
| |
| if (g1.affinity(DEFAULT_CACHE_NAME).mapKeyToNode(key).id().equals(g2.cluster().localNode().id())) |
| break; |
| } |
| |
| g0.compute().execute(cls, new T2<>(g1.cluster().localNode(), key)); |
| |
| cls = ldr.loadClass(TEST_TASK_2); |
| |
| g0.compute().execute(cls, g2.cluster().localNode()); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** @throws Exception If failed. */ |
| @SuppressWarnings("unchecked") |
| public void testDeployment3() throws Exception { |
| try { |
| depMode = SHARED; |
| |
| Ignite g1 = startGrid(1); |
| Ignite g2 = startGrid(2); |
| |
| Ignite g0 = startGrid(IGNITE_INSTANCE_NAME); |
| |
| ClassLoader ldr = getExternalClassLoader(); |
| |
| Class cls = ldr.loadClass(TEST_TASK_3); |
| |
| String key = ""; |
| |
| for (int i = 0; i < 1000; i++) { |
| key = "1" + i; |
| |
| if (g1.affinity(DEFAULT_CACHE_NAME).mapKeyToNode(key).id().equals(g2.cluster().localNode().id())) |
| break; |
| } |
| |
| g0.compute().execute(cls, new T2<>(g1.cluster().localNode(), key)); |
| |
| stopGrid(IGNITE_INSTANCE_NAME); |
| |
| for (int i = 0; i < 10; i++) { |
| if (g1.cache(DEFAULT_CACHE_NAME).localSize() == 0 && g2.cache(DEFAULT_CACHE_NAME).localSize() == 0) |
| break; |
| |
| U.sleep(500); |
| } |
| |
| assertEquals(0, g1.cache(DEFAULT_CACHE_NAME).localSize()); |
| |
| assertEquals(isCacheUndeployed(g1) ? 0 : 1, g2.cache(DEFAULT_CACHE_NAME).localSize()); |
| |
| startGrid(3); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** @throws Exception If failed. */ |
| @SuppressWarnings("unchecked") |
| public void testDeployment4() throws Exception { |
| doDeployment4(false); |
| } |
| |
| /** @throws Exception If failed. */ |
| @SuppressWarnings("unchecked") |
| public void testDeployment4BackupLeavesGrid() throws Exception { |
| doDeployment4(true); |
| } |
| |
| /** @throws Exception If failed. */ |
| @SuppressWarnings("unchecked") |
| private void doDeployment4(boolean backupLeavesGrid) throws Exception { |
| try { |
| depMode = CONTINUOUS; |
| |
| Ignite g1 = startGrid(1); |
| Ignite g2 = startGrid(2); |
| |
| Ignite g0 = startGrid(IGNITE_INSTANCE_NAME); |
| |
| info("Started grids:"); |
| info("g0: " + g0.cluster().localNode().id()); |
| info("g1: " + g1.cluster().localNode().id()); |
| info("g2: " + g2.cluster().localNode().id()); |
| |
| ClassLoader ldr = getExternalClassLoader(); |
| |
| Class cls = ldr.loadClass(TEST_TASK_3); |
| |
| String key = ""; |
| |
| for (int i = 0; i < 1000; i++) { |
| key = "1" + i; |
| |
| if (g1.affinity(DEFAULT_CACHE_NAME).mapKeyToNode(key).id().equals(g2.cluster().localNode().id()) && |
| g1.affinity(DEFAULT_CACHE_NAME).isBackup((backupLeavesGrid ? g0 : g1).cluster().localNode(), key)) |
| break; |
| } |
| |
| g0.compute().execute(cls, new T2<>(g1.cluster().localNode(), key)); |
| |
| stopGrid(IGNITE_INSTANCE_NAME); |
| |
| assertEquals(1, g1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ALL)); |
| |
| assertEquals(1, g2.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ALL)); |
| |
| startGrid(3); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** @throws Exception If failed. */ |
| @SuppressWarnings("unchecked") |
| public void testDeployment5() throws Exception { |
| ClassLoader ldr = getExternalClassLoader(); |
| |
| Class val1Cls = ldr.loadClass(TEST_VALUE_1); |
| Class val2Cls = ldr.loadClass(TEST_VALUE_2); |
| Class task2Cls = ldr.loadClass(TEST_TASK_2); |
| |
| try { |
| depMode = SHARED; |
| |
| Ignite g0 = startGrid(0); |
| Ignite g1 = startGrid(1); |
| Ignite g2 = startGrid(2); |
| |
| info(">>>>>>> Grid 0: " + g0.cluster().localNode().id()); |
| info(">>>>>>> Grid 1: " + g1.cluster().localNode().id()); |
| info(">>>>>>> Grid 2: " + g2.cluster().localNode().id()); |
| |
| int key = 0; |
| |
| key = getNextKey(key, g0, g1.cluster().localNode(), g2.cluster().localNode(), g0.cluster().localNode()); |
| |
| info("Key: " + key); |
| |
| IgniteCache<Object, Object> cache = g0.cache(DEFAULT_CACHE_NAME); |
| |
| assert cache != null; |
| |
| cache.put(key, new ArrayList<>(Arrays.asList(val1Cls.newInstance()))); |
| |
| info(">>>>>>> First put completed."); |
| |
| key = getNextKey(key + 1, g0, g2.cluster().localNode(), g0.cluster().localNode(), g1.cluster().localNode()); |
| |
| info("Key: " + key); |
| |
| cache.put(key, val1Cls.newInstance()); |
| |
| info(">>>>>>> Second put completed."); |
| |
| key = getNextKey(key + 1, g0, g1.cluster().localNode(), g2.cluster().localNode(), g0.cluster().localNode()); |
| |
| info("Key: " + key); |
| |
| cache.put(key, val2Cls.newInstance()); |
| |
| info(">>>>>>> Third put completed."); |
| |
| g0.compute().execute(task2Cls, g1.cluster().localNode()); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** @throws Exception If failed. */ |
| @SuppressWarnings("unchecked") |
| public void testDeployment6() throws Exception { |
| try { |
| depMode = SHARED; |
| |
| Ignite g1 = startGrid(1); |
| Ignite g2 = startGrid(2); |
| |
| ClassLoader ldr = getExternalClassLoader(); |
| |
| Class cls = ldr.loadClass(TEST_TASK_3); |
| |
| String key = ""; |
| |
| for (int i = 0; i < 1000; i++) { |
| key = "1" + i; |
| |
| if (g1.affinity(DEFAULT_CACHE_NAME).mapKeyToNode(key).id().equals(g2.cluster().localNode().id())) |
| break; |
| } |
| |
| g1.compute().execute(cls, new T2<>(g2.cluster().localNode(), key)); |
| |
| stopGrid(1); |
| |
| startGrid(3); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** @throws Exception If failed. */ |
| @SuppressWarnings("unchecked") |
| public void testDeployment7() throws Exception { |
| try { |
| depMode = SHARED; |
| |
| Ignite g1 = startGrid(1); |
| Ignite g2 = startGrid(2); |
| |
| ClassLoader ldr = getExternalClassLoader(); |
| |
| Class cls = ldr.loadClass(TEST_TASK_3); |
| |
| String key = ""; |
| |
| for (int i = 0; i < 1000; i++) { |
| key = "1" + i; |
| |
| if (g1.affinity(DEFAULT_CACHE_NAME).mapKeyToNode(key).id().equals(g2.cluster().localNode().id())) |
| break; |
| } |
| |
| g2.compute().execute(cls, new T2<>(g2.cluster().localNode(), key)); |
| |
| stopGrid(2); |
| |
| startGrid(3); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** @throws Exception If failed. */ |
| public void testPartitionedDeploymentPreloading() throws Exception { |
| ClassLoader ldr = getExternalClassLoader(); |
| |
| Class valCls = ldr.loadClass(TEST_VALUE_1); |
| |
| try { |
| depMode = SHARED; |
| |
| Ignite g = startGrid(0); |
| |
| g.cache(DEFAULT_CACHE_NAME).put(0, valCls.newInstance()); |
| |
| info("Added value to cache 0."); |
| |
| startGrid(1); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| public void testCacheUndeploymentSharedMode() throws Exception { |
| testCacheUndeployment(SHARED); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| public void testCacheUndeploymentContMode() throws Exception { |
| testCacheUndeployment(CONTINUOUS); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| private void testCacheUndeployment(DeploymentMode depMode) throws Exception { |
| ClassLoader ldr = getExternalClassLoader(); |
| |
| Class valCls = ldr.loadClass(TEST_VALUE_1); |
| Class taskCls = ldr.loadClass(TEST_TASK_2); |
| |
| try { |
| this.depMode = depMode; |
| |
| Ignite g0 = startGrid(0); |
| Ignite g1 = startGrid(1); |
| |
| for (int i = 0; i < 20; i++) |
| g0.cache(DEFAULT_CACHE_NAME).put(i, valCls.newInstance()); |
| |
| assert g0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ALL) > 0 : "Cache is empty"; |
| assert g1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ALL) > 0 : "Cache is empty"; |
| |
| g0.compute(g0.cluster().forRemotes()).execute(taskCls, g1.cluster().localNode()); |
| |
| stopGrid(0); |
| |
| if (depMode == SHARED && isCacheUndeployed(g1)) { |
| for (int i = 0; i < 10; i++) { |
| if (g1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ALL) == 0) |
| break; |
| |
| Thread.sleep(500); |
| } |
| |
| assertEquals(0, g1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ALL)); |
| } |
| else { |
| for (int i = 0; i < 4; i++) { |
| if (g1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ALL) == 0) |
| break; |
| |
| Thread.sleep(500); |
| } |
| |
| assert g1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ALL) > 0 : "Cache undeployed unexpectadly"; |
| } |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * Looks for next key starting from {@code start} for which primary node is {@code primary} and backup is {@code |
| * backup}. |
| * |
| * @param start Key to start search from, inclusive. |
| * @param g Grid on which check will be performed. |
| * @param primary Expected primary node. |
| * @param backup Expected backup node. |
| * @param near Expected near node. |
| * @return Key with described properties. |
| * @throws IllegalStateException if such a key could not be found after 10000 iterations. |
| */ |
| private int getNextKey(int start, Ignite g, ClusterNode primary, ClusterNode backup, ClusterNode near) |
| throws Exception { |
| awaitPartitionMapExchange(); |
| |
| info("Primary: " + primary); |
| info("Backup: " + backup); |
| info("Near: " + near); |
| |
| for (int i = start; i < start + 10000; i++) { |
| if (g.affinity(DEFAULT_CACHE_NAME).isPrimary(primary, i) && g.affinity(DEFAULT_CACHE_NAME).isBackup(backup, i)) { |
| assert !g.affinity(DEFAULT_CACHE_NAME).isPrimary(near, i) : "Key: " + i; |
| assert !g.affinity(DEFAULT_CACHE_NAME).isBackup(near, i) : "Key: " + i; |
| |
| return i; |
| } |
| } |
| |
| throw new IllegalStateException("Unable to find matching key [start=" + start + ", primary=" + primary.id() + |
| ", backup=" + backup.id() + ']'); |
| } |
| } |