blob: 4961b1415bf29bc1de0511597442f3f799125388 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePartialUpdateException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.CA;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
/**
* Failover tests for cache.
*/
public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstractSelfTest {
/** */
private static final long TEST_TIMEOUT = 3 * 60 * 1000;
/** */
private static final String NEW_IGNITE_INSTANCE_NAME = "newGrid";
/** */
private static final int ENTRY_CNT = 100;
/** */
private static final int TOP_CHANGE_CNT = 10;
/** */
private static final int TOP_CHANGE_THREAD_CNT = 3;
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return TEST_TIMEOUT;
}
/** {@inheritDoc} */
@Override protected int gridCount() {
return 3;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setNetworkTimeout(60_000);
cfg.setMetricsUpdateFrequency(5_000);
TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
discoSpi.setSocketTimeout(30_000);
discoSpi.setAckTimeout(30_000);
discoSpi.setNetworkTimeout(60_000);
discoSpi.setReconnectCount(2);
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
return cfg;
}
/** {@inheritDoc} */
@Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
CacheConfiguration cfg = super.cacheConfiguration(igniteInstanceName);
cfg.setRebalanceMode(SYNC);
if (cfg.getCacheMode() == CacheMode.PARTITIONED)
cfg.setBackups(TOP_CHANGE_THREAD_CNT);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
// No-op
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
// No-op
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
startGridsMultiThreaded(gridCount());
super.beforeTest();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
}
/**
* @throws Exception If failed.
*/
@Test
public void testTopologyChange() throws Exception {
testTopologyChange(null, null);
}
/**
* @throws Exception If failed.
*/
@Test
public void testConstantTopologyChange() throws Exception {
testConstantTopologyChange(null, null);
}
/**
* @param concurrency Concurrency control.
* @param isolation Isolation level.
* @throws Exception If failed.
*/
protected void testTopologyChange(@Nullable TransactionConcurrency concurrency,
@Nullable TransactionIsolation isolation) throws Exception {
boolean tx = concurrency != null && isolation != null;
if (tx)
put(ignite(0), jcache(), ENTRY_CNT, concurrency, isolation);
else
put(jcache(), ENTRY_CNT);
Ignite g = startGrid(NEW_IGNITE_INSTANCE_NAME);
check(cache(g), ENTRY_CNT);
int half = ENTRY_CNT / 2;
if (tx) {
remove(g, cache(g), half, concurrency, isolation);
put(g, cache(g), half, concurrency, isolation);
}
else {
remove(cache(g), half);
put(cache(g), half);
}
stopGrid(NEW_IGNITE_INSTANCE_NAME);
check(jcache(), ENTRY_CNT);
}
/**
* @param concurrency Concurrency control.
* @param isolation Isolation level.
* @throws Exception If failed.
*/
protected void testConstantTopologyChange(@Nullable final TransactionConcurrency concurrency,
@Nullable final TransactionIsolation isolation) throws Exception {
final boolean tx = concurrency != null && isolation != null;
if (tx)
put(ignite(0), jcache(), ENTRY_CNT, concurrency, isolation);
else
put(jcache(), ENTRY_CNT);
check(jcache(), ENTRY_CNT);
final int half = ENTRY_CNT / 2;
final AtomicReference<Exception> err = new AtomicReference<>();
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@Override public void apply() {
info("Run topology change.");
try {
String name = "new-node-" + Thread.currentThread().getName();
for (int i = 0; i < TOP_CHANGE_CNT && err.get() == null; i++) {
info("Topology change " + i);
try {
final Ignite g = startGrid(name);
IgniteCache<String, Object> cache = g.cache(DEFAULT_CACHE_NAME);
for (int k = half; k < ENTRY_CNT; k++) {
String key = "key" + k;
assertNotNull("Failed to get key: 'key" + k + "'", cache.getAsync(key).get(30_000));
}
}
finally {
G.stop(name, false);
}
}
}
catch (Exception e) {
err.set(e);
log.error("Unexpected exception in topology-change-thread: " + e, e);
}
}
}, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
boolean isInterrupted = false;
try {
while (!fut.isDone() && !isInterrupted) {
if (tx) {
remove(grid(0), jcache(), half, concurrency, isolation);
put(grid(0), jcache(), half, concurrency, isolation);
}
else {
remove(jcache(), half);
put(jcache(), half);
}
isInterrupted = Thread.currentThread().isInterrupted();
}
if (isInterrupted) {
Thread.currentThread().interrupt();
fut.cancel();
}
}
catch (Exception e) {
err.set(e);
log.error("Unexpected exception: " + e, e);
throw e;
}
if (!isInterrupted)
fut.get();
Exception err0 = err.get();
if (err0 != null)
throw err0;
}
/**
* @param cache Cache.
* @param cnt Entry count.
* @throws IgniteCheckedException If failed.
*/
private void put(IgniteCache<String, Integer> cache, int cnt) throws Exception {
try {
for (int i = 0; i < cnt; i++)
cache.put("key" + i, i);
}
catch (CacheException e) {
if (!X.hasCause(e, ClusterTopologyCheckedException.class) && !(e instanceof CachePartialUpdateException))
throw e;
}
}
/**
* @param ignite Ignite.
* @param cache Cache.
* @param cnt Entry count.
* @param concurrency Concurrency control.
* @param isolation Isolation level.
* @throws IgniteCheckedException If failed.
*/
private void put(Ignite ignite,
IgniteCache<String, Integer> cache,
final int cnt,
TransactionConcurrency concurrency,
TransactionIsolation isolation)
throws Exception {
try {
info("Putting values to cache [0," + cnt + ')');
CU.inTx(ignite, cache, concurrency, isolation, new CIX1<IgniteCache<String, Integer>>() {
@Override public void applyx(IgniteCache<String, Integer> cache) {
for (int i = 0; i < cnt; i++)
cache.put("key" + i, i);
}
});
}
catch (Exception e) {
// It is ok to fail with topology exception.
if (!X.hasCause(e, ClusterTopologyCheckedException.class))
throw e;
else
info("Failed to put values to cache due to topology exception [0," + cnt + ')');
}
}
/**
* @param cache Cache.
* @param cnt Entry count.
* @throws IgniteCheckedException If failed.
*/
private void remove(IgniteCache<String, Integer> cache, int cnt) throws Exception {
try {
for (int i = 0; i < cnt; i++)
cache.remove("key" + i);
}
catch (CacheException e) {
if (!X.hasCause(e, ClusterTopologyCheckedException.class) && !(e instanceof CachePartialUpdateException))
throw e;
}
}
/**
* @param ignite Ignite.
* @param cache Cache.
* @param cnt Entry count.
* @param concurrency Concurrency control.
* @param isolation Isolation level.
* @throws IgniteCheckedException If failed.
*/
private void remove(Ignite ignite, IgniteCache<String, Integer> cache, final int cnt,
TransactionConcurrency concurrency, final TransactionIsolation isolation) throws Exception {
try {
info("Removing values form cache [0," + cnt + ')');
CU.inTx(ignite, cache, concurrency, isolation, new CIX1<IgniteCache<String, Integer>>() {
@Override public void applyx(IgniteCache<String, Integer> cache) {
for (int i = 0; i < cnt; i++) {
String key = "key" + i;
// Use removeAll for serializable tx to avoid version check.
if (isolation == TransactionIsolation.SERIALIZABLE)
cache.removeAll(Collections.singleton(key));
else
cache.remove(key);
}
}
});
}
catch (Exception e) {
// It is ok to fail with topology exception.
if (!X.hasCause(e, ClusterTopologyCheckedException.class))
throw e;
else
info("Failed to remove values from cache due to topology exception [0," + cnt + ')');
}
}
/**
* @param cache Cache.
* @param expSize Minimum expected cache size.
* @throws Exception If failed.
*/
private void check(final IgniteCache<String, Integer> cache, final int expSize) throws Exception {
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return cache.size() >= expSize;
}
}, 5000);
int size = cache.size();
assertTrue("Key set size is lesser then the expected size [size=" + size + ", expSize=" + expSize + ']',
size >= expSize);
for (int i = 0; i < expSize; i++)
assertNotNull("Failed to get value for key: 'key" + i + "'", cache.get("key" + i));
}
/**
* @param g Grid.
* @return Cache.
*/
private IgniteCache<String, Integer> cache(Ignite g) {
return g.cache(DEFAULT_CACHE_NAME);
}
}