blob: d408333bc915273fd8b7fd81057788fdb3b2ef24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Maps;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.internal.util.collections.Sets;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
* Tests for client nodes with slow discovery.
*/
@RunWith(Parameterized.class)
public class ClientSlowDiscoveryTransactionRemapTest extends ClientSlowDiscoveryAbstractTest {
/** */
@Parameterized.Parameters(name = "isolation = {0}, concurrency = {1}, operation = {2}")
public static List<Object[]> parameters() {
ArrayList<Object[]> params = new ArrayList<>();
List<IgniteInClosure<TestTransaction<Integer, Integer>>> operations = new ArrayList<>();
operations.add(new NamedClosure<>(putRemoveSameKey, "putRemoveSameKey"));
operations.add(new NamedClosure<>(putRemoveDifferentKey, "putRemoveDifferentKey"));
operations.add(new NamedClosure<>(getPutSameKey, "getPutSameKey"));
operations.add(new NamedClosure<>(getPutDifferentKey, "getPutDifferentKey"));
operations.add(new NamedClosure<>(putAllRemoveAllSameKeys, "putAllRemoveAllSameKeys"));
operations.add(new NamedClosure<>(putAllRemoveAllDifferentKeys, "putAllRemoveAllDifferentKeys"));
operations.add(new NamedClosure<>(randomOperation, "random"));
for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
if (!shouldBeTested(concurrency, isolation))
continue;
for (IgniteInClosure<TestTransaction<Integer, Integer>> operation : operations)
params.add(new Object[] {concurrency, isolation, operation});
}
}
return params;
}
/**
* @param concurrency Concurrency.
* @param isolation Isolation.
* @return {@code True} if pair concurrency - isolation should be tested.
*/
private static boolean shouldBeTested(TransactionConcurrency concurrency, TransactionIsolation isolation) {
if (concurrency == PESSIMISTIC)
return isolation == REPEATABLE_READ || isolation == READ_COMMITTED;
return concurrency == OPTIMISTIC && isolation == SERIALIZABLE;
}
/** Keys set. */
private static final int KEYS_SET = 64;
/** Put remove same key. */
private static IgniteInClosure<TestTransaction<Integer, Integer>> putRemoveSameKey = tx -> {
tx.put(1, 1);
tx.remove(1);
tx.put(1, 100);
};
/** Put remove different key. */
private static IgniteInClosure<TestTransaction<Integer, Integer>> putRemoveDifferentKey = tx -> {
tx.put(1, 1);
tx.remove(2);
};
/** Get put same key. */
private static IgniteInClosure<TestTransaction<Integer, Integer>> getPutSameKey = tx -> {
int val = tx.get(1);
tx.put(1, val + 1);
};
/** Get put different key. */
private static IgniteInClosure<TestTransaction<Integer, Integer>> getPutDifferentKey = tx -> {
int val = tx.get(1);
tx.put(2, val + 1);
};
/** Put all remove all same keys. */
private static IgniteInClosure<TestTransaction<Integer, Integer>> putAllRemoveAllSameKeys = tx -> {
tx.putAll(Maps.asMap(Sets.newSet(1, 2, 3, 4, 5), k -> k));
tx.removeAll(Sets.newSet(1, 2, 3, 4, 5));
};
/** Put all remove all different keys. */
private static IgniteInClosure<TestTransaction<Integer, Integer>> putAllRemoveAllDifferentKeys = tx -> {
tx.putAll(Maps.asMap(Sets.newSet(1, 2, 3, 4, 5), k -> k));
tx.removeAll(Sets.newSet(6, 7, 8, 9, 10));
};
/** Random operation. */
private static IgniteInClosure<TestTransaction<Integer, Integer>> randomOperation = tx -> {
long seed = ThreadLocalRandom.current().nextLong();
log.info("Seed: " + seed);
Random random = new Random(seed);
for (int it = 0; it < 10; it++) {
int operation = random.nextInt(TestTransaction.POSSIBLE_OPERATIONS);
switch (operation) {
// Get:
case 0: {
int key = random.nextInt(KEYS_SET);
tx.get(key);
break;
}
// Put:
case 1: {
int key = random.nextInt(KEYS_SET);
int val = random.nextInt(KEYS_SET);
tx.put(key, val);
break;
}
// Remove:
case 2: {
int key = random.nextInt(KEYS_SET);
tx.remove(key);
break;
}
// Put All:
case 3: {
tx.putAll(
random.ints(5, 0, KEYS_SET)
.boxed()
.distinct()
.collect(toMap(
k -> k, k -> k)
)
);
break;
}
// Remove All:
case 4: {
tx.removeAll(
random.ints(5, 0, KEYS_SET).boxed().collect(toSet())
);
break;
}
}
}
};
/**
* Interface to work with cache operations within transaction.
*/
private static interface TestTransaction<K, V> {
/** Possible operations. */
static int POSSIBLE_OPERATIONS = 5;
/**
* @param key Key.
* @return Value.
*/
V get(K key);
/**
* @param key Key.
* @param val Value.
*/
void put(K key, V val);
/**
* @param key Key.
*/
void remove(K key);
/**
* @param map Map.
*/
void putAll(Map<K, V> map);
/**
* @param keys Keys.
*/
void removeAll(Set<K> keys);
}
/**
* Closure with possibility to set name to have proper print in test parameters.
*/
private static class NamedClosure<K, V> implements IgniteInClosure<TestTransaction<K, V>> {
/** Closure. */
private final IgniteInClosure<TestTransaction<K, V>> c;
/** Name. */
private final String name;
/**
* @param c Closure.
* @param name Name.
*/
public NamedClosure(IgniteInClosure<TestTransaction<K, V>> c, String name) {
this.c = c;
this.name = name;
}
/** {@inheritDoc} */
@Override public void apply(TestTransaction<K, V> kvTestTransaction) {
c.apply(kvTestTransaction);
}
/** {@inheritDoc} */
@Override public String toString() {
return name;
}
}
/**
* Implementation for transaction operations backed by Ignite cache.
*/
private static class TestTransactionEngine<K, V> implements TestTransaction<K, V> {
/** Removed. */
private final Object RMV = new Object();
/** Cache. */
private final IgniteCache<K, V> cache;
/** Map to consistency check. */
private final Map<K, Object> map;
/**
* @param cache Cache.
*/
TestTransactionEngine(IgniteCache<K, V> cache) {
this.cache = cache;
this.map = new HashMap<>();
}
/** {@inheritDoc} */
@Override public V get(K key) {
return cache.get(key);
}
/** {@inheritDoc} */
@Override public void put(K key, V val) {
map.put(key, val);
cache.put(key, val);
}
/** {@inheritDoc} */
@Override public void remove(K key) {
map.put(key, RMV);
cache.remove(key);
}
/** {@inheritDoc} */
@Override public void putAll(Map<K, V> map) {
for (Map.Entry<K, V> entry : map.entrySet())
this.map.put(entry.getKey(), entry.getValue());
cache.putAll(map);
}
/** {@inheritDoc} */
@Override public void removeAll(Set<K> keys) {
for (K key : keys)
map.put(key, RMV);
cache.removeAll(keys);
}
/**
* Consistency check for transaction operations.
*/
public void consistencyCheck() {
for (Map.Entry<K, Object> entry : map.entrySet()) {
if (entry.getValue() == RMV)
Assert.assertNull("Value is not null for key: " + entry.getKey(), cache.get(entry.getKey()));
else
Assert.assertEquals("Values are different for key: " + entry.getKey(),
entry.getValue(),
cache.get(entry.getKey())
);
}
}
}
/** Concurrency. */
@Parameterized.Parameter(0)
public TransactionConcurrency concurrency;
/** Isolation. */
@Parameterized.Parameter(1)
public TransactionIsolation isolation;
/** Operation. */
@Parameterized.Parameter(2)
public IgniteInClosure<TestTransaction<?, ?>> operation;
/** Client disco spi block. */
private CountDownLatch clientDiscoSpiBlock;
/** Client node to perform operations. */
private IgniteEx clnt;
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
stopAllGrids();
cleanPersistenceDir();
startGrid(0);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
/** */
@Before
public void before() throws Exception {
NodeJoinInterceptingDiscoverySpi clientDiscoSpi = new NodeJoinInterceptingDiscoverySpi();
clientDiscoSpiBlock = new CountDownLatch(1);
// Delay node join of second client.
clientDiscoSpi.interceptor = msg -> {
if (msg.nodeId().toString().endsWith("2"))
U.awaitQuiet(clientDiscoSpiBlock);
};
discoverySpiSupplier = () -> clientDiscoSpi;
clnt = startClientGrid(1);
for (int k = 0; k < 64; k++)
clnt.cache(CACHE_NAME).put(k, 0);
discoverySpiSupplier = TcpDiscoverySpi::new;
startClientGrid(2);
}
/** */
@After
public void after() throws Exception {
// Stop client nodes.
stopGrid(1);
stopGrid(2);
}
/** */
@Test
public void testTransactionRemap() throws Exception {
TestTransactionEngine engine = new TestTransactionEngine<>(clnt.cache(CACHE_NAME));
IgniteInternalFuture<?> txFut = GridTestUtils.runAsync(() -> {
try (Transaction tx = clnt.transactions().txStart(concurrency, isolation)) {
operation.apply(engine);
tx.commit();
}
});
try {
txFut.get(1, TimeUnit.SECONDS);
}
catch (IgniteFutureTimeoutCheckedException te) {
// Expected.
}
finally {
clientDiscoSpiBlock.countDown();
}
// After resume second client join, transaction should succesfully await new affinity and commit.
txFut.get();
// Check consistency after transaction commit.
engine.consistencyCheck();
}
/** */
@Test
public void testTransactionRemapWithTimeout() throws Exception {
TestTransactionEngine engine = new TestTransactionEngine<>(clnt.cache(CACHE_NAME));
IgniteInternalFuture<?> txFut = GridTestUtils.runAsync(() -> {
try (Transaction tx = clnt.transactions().txStart(concurrency, isolation, 1_000, 1_000_000)) {
operation.apply(engine);
tx.commit();
}
});
try {
txFut.get(2, TimeUnit.SECONDS);
}
catch (IgniteFutureTimeoutCheckedException te) {
// Expected.
}
finally {
clientDiscoSpiBlock.countDown();
}
// After resume second client join, transaction should be timed out and rolled back.
if (concurrency == PESSIMISTIC) {
assertThrowsWithCause((Callable<Object>)txFut::get, TransactionTimeoutException.class);
// Check that initial data is not changed by rollbacked transaction.
for (int k = 0; k < KEYS_SET; k++)
Assert.assertEquals("Cache consistency is broken for key: " + k, 0, clnt.cache(CACHE_NAME).get(k));
}
else {
txFut.get();
engine.consistencyCheck();
}
}
}