| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridJobExecuteResponse;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
| |
| /** */ |
| public class IgniteDynamicCacheStartCoordinatorFailoverTest extends GridCommonAbstractTest { |
| /** Latch which blocks DynamicCacheChangeFailureMessage until main thread has sent node fail signal. */ |
| private static volatile CountDownLatch latch; |
| |
| /** */ |
| private static final String COORDINATOR_ATTRIBUTE = "coordinator"; |
| |
| /** Client mode flag. */ |
| private Boolean appendCustomAttribute; |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| stopAllGrids(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| latch = new CountDownLatch(1); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| TcpCommunicationSpi commSpi = new CustomCommunicationSpi(); |
| commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); |
| |
| cfg.setCommunicationSpi(commSpi); |
| |
| cfg.setFailureDetectionTimeout(15_000); |
| |
| if (appendCustomAttribute) { |
| Map<String, Object> attrs = new HashMap<>(); |
| |
| attrs.put(COORDINATOR_ATTRIBUTE, Boolean.TRUE); |
| |
| cfg.setUserAttributes(attrs); |
| } |
| |
| return cfg; |
| } |
| |
| /** |
| * Tests coordinator failover during cache start failure. |
| * |
| * @throws Exception If test failed. |
| */ |
| @Test |
| public void testCoordinatorFailure() throws Exception { |
| // Start coordinator node. |
| appendCustomAttribute = true; |
| |
| Ignite g = startGrid(0); |
| |
| appendCustomAttribute = false; |
| |
| Ignite g1 = startGrid(1); |
| Ignite g2 = startGrid(2); |
| |
| awaitPartitionMapExchange(); |
| |
| CacheConfiguration cfg = new CacheConfiguration(); |
| |
| cfg.setName("test-coordinator-failover"); |
| |
| cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); |
| |
| cfg.setAffinity(new BrokenAffinityFunction(false, getTestIgniteInstanceName(2))); |
| |
| GridTestUtils.runAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| GridTestUtils.assertThrows(log, new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| g1.getOrCreateCache(cfg); |
| return null; |
| } |
| }, CacheException.class, null); |
| |
| return null; |
| } |
| }, "cache-starter-thread"); |
| |
| latch.await(); |
| |
| stopGrid(0, true); |
| |
| awaitPartitionMapExchange(); |
| |
| // Correct the cache configuration. |
| cfg.setAffinity(new RendezvousAffinityFunction()); |
| |
| IgniteCache cache = g1.getOrCreateCache(cfg); |
| |
| checkCacheOperations(g1, cache); |
| } |
| |
| /** |
| * Test the basic cache operations. |
| * |
| * @param cache Cache. |
| * @throws Exception If test failed. |
| */ |
| protected void checkCacheOperations(Ignite ignite, IgniteCache cache) throws Exception { |
| int cnt = 1000; |
| |
| // Check base cache operations. |
| for (int i = 0; i < cnt; ++i) |
| cache.put(i, i); |
| |
| for (int i = 0; i < cnt; ++i) { |
| Integer v = (Integer)cache.get(i); |
| |
| assertNotNull(v); |
| assertEquals(i, v.intValue()); |
| } |
| |
| // Check Data Streamer capabilities. |
| try (IgniteDataStreamer streamer = ignite.dataStreamer(cache.getName())) { |
| for (int i = 0; i < 10_000; ++i) |
| streamer.addData(i, i); |
| } |
| } |
| |
| /** |
| * Communication SPI which could optionally block outgoing messages. |
| */ |
| private static class CustomCommunicationSpi extends TcpCommunicationSpi { |
| /** |
| * Send message optionally either blocking it or throwing an exception if it is of |
| * {@link GridJobExecuteResponse} type. |
| * |
| * @param node Destination node. |
| * @param msg Message to be sent. |
| * @param ackClosure Ack closure. |
| * @throws org.apache.ignite.spi.IgniteSpiException If failed. |
| */ |
| @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) |
| throws IgniteSpiException { |
| |
| if (msg instanceof GridIoMessage) { |
| GridIoMessage msg0 = (GridIoMessage)msg; |
| |
| if (msg0.message() instanceof GridDhtPartitionsSingleMessage) { |
| Boolean attr = (Boolean)node.attributes().get(COORDINATOR_ATTRIBUTE); |
| |
| GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage)msg0.message(); |
| |
| Exception err = singleMsg.getError(); |
| |
| if (Boolean.TRUE.equals(attr) && err != null) { |
| // skip message |
| latch.countDown(); |
| |
| return; |
| } |
| } |
| } |
| |
| super.sendMessage(node, msg, ackClosure); |
| } |
| } |
| |
| /** |
| * Affinity function that throws an exception when affinity nodes are calculated on the given node. |
| */ |
| public static class BrokenAffinityFunction extends RendezvousAffinityFunction { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** Exception should arise on all nodes. */ |
| private boolean eOnAllNodes = false; |
| |
| /** Exception should arise on node with certain name. */ |
| private String gridName; |
| |
| /** |
| * Default constructor. |
| */ |
| public BrokenAffinityFunction() { |
| // No-op. |
| } |
| |
| /** |
| * @param eOnAllNodes {@code True} if exception should be thrown on all nodes. |
| * @param gridName Exception should arise on node with certain name. |
| */ |
| public BrokenAffinityFunction(boolean eOnAllNodes, String gridName) { |
| this.eOnAllNodes = eOnAllNodes; |
| this.gridName = gridName; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { |
| if (eOnAllNodes || ignite.name().equals(gridName)) |
| throw new IllegalStateException("Simulated exception [locNodeId=" |
| + ignite.cluster().localNode().id() + "]"); |
| else |
| return super.assignPartitions(affCtx); |
| } |
| } |
| } |