blob: b5ee0015b883c72077411080625db7c6242f243d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.NotNull;
import org.jsr166.ConcurrentLinkedHashMap;
import org.junit.Assert;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Test transfer amount between accounts with enabled {@link StopNodeFailureHandler}.
*
* This test can be extended to emulate failover scenarios during transactional operations on the grid.
*/
public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
/** Count of accounts in one thread. */
private static final int DFLT_ACCOUNTS_CNT = 32;
/** Count of threads and caches. */
private static final int DFLT_TX_THREADS_CNT = Runtime.getRuntime().availableProcessors();
/** Count of nodes to start. */
private static final int DFLT_NODES_CNT = 3;
/** Count of transaction on cache. */
private static final int DFLT_TRANSACTIONS_CNT = 10;
/** Completed transactions map. */
private ConcurrentLinkedHashMap[] completedTxs;
/**
*
*/
protected int nodesCount() {
return DFLT_NODES_CNT;
}
/**
*
*/
protected int accountsCount() {
return DFLT_ACCOUNTS_CNT;
}
/**
*
*/
protected int transactionsCount() {
return DFLT_TRANSACTIONS_CNT;
}
/**
*
*/
protected int txThreadsCount() {
return DFLT_TX_THREADS_CNT;
}
/**
* @return Flag enables secondary index on account caches.
*/
protected boolean indexed() {
return false;
}
/**
* @return Flag enables persistence on account caches.
*/
protected boolean persistent() {
return true;
}
/** {@inheritDoc} */
@Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
return new StopNodeFailureHandler();
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
final IgniteConfiguration cfg = super.getConfiguration(name);
cfg.setConsistentId(name);
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(persistent())
.setMaxSize(50 * 1024 * 1024)
)
.setWalSegmentSize(16 * 1024 * 1024)
.setPageSize(1024)
.setWalMode(WALMode.LOG_ONLY));
CacheConfiguration[] cacheConfigurations = new CacheConfiguration[txThreadsCount()];
for (int i = 0; i < txThreadsCount(); i++) {
CacheConfiguration ccfg = new CacheConfiguration()
.setName(cacheName(i))
.setAffinity(new RendezvousAffinityFunction(false, accountsCount()))
.setBackups(1)
.setAtomicityMode(TRANSACTIONAL)
.setCacheMode(CacheMode.PARTITIONED)
.setWriteSynchronizationMode(FULL_SYNC)
.setReadFromBackup(true)
.setOnheapCacheEnabled(true);
if (indexed())
ccfg.setIndexedTypes(IgniteUuid.class, AccountState.class);
cacheConfigurations[i] = ccfg;
}
cfg.setCacheConfiguration(cacheConfigurations);
cfg.setFailureDetectionTimeout(30_000);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
stopAllGrids();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
cleanPersistenceDir();
}
/**
* Make test cache name by prefix.
*/
@NotNull private String cacheName(int cachePrefixIdx) {
return "cache" + cachePrefixIdx;
}
/**
* Test transfer amount.
*
* @param failoverScenario Scenario.
* @param colocatedAccounts {@code True} to use colocated on same primary node accounts.
*/
public void doTestTransferAmount(FailoverScenario failoverScenario, boolean colocatedAccounts) throws Exception {
failoverScenario.beforeNodesStarted();
//given: started some nodes with client.
startGrids(nodesCount());
IgniteEx igniteClient = startClientGrid(getConfiguration(getTestIgniteInstanceName(nodesCount())));
igniteClient.cluster().active(true);
int[] initAmounts = new int[txThreadsCount()];
completedTxs = new ConcurrentLinkedHashMap[txThreadsCount()];
//and: fill all accounts on all caches and calculate total amount for every cache.
for (int cachePrefixIdx = 0; cachePrefixIdx < txThreadsCount(); cachePrefixIdx++) {
IgniteCache<Integer, AccountState> cache = igniteClient.getOrCreateCache(cacheName(cachePrefixIdx));
AtomicInteger coinsCntr = new AtomicInteger();
try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
for (int accountId = 0; accountId < accountsCount(); accountId++) {
Set<Integer> initAmount = generateCoins(coinsCntr, 5);
cache.put(accountId, new AccountState(accountId, tx.xid(), initAmount));
}
tx.commit();
}
initAmounts[cachePrefixIdx] = coinsCntr.get();
completedTxs[cachePrefixIdx] = new ConcurrentLinkedHashMap();
}
//when: start transfer amount from account to account in different threads.
CountDownLatch firstTransactionDone = new CountDownLatch(txThreadsCount());
ArrayList<Thread> transferThreads = new ArrayList<>();
for (int i = 0; i < txThreadsCount(); i++) {
transferThreads.add(new TransferAmountTxThread(firstTransactionDone,
igniteClient, cacheName(i), i, colocatedAccounts));
transferThreads.get(i).start();
}
firstTransactionDone.await(10, TimeUnit.SECONDS);
failoverScenario.afterFirstTransaction();
for (Thread thread : transferThreads)
thread.join();
failoverScenario.afterTransactionsFinished();
consistencyCheck(initAmounts);
}
/**
* Calculates total amount of coins for every thread for every node and checks that coins difference is zero
* (transaction integrity is saved).
*/
private void consistencyCheck(int[] initAmount) {
for (Ignite node : G.allGrids()) {
for (int j = 0; j < txThreadsCount(); j++) {
List<Integer> totalCoins = new ArrayList<>();
String cacheName = cacheName(j);
IgniteCache<Integer, AccountState> cache = node.getOrCreateCache(cacheName);
AccountState[] accStates = new AccountState[accountsCount()];
try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
for (int i = 0; i < accountsCount(); i++) {
AccountState state = cache.get(i);
Assert.assertNotNull(
"Account state has lost [node=" + node.name() + ", cache=" + cacheName + ", accNo=" + i + "]",
state
);
totalCoins.addAll(state.coins);
accStates[i] = state;
}
tx.commit();
}
Collections.sort(totalCoins);
if (initAmount[j] != totalCoins.size()) {
Set<Integer> lostCoins = new HashSet<>();
Set<Integer> duplicateCoins = new HashSet<>();
for (int coin = 1; coin <= initAmount[j]; coin++)
if (!totalCoins.contains(coin))
lostCoins.add(coin);
for (int coinIdx = 1; coinIdx < totalCoins.size(); coinIdx++)
if (totalCoins.get(coinIdx).equals(totalCoins.get(coinIdx - 1)))
duplicateCoins.add(totalCoins.get(coinIdx));
log.error("Transaction integrity failed for [node=" + node.name() + ", cache=" + cacheName + "]");
log.error(
String.format("Total amount of coins before and after transfers are not same. Lost coins: %s. Duplicate coins: %s.",
Objects.toString(lostCoins),
Objects.toString(duplicateCoins)));
ConcurrentLinkedHashMap<IgniteUuid, TxState> txs = completedTxs[j];
for (TxState tx : txs.values())
log.error("Tx: " + tx);
for (int i = 0; i < accountsCount(); i++)
log.error("Account state " + i + " = " + accStates[i]);
assertFalse("Test failed. See messages above", true);
}
}
}
}
/**
*
*/
public static class AccountState {
/** Account id. */
private final int accId;
/** Last performed transaction id on account state. */
@QuerySqlField(index = true)
private final IgniteUuid txId;
/** Set of coins holds in account. */
private final Set<Integer> coins;
/**
* @param accId Acc id.
* @param txId Tx id.
* @param coins Coins.
*/
public AccountState(int accId, IgniteUuid txId, Set<Integer> coins) {
this.txId = txId;
this.coins = Collections.unmodifiableSet(coins);
this.accId = accId;
}
/**
* @param random Randomizer.
* @return Set of coins need to transfer from.
*/
public Set<Integer> coinsToTransfer(Random random) {
int coinsNum = random.nextInt(coins.size());
return coins.stream().limit(coinsNum).collect(Collectors.toSet());
}
/**
* @param txId Transaction id.
* @param coinsToAdd Coins to add to current account.
* @return Account state with added coins.
*/
public AccountState addCoins(IgniteUuid txId, Set<Integer> coinsToAdd) {
return new AccountState(accId, txId, Sets.union(coins, coinsToAdd).immutableCopy());
}
/**
* @param txId Transaction id.
* @param coinsToRmv Coins to remove from current account.
* @return Account state with removed coins.
*/
public AccountState removeCoins(IgniteUuid txId, Set<Integer> coinsToRmv) {
return new AccountState(accId, txId, Sets.difference(coins, coinsToRmv).immutableCopy());
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AccountState that = (AccountState)o;
return Objects.equals(txId, that.txId) &&
Objects.equals(coins, that.coins);
}
/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hash(txId, coins);
}
/** {@inheritDoc} */
@Override public String toString() {
return "AccountState{" +
"accId=" + Objects.toString(accId) +
", coins=" + Objects.toString(coins) +
'}';
}
}
/**
* @param coinsNum Coins number.
*/
private Set<Integer> generateCoins(AtomicInteger coinsCntr, int coinsNum) {
Set<Integer> res = new HashSet<>();
for (int i = 0; i < coinsNum; i++)
res.add(coinsCntr.incrementAndGet());
return res;
}
/**
* State representing transaction between two accounts.
*/
static class TxState {
/**
* Account states before transaction.
*/
AccountState before1;
/**
* Account states before transaction.
*/
AccountState before2;
/**
* Account states after transaction.
*/
AccountState after1;
/**
* Account states after transaction.
*/
AccountState after2;
/**
* Transferred coins between accounts during this transaction.
*/
Set<Integer> transferredCoins;
/**
* @param before1 Before 1.
* @param before2 Before 2.
* @param after1 After 1.
* @param after2 After 2.
* @param transferredCoins Transferred coins.
*/
public TxState(
AccountState before1,
AccountState before2,
AccountState after1,
AccountState after2,
Set<Integer> transferredCoins
) {
this.before1 = before1;
this.before2 = before2;
this.after1 = after1;
this.after2 = after2;
this.transferredCoins = transferredCoins;
}
/** {@inheritDoc} */
@Override public String toString() {
return "TxState{" +
"before1=" + before1 +
", before2=" + before2 +
", transferredCoins=" + transferredCoins +
", after1=" + after1 +
", after2=" + after2 +
'}';
}
}
/**
*
*/
private class TransferAmountTxThread extends Thread {
/** */
private CountDownLatch firstTransactionLatch;
/** */
private IgniteEx ignite;
/** */
private String cacheName;
/** */
private int workerIdx;
/** */
private Random random = new Random();
/** */
private final boolean colocatedAccounts;
/**
* @param ignite Ignite.
*/
private TransferAmountTxThread(CountDownLatch firstTransactionLatch,
final IgniteEx ignite,
String cacheName,
int workerIdx,
boolean colocatedAccounts) {
super("TransferAmountTxThread");
this.firstTransactionLatch = firstTransactionLatch;
this.ignite = ignite;
this.cacheName = cacheName;
this.workerIdx = workerIdx;
this.colocatedAccounts = colocatedAccounts;
}
/** {@inheritDoc} */
@Override public void run() {
for (int i = 0; i < transactionsCount(); i++) {
try {
updateInTransaction(ignite.cache(cacheName));
}
finally {
if (i == 0)
firstTransactionLatch.countDown();
}
}
}
/**
* @throws IgniteException if fails
*/
private void updateInTransaction(IgniteCache<Integer, AccountState> cache) throws IgniteException {
int accIdFrom;
int accIdTo;
for (;;) {
accIdFrom = random.nextInt(accountsCount());
accIdTo = random.nextInt(accountsCount());
if (accIdFrom == accIdTo)
continue;
Affinity<Object> affinity = ignite.affinity(cacheName);
ClusterNode primaryForAccFrom = affinity.mapKeyToNode(accIdFrom);
assertNotNull(primaryForAccFrom);
ClusterNode primaryForAccTo = affinity.mapKeyToNode(accIdTo);
assertNotNull(primaryForAccTo);
// Allows only transaction between accounts that primary on the same node if corresponding flag is enabled.
if (colocatedAccounts && !primaryForAccFrom.id().equals(primaryForAccTo.id()))
continue;
break;
}
AccountState acctFrom;
AccountState acctTo;
try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
acctFrom = cache.get(accIdFrom);
assertNotNull(acctFrom);
acctTo = cache.get(accIdTo);
assertNotNull(acctTo);
Set<Integer> coinsToTransfer = acctFrom.coinsToTransfer(random);
AccountState nextFrom = acctFrom.removeCoins(tx.xid(), coinsToTransfer);
AccountState nextTo = acctTo.addCoins(tx.xid(), coinsToTransfer);
cache.put(accIdFrom, nextFrom);
cache.put(accIdTo, nextTo);
tx.commit();
completedTxs[workerIdx].put(tx.xid(), new TxState(acctFrom, acctTo, nextFrom, nextTo, coinsToTransfer));
}
}
}
/**
* Interface to implement custom failover scenario during transactional amount transfer.
*/
public interface FailoverScenario {
/**
* Callback before nodes have started.
*/
public default void beforeNodesStarted() throws Exception { }
/**
* Callback when first transaction has finished.
*/
public default void afterFirstTransaction() throws Exception { }
/**
* Callback when all transactions have finished.
*/
public default void afterTransactionsFinished() throws Exception { }
}
}