blob: 88981404f9ccadf28a591345d9bed928fbcca87e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridJobExecuteResponse;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
/** */
public class IgniteDynamicCacheStartCoordinatorFailoverTest extends GridCommonAbstractTest {
/** Latch which blocks DynamicCacheChangeFailureMessage until main thread has sent node fail signal. */
private static volatile CountDownLatch latch;
/** */
private static final String COORDINATOR_ATTRIBUTE = "coordinator";
/** Client mode flag. */
private Boolean appendCustomAttribute;
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
latch = new CountDownLatch(1);
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
TcpCommunicationSpi commSpi = new CustomCommunicationSpi();
commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
cfg.setCommunicationSpi(commSpi);
cfg.setFailureDetectionTimeout(15_000);
if (appendCustomAttribute) {
Map<String, Object> attrs = new HashMap<>();
attrs.put(COORDINATOR_ATTRIBUTE, Boolean.TRUE);
cfg.setUserAttributes(attrs);
}
return cfg;
}
/**
* Tests coordinator failover during cache start failure.
*
* @throws Exception If test failed.
*/
@Test
public void testCoordinatorFailure() throws Exception {
// Start coordinator node.
appendCustomAttribute = true;
Ignite g = startGrid(0);
appendCustomAttribute = false;
Ignite g1 = startGrid(1);
Ignite g2 = startGrid(2);
awaitPartitionMapExchange();
CacheConfiguration cfg = new CacheConfiguration();
cfg.setName("test-coordinator-failover");
cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
cfg.setAffinity(new BrokenAffinityFunction(false, getTestIgniteInstanceName(2)));
GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
g1.getOrCreateCache(cfg);
return null;
}
}, CacheException.class, null);
return null;
}
}, "cache-starter-thread");
latch.await();
stopGrid(0, true);
awaitPartitionMapExchange();
// Correct the cache configuration.
cfg.setAffinity(new RendezvousAffinityFunction());
IgniteCache cache = g1.getOrCreateCache(cfg);
checkCacheOperations(g1, cache);
}
/**
* Test the basic cache operations.
*
* @param cache Cache.
* @throws Exception If test failed.
*/
protected void checkCacheOperations(Ignite ignite, IgniteCache cache) throws Exception {
int cnt = 1000;
// Check base cache operations.
for (int i = 0; i < cnt; ++i)
cache.put(i, i);
for (int i = 0; i < cnt; ++i) {
Integer v = (Integer) cache.get(i);
assertNotNull(v);
assertEquals(i, v.intValue());
}
// Check Data Streamer capabilities.
try (IgniteDataStreamer streamer = ignite.dataStreamer(cache.getName())) {
for (int i = 0; i < 10_000; ++i)
streamer.addData(i, i);
}
}
/**
* Communication SPI which could optionally block outgoing messages.
*/
private static class CustomCommunicationSpi extends TcpCommunicationSpi {
/**
* Send message optionally either blocking it or throwing an exception if it is of
* {@link GridJobExecuteResponse} type.
*
* @param node Destination node.
* @param msg Message to be sent.
* @param ackClosure Ack closure.
* @throws org.apache.ignite.spi.IgniteSpiException If failed.
*/
@Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
throws IgniteSpiException {
if (msg instanceof GridIoMessage) {
GridIoMessage msg0 = (GridIoMessage)msg;
if (msg0.message() instanceof GridDhtPartitionsSingleMessage) {
Boolean attr = (Boolean) node.attributes().get(COORDINATOR_ATTRIBUTE);
GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg0.message();
Exception err = singleMsg.getError();
if (Boolean.TRUE.equals(attr) && err != null) {
// skip message
latch.countDown();
return;
}
}
}
super.sendMessage(node, msg, ackClosure);
}
}
/**
* Affinity function that throws an exception when affinity nodes are calculated on the given node.
*/
public static class BrokenAffinityFunction extends RendezvousAffinityFunction {
/** */
private static final long serialVersionUID = 0L;
/** */
@IgniteInstanceResource
private Ignite ignite;
/** Exception should arise on all nodes. */
private boolean eOnAllNodes = false;
/** Exception should arise on node with certain name. */
private String gridName;
/**
* Default constructor.
*/
public BrokenAffinityFunction() {
// No-op.
}
/**
* @param eOnAllNodes {@code True} if exception should be thrown on all nodes.
* @param gridName Exception should arise on node with certain name.
*/
public BrokenAffinityFunction(boolean eOnAllNodes, String gridName) {
this.eOnAllNodes = eOnAllNodes;
this.gridName = gridName;
}
/** {@inheritDoc} */
@Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
if (eOnAllNodes || ignite.name().equals(gridName))
throw new IllegalStateException("Simulated exception [locNodeId="
+ ignite.cluster().localNode().id() + "]");
else
return super.assignPartitions(affCtx);
}
}
}