blob: 2dc34ae60a7d1c8935ef8a622c97b40bd534cf12 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.local;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import static java.util.Objects.nonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.IntStream.range;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
import static org.apache.ignite.testframework.LogListener.matches;
/**
* Class for testing fast node left during transaction for cache.
*/
public class GridCacheFastNodeLeftForTransactionTest extends GridCommonAbstractTest {
/** Number of nodes. */
private static final int NODES = 4;
/** Number of transactions. */
private static final int TX_COUNT = 20;
/** Logger for listen log messages. */
private static ListeningTestLogger listeningLog;
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
listeningLog = new ListeningTestLogger(GridAbstractTest.log);
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
/*To listen the logs of future in current tests, since the log in the
futures is static and is not reset when tests are launched.*/
setFieldValue(GridDhtTxFinishFuture.class, "log", null);
((AtomicReference<IgniteLogger>)getFieldValue(GridDhtTxFinishFuture.class, "logRef")).set(null);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
listeningLog.clearListeners();
super.afterTest();
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setMvccVacuumFrequency(1000)
.setCacheConfiguration(createCacheConfigs())
.setGridLogger(listeningLog)
.setConnectorConfiguration(new ConnectorConfiguration());
}
/**
* Test transaction rollback when one of the nodes drops out.
*
* @throws Exception If failed.
*/
@Test
public void testRollbackTransactions() throws Exception {
int txCnt = TX_COUNT;
int nodes = NODES;
IgniteEx crd = createCluster(nodes);
for (CacheConfiguration cacheConfig : createCacheConfigs()) {
String cacheName = cacheConfig.getName();
IgniteCache<Object, Object> cache = crd.cache(cacheName);
List<Integer> keys = primaryKeys(cache, txCnt);
Map<Integer, Integer> cacheValues = range(0, txCnt / 2).boxed().collect(toMap(keys::get, identity()));
cache.putAll(cacheValues);
Collection<Transaction> txs = createTxs(
grid(nodes),
cacheName,
range(txCnt / 2, txCnt).mapToObj(keys::get).collect(toList())
);
int stoppedNodeId = 2;
stopGrid(stoppedNodeId);
LogListener logLsnr = newLogListener();
listeningLog.registerListener(logLsnr);
for (Transaction tx : txs)
tx.rollback();
awaitPartitionMapExchange();
check(cacheValues, cacheName, logLsnr, stoppedNodeId);
}
}
/**
* Test for rollback transactions when one of the nodes drops out,
* with operations performed on keys outside the transaction.
*
* @throws Exception If failed.
*/
@Test
public void testRollbackTransactionsWithKeyOperationOutsideThem() throws Exception {
int txCnt = TX_COUNT;
int nodes = NODES;
IgniteEx crd = createCluster(nodes);
for (CacheConfiguration cacheConfig : createCacheConfigs()) {
String cacheName = cacheConfig.getName();
IgniteCache<Object, Object> cache = crd.cache(cacheName);
List<Integer> keys = primaryKeys(cache, txCnt);
Map<Integer, Integer> cacheValues = range(0, txCnt / 2).boxed().collect(toMap(keys::get, identity()));
cache.putAll(cacheValues);
List<Integer> txKeys = range(txCnt / 2, txCnt).mapToObj(keys::get).collect(toList());
IgniteEx clientNode = grid(nodes);
Collection<Transaction> txs = createTxs(clientNode, cacheName, txKeys);
int stoppedNodeId = 2;
stopGrid(grid(stoppedNodeId).name(), false, false);
GridTestUtils.runAsync(() -> {
IgniteCache<Object, Object> clientCache = clientNode.cache(cacheName);
txKeys.forEach(clientCache::get);
});
LogListener logLsnr = newLogListener();
listeningLog.registerListener(logLsnr);
for (Transaction tx : txs)
tx.rollback();
awaitPartitionMapExchange();
check(cacheValues, cacheName, logLsnr, stoppedNodeId);
}
}
/**
* Checking the contents of the cache after rollback transactions,
* with restarting the stopped node with using "idle_verify".
*
* @param cacheValues Expected cache contents.
* @param cacheName Cache name.
* @param logLsnr LogListener.
* @param stoppedNodeId ID of the stopped node.
* @throws Exception If failed.
*/
private void check(
Map<Integer, Integer> cacheValues,
String cacheName,
LogListener logLsnr,
int stoppedNodeId
) throws Exception {
assert nonNull(cacheValues);
assert nonNull(cacheName);
assert nonNull(logLsnr);
checkCacheData(cacheValues, cacheName);
assertTrue(logLsnr.check());
IgniteEx stoppedNode = startGrid(stoppedNodeId);
awaitPartitionMapExchange();
// Wait for vacuum.
doSleep(2000);
checkCacheData(cacheValues, cacheName);
IdleVerifyResultV2 idleVerifyResV2 = idleVerify(stoppedNode, null);
SB sb = new SB();
idleVerifyResV2.print(sb::a, true);
assertContains(listeningLog, sb.toString(), "no conflicts have been found");
}
/**
* Creating a cluster.
*
* @param nodes Number of server nodes, plus one client.
* @throws Exception If failed.
*/
private IgniteEx createCluster(int nodes) throws Exception {
IgniteEx crd = startGrids(nodes);
startClientGrid(nodes);
awaitPartitionMapExchange();
return crd;
}
/**
* Transaction creation.
*
* @param node Node.
* @param cacheName Cache name.
* @param keys Keys.
* @return Transactions.
* @throws Exception If failed.
*/
private Collection<Transaction> createTxs(
IgniteEx node,
String cacheName,
Collection<Integer> keys
) throws Exception {
assert nonNull(node);
assert nonNull(cacheName);
assert nonNull(keys);
IgniteCache<Object, Object> cache = node.cache(cacheName);
Collection<Transaction> txs = new ArrayList<>();
Transaction tx = node.transactions().txStart();
for (Integer key : keys) {
cache.put(key, key + 10);
txs.add(tx);
}
((TransactionProxyImpl)tx).tx().prepare(true);
return txs;
}
/**
* Creating an instance of LogListener to find an exception
* "Unable to send message (node left topology):".
*
* @return LogListener.
*/
private LogListener newLogListener() {
return matches("Unable to send message (node left topology):").build();
}
/**
* Creating a cache configurations.
*
* @return Cache configurations.
*/
private CacheConfiguration[] createCacheConfigs() {
return new CacheConfiguration[] {
createCacheConfig(DEFAULT_CACHE_NAME + "_0", FULL_SYNC),
createCacheConfig(DEFAULT_CACHE_NAME + "_1", PRIMARY_SYNC)
};
}
/**
* Creating a cache configuration.
*
* @param cacheName Cache name.
* @param syncMode Sync mode.
* @return Cache configuration.
*/
private CacheConfiguration createCacheConfig(String cacheName, CacheWriteSynchronizationMode syncMode) {
assert nonNull(cacheName);
assert nonNull(syncMode);
return new CacheConfiguration(cacheName)
.setAtomicityMode(TRANSACTIONAL)
.setBackups(2)
.setAffinity(new RendezvousAffinityFunction(false, 10))
.setWriteSynchronizationMode(syncMode);
}
}