blob: 8e0a74c1584b2efa622e0b1187aea12008921d75 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.mxbean.ClientProcessorMXBean;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
/**
* High Availability tests.
*/
public class ReliabilityTest extends GridCommonAbstractTest {
/**
 * Thin client failover.
 * <p>
 * Checks that simple (put/get) and composite (scan query) operations keep working while cluster nodes
 * are failing and recovering, and that the client fails with {@link ClientConnectionException} once the
 * whole cluster has been closed.
 *
 * @throws Exception If failed.
 */
@Test
public void testFailover() throws Exception {
    final int CLUSTER_SIZE = 3;

    try (LocalIgniteCluster cluster = LocalIgniteCluster.start(CLUSTER_SIZE);
         IgniteClient client = Ignition.startClient(new ClientConfiguration()
             .setReconnectThrottlingRetries(0) // Disable throttling.
             .setAddresses(cluster.clientAddresses().toArray(new String[CLUSTER_SIZE]))
         )
    ) {
        final Random rnd = new Random();

        final ClientCache<Integer, String> cache = client.getOrCreateCache(
            new ClientCacheConfiguration().setName("testFailover").setCacheMode(CacheMode.REPLICATED)
        );

        // Simple operation failover: put/get.
        assertOnUnstableCluster(cluster, () -> {
            Integer key = rnd.nextInt();

            String val = key.toString();

            cache.put(key, val);

            String cachedVal = cache.get(key);

            assertEquals(val, cachedVal);
        });

        cache.clear();

        // Composite operation failover: query.
        Map<Integer, String> data = IntStream.rangeClosed(1, 1000).boxed()
            .collect(Collectors.toMap(i -> i, i -> String.format("String %s", i)));

        assertOnUnstableCluster(cluster, () -> {
            cache.putAll(data);

            // A page size smaller than the data set forces the cursor to fetch several pages.
            Query<Cache.Entry<Integer, String>> qry =
                new ScanQuery<Integer, String>().setPageSize(data.size() / 10);

            try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
                List<Cache.Entry<Integer, String>> res = cur.getAll();

                assertEquals("Unexpected number of entries", data.size(), res.size());

                Map<Integer, String> act = res.stream()
                    .collect(Collectors.toMap(Cache.Entry::getKey, Cache.Entry::getValue));

                assertEquals("Unexpected entries", data, act);
            }
        });

        // Client fails if all nodes go down.
        cluster.close();

        boolean igniteUnavailable = false;

        try {
            cache.put(1, "1");
        }
        catch (ClientConnectionException ex) {
            igniteUnavailable = true;

            Throwable[] suppressed = ex.getSuppressed();

            // Expected value goes first so that a failure message reports the values correctly.
            assertEquals(CLUSTER_SIZE - 1, suppressed.length);

            assertTrue(Stream.of(suppressed).allMatch(t -> t instanceof ClientConnectionException));
        }

        assertTrue(igniteUnavailable);
    }
}
/**
 * Checks that a client configured with a single server address can be used again after its
 * connection has been dropped on the server side.
 *
 * @throws Exception If failed.
 */
@Test
public void testSingleServerFailover() throws Exception {
    try (LocalIgniteCluster singleNodeCluster = LocalIgniteCluster.start(1);
         IgniteClient thinClient = Ignition.startClient(
             new ClientConfiguration().setAddresses(singleNodeCluster.clientAddresses().iterator().next()))
    ) {
        ClientCache<Integer, Integer> testCache = thinClient.createCache("cache");

        // The connection is healthy: the operation must succeed.
        testCache.put(0, 0);

        // Break the connection from the server side.
        Ignite srv = Ignition.allGrids().get(0);

        dropAllThinClientConnections(srv);

        try {
            testCache.put(0, 0);
        }
        catch (Exception ignored) {
            // The operation issued right after the drop is allowed to fail.
        }

        // The next operation must succeed again.
        testCache.put(0, 0);
    }
}
/**
 * Test that failover doesn't lead to silent query inconsistency.
 * <p>
 * A scan query with page size 1 is started, then all thin client connections are dropped while the
 * cursor is being iterated. The iteration is expected to fail with {@link ClientReconnectedException}
 * instead of completing.
 *
 * @throws Exception If failed.
 */
@Test
public void testQueryConsistencyOnFailover() throws Exception {
    final int CLUSTER_SIZE = 2;

    try (LocalIgniteCluster cluster = LocalIgniteCluster.start(CLUSTER_SIZE);
         IgniteClient client = Ignition.startClient(new ClientConfiguration()
             .setAddresses(cluster.clientAddresses().toArray(new String[CLUSTER_SIZE])))
    ) {
        ClientCache<Integer, Integer> cache = client.createCache("cache");

        cache.put(0, 0);
        cache.put(1, 1);

        // The query is typed with the cache's actual value type (Integer).
        Query<Cache.Entry<Integer, Integer>> qry = new ScanQuery<Integer, Integer>().setPageSize(1);

        try (QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(qry)) {
            int cnt = 0;

            // it.next() is the loop update, so the body runs between hasNext() and next().
            for (Iterator<Cache.Entry<Integer, Integer>> it = cur.iterator(); it.hasNext(); it.next()) {
                cnt++;

                // Drop the connections on every node once, during the first iteration.
                if (cnt == 1) {
                    for (int i = 0; i < CLUSTER_SIZE; i++)
                        dropAllThinClientConnections(Ignition.allGrids().get(i));
                }
            }

            fail("ClientReconnectedException must be thrown");
        }
        catch (ClientReconnectedException expected) {
            // No-op.
        }
    }
}
/**
* Test that client works properly with servers txId intersection.
* <p>
* Two threads are synchronized step by step with a {@link CyclicBarrier}: the test thread starts a
* transaction, then the async thread drops all thin client connections and starts its own transaction.
* After that a put from the test thread is expected to fail with {@link ClientException}
* ("Transaction context has been lost due to connection errors"), and the key must be absent from the
* cache after the async thread has committed its transaction.
*
* @throws Exception If failed.
*/
@Test
@SuppressWarnings("ThrowableNotThrown")
public void testTxWithIdIntersection() throws Exception {
int CLUSTER_SIZE = 2;
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(CLUSTER_SIZE);
IgniteClient client = Ignition.startClient(new ClientConfiguration()
.setAddresses(cluster.clientAddresses().toArray(new String[CLUSTER_SIZE])))
) {
ClientCache<Integer, Integer> cache = client.createCache(new ClientCacheConfiguration().setName("cache")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
// Two parties: the test thread and the async thread below. Each await() marks one step.
CyclicBarrier barrier = new CyclicBarrier(2);
GridTestUtils.runAsync(() -> {
try {
// Another thread starts transaction here.
barrier.await(1, TimeUnit.SECONDS);
// Step 1 passed: drop the connections on every node, then start a transaction from this thread.
for (int i = 0; i < CLUSTER_SIZE; i++)
dropAllThinClientConnections(Ignition.allGrids().get(i));
ClientTransaction tx = client.transactions().txStart();
barrier.await(1, TimeUnit.SECONDS);
// Another thread puts to cache here.
barrier.await(1, TimeUnit.SECONDS);
// Step 3 passed: commit the transaction started by this thread.
tx.commit();
barrier.await(1, TimeUnit.SECONDS);
}
catch (Exception e) {
// Failures of the async thread are only logged here; they are not rethrown to the test thread.
log.error("Unexpected error", e);
}
});
// Transaction of the test thread, started before the connections are dropped.
ClientTransaction tx = client.transactions().txStart();
barrier.await(1, TimeUnit.SECONDS);
// Another thread drops connections and creates a new transaction here, which started on another node with the
// same transaction id as we started in this thread.
barrier.await(1, TimeUnit.SECONDS);
// The put made by the test thread must be rejected with the expected error message.
GridTestUtils.assertThrows(null, () -> {
cache.put(0, 0);
return null;
}, ClientException.class, "Transaction context has been lost due to connection errors");
tx.close();
barrier.await(1, TimeUnit.SECONDS);
// Another thread commits its transaction here.
barrier.await(1, TimeUnit.SECONDS);
// The rejected put must not be visible in the cache.
assertFalse(cache.containsKey(0));
}
}
/**
 * Checks reconnection throttling: reconnects are allowed up to the configured number of retries,
 * rejected after that, and allowed again once the throttling period has elapsed.
 *
 * @throws Exception If failed.
 */
@Test
@SuppressWarnings("ThrowableNotThrown")
public void testReconnectionThrottling() throws Exception {
    final int maxRetries = 5;
    final long period = 3_000L;

    try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
         IgniteClient client = Ignition.startClient(new ClientConfiguration()
             .setReconnectThrottlingPeriod(period)
             .setReconnectThrottlingRetries(maxRetries)
             .setAddresses(cluster.clientAddresses().toArray(new String[1])))
    ) {
        ClientCache<Integer, Integer> cache = client.createCache("cache");

        // Reconnects within the retries limit: a put succeeds, then fails right after the drop.
        int attempt = 0;

        while (attempt < maxRetries) {
            cache.put(0, 0);

            dropAllThinClientConnections(Ignition.allGrids().get(0));

            GridTestUtils.assertThrowsWithCause(() -> cache.put(0, 0), ClientConnectionException.class);

            attempt++;
        }

        // Once the limit is exhausted, every further attempt must fail.
        for (int rejected = 0; rejected < 10; rejected++) {
            GridTestUtils.assertThrowsWithCause(() -> cache.put(0, 0), ClientConnectionException.class);
        }

        doSleep(period);

        // After the throttling period a reconnect attempt must eventually succeed.
        boolean reconnected = GridTestUtils.waitForCondition(() -> {
            try {
                cache.put(0, 0);
            }
            catch (ClientConnectionException e) {
                return false;
            }

            return true;
        }, period);

        assertTrue(reconnected);
    }
}
/**
 * Drops all thin client connections of the given Ignite instance through its
 * {@link ClientProcessorMXBean}.
 *
 * @param ignite Ignite instance whose client connections are dropped.
 * @throws Exception If failed.
 */
private void dropAllThinClientConnections(Ignite ignite) throws Exception {
    String instanceName = ignite.name();

    ClientProcessorMXBean clientsBean = getMxBean(
        instanceName,
        "Clients",
        ClientListenerProcessor.class,
        ClientProcessorMXBean.class
    );

    clientsBean.dropAllConnections();
}
/**
 * Run the closure repeatedly while Ignite nodes keep failing/recovering several times.
 * <p>
 * A background task fails nodes until one is left and restores them to the initial cluster size,
 * up to 5 times. The closure is run in the calling thread until that task is finished.
 *
 * @param cluster Cluster whose nodes are failed and restored by the background task.
 * @param clo Closure to run while the topology is changing.
 * @throws Exception If the closure or the topology changing task failed.
 */
private void assertOnUnstableCluster(LocalIgniteCluster cluster, Runnable clo) throws Exception {
    final AtomicBoolean stopFlag = new AtomicBoolean(false);

    // Keep a reference to the executor so that its worker thread can be released afterwards.
    ExecutorService topChangeExec = Executors.newSingleThreadExecutor();

    try {
        // Keep changing Ignite cluster topology by adding/removing nodes.
        Future<?> topChangeFut = topChangeExec.submit(() -> {
            try {
                for (int i = 0; i < 5 && !stopFlag.get(); i++) {
                    while (cluster.size() != 1)
                        cluster.failNode();

                    while (cluster.size() != cluster.getInitialSize())
                        cluster.restoreNode();

                    awaitPartitionMapExchange();
                }
            }
            catch (InterruptedException ignored) {
                // Restore the interrupt status instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
            finally {
                // Always release the caller, even if a topology change failed unexpectedly;
                // otherwise the loop below would never terminate.
                stopFlag.set(true);
            }
        });

        // Use Ignite while nodes keep failing.
        while (!stopFlag.get())
            clo.run();

        // Propagates a failure of the topology changing task, if any.
        topChangeFut.get();
    }
    finally {
        stopFlag.set(true);

        // No new tasks are submitted; lets the worker thread terminate once the task is done.
        topChangeExec.shutdown();
    }
}
}