blob: eb2806239a81f3274434788091dff73f99559cbc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheServerNotFoundException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.TransactionSerializationException;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
*
*/
public class IgniteClientCacheStartFailoverTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
return cfg;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
super.afterTest();
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientStartCoordinatorFailsAtomic() throws Exception {
clientStartCoordinatorFails(ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientStartCoordinatorFailsTx() throws Exception {
clientStartCoordinatorFails(TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientStartCoordinatorFailsMvccTx() throws Exception {
clientStartCoordinatorFails(TRANSACTIONAL_SNAPSHOT);
}
/**
* @param atomicityMode Cache atomicity mode.
* @throws Exception If failed.
*/
private void clientStartCoordinatorFails(CacheAtomicityMode atomicityMode) throws Exception {
Ignite srv0 = startGrids(3);
final int KEYS = 500;
IgniteCache<Object, Object> cache = srv0.createCache(cacheConfiguration(DEFAULT_CACHE_NAME, atomicityMode, 1));
for (int i = 0; i < KEYS; i++)
cache.put(i, i);
final Ignite c = startClientGrid(3);
TestRecordingCommunicationSpi.spi(srv0).blockMessages(GridDhtAffinityAssignmentResponse.class, c.name());
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
c.cache(DEFAULT_CACHE_NAME);
return null;
}
}, "start-cache");
assertFalse(fut.isDone());
stopGrid(0);
fut.get();
cache = c.cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < KEYS; i++) {
assertEquals(i, cache.get(i));
cache.put(i, i + 1);
assertEquals(i + 1, cache.get(i));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientStartLastServerFailsAtomic() throws Exception {
clientStartLastServerFails(ATOMIC);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientStartLastServerFailsTx() throws Exception {
clientStartLastServerFails(TRANSACTIONAL);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientStartLastServerFailsMvccTx() throws Exception {
clientStartLastServerFails(TRANSACTIONAL_SNAPSHOT);
}
/**
* @param atomicityMode Cache atomicity mode.
* @throws Exception If failed.
*/
private void clientStartLastServerFails(CacheAtomicityMode atomicityMode) throws Exception {
startGrids(3);
CacheConfiguration<Object, Object> cfg = cacheConfiguration(DEFAULT_CACHE_NAME, atomicityMode, 1);
cfg.setNodeFilter(new TestNodeFilter(getTestIgniteInstanceName(1)));
Ignite srv1 = ignite(1);
srv1.createCache(cfg);
final Ignite c = startClientGrid(3);
TestRecordingCommunicationSpi.spi(srv1).blockMessages(GridDhtAffinityAssignmentResponse.class, c.name());
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
c.cache(DEFAULT_CACHE_NAME);
return null;
}
}, "start-cache");
assertFalse(fut.isDone());
stopGrid(1);
fut.get();
final IgniteCache<Object, Object> clientCache = c.cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < 10; i++) {
final int k = i;
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
clientCache.get(k);
return null;
}
}, CacheServerNotFoundException.class, null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
clientCache.put(k, k);
return null;
}
}, CacheServerNotFoundException.class, null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
clientCache.remove(k);
return null;
}
}, CacheServerNotFoundException.class, null);
}
startGrid(1);
awaitPartitionMapExchange();
for (int i = 0; i < 100; i++) {
assertNull(clientCache.get(i));
clientCache.put(i, i);
assertEquals(i, clientCache.get(i));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testRebalanceState() throws Exception {
final int SRVS = 3;
startGrids(SRVS);
List<String> cacheNames = startCaches(ignite(0), 100);
Ignite c = startClientGrid(SRVS);
assertTrue(c.configuration().isClientMode());
awaitPartitionMapExchange();
TestRecordingCommunicationSpi.spi(ignite(0)).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
@Override public boolean apply(ClusterNode clusterNode, Message msg) {
return msg instanceof GridDhtPartitionsFullMessage &&
((GridDhtPartitionsFullMessage)msg).exchangeId() == null;
}
});
startGrid(SRVS + 1);
for (String cacheName : cacheNames)
c.cache(cacheName);
// Will switch to ideal topology but some partitions are not evicted yet.
for (int i = 0; i < SRVS + 1; i++) {
AffinityTopologyVersion topVer = new AffinityTopologyVersion(SRVS + 2, 1);
IgniteKernal node = (IgniteKernal)ignite(i);
for (String cacheName : cacheNames) {
GridDhtPartitionTopology top = node.cachex(cacheName).context().topology();
waitForReadyTopology(top, topVer);
assertEquals(topVer, top.readyTopologyVersion());
}
}
TestRecordingCommunicationSpi.spi(ignite(0)).stopBlock();
// Trigger eviction.
awaitPartitionMapExchange();
for (int i = 0; i < SRVS + 1; i++) {
final AffinityTopologyVersion topVer = new AffinityTopologyVersion(SRVS + 2, 1);
final IgniteKernal node = (IgniteKernal)ignite(i);
for (String cacheName : cacheNames) {
final GridDhtPartitionTopology top = node.cachex(cacheName).context().topology();
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return top.rebalanceFinished(topVer);
}
}, 5000);
assertTrue(top.rebalanceFinished(topVer));
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testRebalanceStateConcurrentStart() throws Exception {
final int SRVS1 = 3;
final int CLIENTS = 5;
final int SRVS2 = 5;
startGrids(SRVS1);
Ignite srv0 = ignite(0);
final int KEYS = 1000;
final List<String> cacheNames = startCaches(srv0, KEYS);
final List<Ignite> clients = new ArrayList<>();
for (int i = 0; i < CLIENTS; i++)
clients.add(startClientGrid(SRVS1 + i));
final CyclicBarrier barrier = new CyclicBarrier(clients.size() + SRVS2);
final AtomicInteger clientIdx = new AtomicInteger();
final Set<Integer> keys = new HashSet<>();
for (int i = 0; i < KEYS; i++)
keys.add(i);
IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
barrier.await();
Ignite client = clients.get(clientIdx.getAndIncrement());
for (String cacheName : cacheNames)
client.cache(cacheName);
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 0; i < 10; i++) {
for (String cacheName : cacheNames) {
IgniteCache<Object, Object> cache = client.cache(cacheName);
Map<Object, Object> map0 = cache.getAll(keys);
assertEquals("[cache=" + cacheName +
", expected=" + KEYS +
", actual=" + map0.size() + ']', KEYS, map0.size());
int key = rnd.nextInt(KEYS);
try {
cache.put(key, i);
}
catch (CacheException e) {
log.error("It couldn't put a value [cache=" + cacheName +
", key=" + key +
", val=" + i + ']', e);
CacheConfiguration ccfg = cache.getConfiguration(CacheConfiguration.class);
TransactionSerializationException txEx = X.cause(e, TransactionSerializationException.class);
boolean notContains = !txEx.getMessage().contains(
"Cannot serialize transaction due to write conflict (transaction is marked for rollback)"
);
if (txEx == null || ccfg.getAtomicityMode() != TRANSACTIONAL_SNAPSHOT || notContains)
fail("Assert violated because exception was thrown [e=" + e.getMessage() + ']');
}
}
}
return null;
}
}, clients.size(), "client-cache-start");
final AtomicInteger srvIdx = new AtomicInteger(SRVS1 + CLIENTS);
IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
barrier.await();
startGrid(srvIdx.incrementAndGet());
return null;
}
}, SRVS2, "node-start");
fut1.get();
fut2.get();
final AffinityTopologyVersion topVer = new AffinityTopologyVersion(SRVS1 + SRVS2 + CLIENTS, 1);
for (Ignite client : clients) {
for (String cacheName : cacheNames) {
final GridDhtPartitionTopology top =
((IgniteKernal)client).context().cache().internalCache(cacheName).context().topology();
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return top.rebalanceFinished(topVer);
}
}, 5000);
assertTrue(top.rebalanceFinished(topVer));
}
}
}
/**
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-11810")
@Test
public void testClientStartCloseServersRestart() throws Exception {
final int SRVS = 4;
final int CLIENTS = 4;
startGrids(SRVS);
final List<String> cacheNames = startCaches(ignite(0), 1000);
final List<Ignite> clients = new ArrayList<>();
for (int i = 0; i < CLIENTS; i++)
clients.add(startClientGrid(SRVS + i));
final AtomicBoolean stop = new AtomicBoolean();
IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (!stop.get()) {
int nodeIdx = rnd.nextInt(SRVS);
stopGrid(nodeIdx);
U.sleep(rnd.nextLong(200) + 1);
startGrid(nodeIdx);
U.sleep(rnd.nextLong(200) + 1);
}
return null;
}
}, "restart");
final AtomicInteger clientIdx = new AtomicInteger();
IgniteInternalFuture<?> clientsFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
Ignite client = clients.get(clientIdx.getAndIncrement());
assertTrue(client.configuration().isClientMode());
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (!stop.get()) {
for (String cacheName : cacheNames)
client.cache(cacheName);
for (String cacheName : cacheNames) {
IgniteCache<Object, Object> cache = client.cache(cacheName);
cache.put(rnd.nextInt(1000), rnd.nextInt());
cache.get(rnd.nextInt(1000));
}
for (String cacheName : cacheNames) {
IgniteCache<Object, Object> cache = client.cache(cacheName);
cache.close();
}
}
return null;
}
}, CLIENTS, "client-thread");
try {
U.sleep(10_000);
stop.set(true);
restartFut.get();
clientsFut.get();
}
finally {
stop.set(true);
}
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (Ignite client : clients) {
for (String cacheName : cacheNames) {
IgniteCache<Object, Object> cache = client.cache(cacheName);
for (int i = 0; i < 10; i++) {
Integer key = rnd.nextInt(1000);
cache.put(key, i);
assertEquals(i, cache.get(key));
}
}
}
}
/**
* @param node Node.
* @param keys Number of keys to put in caches.
* @return Cache names.
*/
private List<String> startCaches(Ignite node, int keys) {
List<String> cacheNames = new ArrayList<>();
final Map<Integer, Integer> map = new TreeMap<>();
for (int i = 0; i < keys; i++)
map.put(i, i);
for (int i = 0; i < 3; i++) {
CacheConfiguration<Object, Object> ccfg = cacheConfiguration("atomic-" + i, ATOMIC, i);
IgniteCache<Object, Object> cache = node.createCache(ccfg);
cacheNames.add(ccfg.getName());
cache.putAll(map);
}
for (int i = 0; i < 3; i++) {
CacheConfiguration<Object, Object> ccfg = cacheConfiguration("tx-" + i, TRANSACTIONAL, i);
IgniteCache<Object, Object> cache = node.createCache(ccfg);
cacheNames.add(ccfg.getName());
cache.putAll(map);
}
for (int i = 0; i < 3; i++) {
CacheConfiguration<Object, Object> ccfg = cacheConfiguration("mvcc-" + i, TRANSACTIONAL_SNAPSHOT, i);
IgniteCache<Object, Object> cache = node.createCache(ccfg);
cacheNames.add(ccfg.getName());
cache.putAll(map);
}
return cacheNames;
}
/**
* @param name Cache name.
* @param atomicityMode Cache atomicity mode.
* @param backups Number of backups.
* @return Cache configuration.
*/
private CacheConfiguration<Object, Object> cacheConfiguration(String name, CacheAtomicityMode atomicityMode, int backups) {
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(name);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setAtomicityMode(atomicityMode);
ccfg.setBackups(backups);
return ccfg;
}
/**
*
*/
private static class TestNodeFilter implements IgnitePredicate<ClusterNode> {
/** */
private final String includeName;
/**
* @param includeName Node to include.
*/
public TestNodeFilter(String includeName) {
this.includeName = includeName;
}
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode node) {
return includeName.equals(node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME));
}
}
}