blob: 7bcd7f7a6ce36e1cb0366785aa2517574cf6490b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
/**
 * Test to check passing transaction's label for EVT_CACHE_OBJECT_READ, EVT_CACHE_OBJECT_PUT,
 * EVT_CACHE_OBJECT_REMOVED events.
 * <p>
 * Every node gets a local listener for these event types. Each event whose label differs from
 * {@link #TX_LABEL} is collected, and the test fails at the end if at least one such event was seen.
 */
public class CacheEventWithTxLabelTest extends GridCommonAbstractTest {
    /** Types event to be checked. */
    private static final int[] CACHE_EVENT_TYPES = {EVT_CACHE_OBJECT_READ, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED};

    /** Transaction label. */
    private static final String TX_LABEL = "TX_LABEL";

    /** Number of server nodes. */
    private static final int SRVS = 3;

    /** Number of client nodes. */
    private static final int CLIENTS = 1;

    /** Cache name. */
    public static final String CACHE_NAME = "cache";

    /** Number of sequential keys used by the batch scenarios. */
    private static final int BATCH_SIZE = 100;

    /** Maximum number of candidate keys inspected while looking for a backup key. */
    private static final int BACKUP_KEY_SEARCH_LIMIT = 100;

    /** List to keep all events with an unexpected tx label between run tests. Filled by the event listeners. */
    private static final List<CacheEvent> wrongEvts = Collections.synchronizedList(new ArrayList<>());

    /** Simple read-only entry processor to use for tests: returns the current value of the entry. */
    private static final CacheEntryProcessor<Object, Object, Object> ENTRY_PROCESSOR =
        (entry, args) -> entry.getValue();

    /** Key for which the node returned by {@link #primary()} is the primary node. */
    private Integer primaryKey = 0;

    /** Key for which the node returned by {@link #primary()} is a backup node (best effort, see key evaluation). */
    private Integer backupKey = 0;

    /** Current cache backup count. */
    private int backupCnt;

    /** Current transaction isolation level. */
    private TransactionIsolation isolation;

    /** Current transaction concurrency level. */
    private TransactionConcurrency concurrency;

    /** All failed tests information. */
    private final List<String> errors = new ArrayList<>();

    /** Count of wrong events observed by the previous result check. */
    private int prevErrCnt;

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        // The list is static: drop whatever an earlier run of this class in the same JVM may have left in it,
        // otherwise it would disagree with the per-instance counter that starts from zero.
        wrongEvts.clear();

        startGridsMultiThreaded(SRVS);
        startClientGridsMultiThreaded(SRVS, CLIENTS);

        waitForDiscovery(primary(), backup1(), backup2(), client());

        registerEventListeners(primary(), backup1(), backup2(), client());
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-10270", MvccFeatureChecker.forcedMvcc());
    }

    /**
     * Check all cases for passing transaction label in cache event.
     * <p>
     * Iterates over every backup count in {@code [0, SRVS)}, every isolation/concurrency pair and every pair of
     * neighbouring nodes from the sequence client, primary, backup1, backup2, and runs all scenarios for each
     * combination.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPassTxLabelInCashEventForAllCases() throws Exception {
        Ignite[] nodes = {client(), primary(), backup1(), backup2()};

        for (int backups = 0; backups < SRVS; backups++) {
            backupCnt = backups;

            prepareCache(backups);

            for (TransactionIsolation txIsolation : TransactionIsolation.values()) {
                isolation = txIsolation;

                for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) {
                    concurrency = txConcurrency;

                    if (MvccFeatureChecker.forcedMvcc() && !MvccFeatureChecker.isSupported(txConcurrency, txIsolation))
                        continue;

                    for (int i = 0; i < nodes.length - 1; i++) {
                        Ignite nodeForPut = nodes[i];
                        Ignite nodeForGet = nodes[i + 1];

                        singleWriteReadRemoveTest(nodeForPut, nodeForGet);
                        multiWriteReadRemoveTest(nodeForPut, nodeForGet);
                        singleNodeBatchWriteReadRemoveTest(nodeForPut, nodeForGet);
                        multiNodeBatchWriteReadRemoveTest(nodeForPut, nodeForGet);
                        writeInvokeRemoveTest(nodeForPut, nodeForGet);
                        writeInvokeAllRemoveTest(nodeForPut, nodeForGet);
                    }
                }
            }
        }

        // A listener may record a wrong event after the last per-scenario check; sweep once more so that such
        // an event fails the test instead of being only logged.
        int totalWrongEvts = wrongEvts.size();

        if (totalWrongEvts != prevErrCnt) {
            errors.add((totalWrongEvts - prevErrCnt) + " event(s) recorded after the last scenario check");

            prevErrCnt = totalWrongEvts;
        }

        String listOfFailedTests = String.join(",\n", errors);

        Assert.assertTrue("Have been received " + prevErrCnt + " cache events with incorrect txlabel.\n" +
                "Failed tests:" + listOfFailedTests,
            errors.isEmpty());
    }

    /**
     * Check error after run test. In case the number of wrong events has changed since the previous check,
     * information about the failed test will be added to the errors list.
     *
     * @param testName Name of test which result will be checked.
     * @param node1 First node.
     * @param node2 Second node.
     */
    private void checkResult(String testName, Ignite node1, Ignite node2) {
        int currErrCnt = wrongEvts.size();

        if (prevErrCnt == currErrCnt)
            return;

        prevErrCnt = currErrCnt;

        errors.add(String.format("%s backCnt-%s, %s, %s, node1-%s, node2-%s",
            testName, backupCnt, isolation, concurrency, nodeType(node1), nodeType(node2)));
    }

    /**
     * @param node Ignite node.
     * @return Node type in the test, or {@code "UNKNOWN"} if the node is none of the four test nodes.
     */
    private String nodeType(Ignite node) {
        if (client().equals(node))
            return "CLIENT";

        if (primary().equals(node))
            return "PRIMARY";

        if (backup1().equals(node))
            return "BACKUP1";

        if (backup2().equals(node))
            return "BACKUP2";

        return "UNKNOWN";
    }

    /**
     * Test single put, get, remove operations. Each operation runs in its own labelled transaction.
     *
     * @param instanceToPut Ignite instance to put test data.
     * @param instanceToGet Ignite instance to get and remove test data.
     */
    private void singleWriteReadRemoveTest(Ignite instanceToPut, Ignite instanceToGet) {
        runTransactionally(instanceToPut, ign -> ign.cache(CACHE_NAME).put(primaryKey, 3));

        runTransactionally(instanceToGet, ign -> ign.cache(CACHE_NAME).get(primaryKey));

        runTransactionally(instanceToGet, ign -> ign.cache(CACHE_NAME).remove(primaryKey));

        checkResult("singleWriteReadRemoveTest", instanceToPut, instanceToGet);
    }

    /**
     * Test multi put, get, remove operations: both the primary and the backup key are touched inside
     * one transaction per operation kind.
     *
     * @param instanceToPut Ignite instance to put test data.
     * @param instanceToGet Ignite instance to get and remove test data.
     */
    private void multiWriteReadRemoveTest(Ignite instanceToPut, Ignite instanceToGet) {
        runTransactionally(instanceToPut, ign -> {
            ign.cache(CACHE_NAME).put(primaryKey, 2);
            ign.cache(CACHE_NAME).put(backupKey, 3);
        });

        runTransactionally(instanceToGet, ign -> {
            ign.cache(CACHE_NAME).get(primaryKey);
            ign.cache(CACHE_NAME).get(backupKey);
        });

        runTransactionally(instanceToGet, ign -> {
            ign.cache(CACHE_NAME).remove(primaryKey);
            ign.cache(CACHE_NAME).remove(backupKey);
        });

        checkResult("multiWriteReadRemoveTest", instanceToPut, instanceToGet);
    }

    /**
     * Test multi nodes batch write-read-remove using {@link #BATCH_SIZE} sequential keys.
     *
     * @param instanceToPut Ignite instance to put test data.
     * @param instanceToGet Ignite instance to get and remove test data.
     */
    private void multiNodeBatchWriteReadRemoveTest(Ignite instanceToPut, Ignite instanceToGet) {
        Map<Integer, Integer> keyValuesMap = sequentialKeyValues(BATCH_SIZE);

        runTransactionally(instanceToPut, ign -> ign.cache(CACHE_NAME).putAll(keyValuesMap));

        runTransactionally(instanceToGet, ign -> ign.cache(CACHE_NAME).getAll(keyValuesMap.keySet()));

        runTransactionally(instanceToGet, ign -> ign.cache(CACHE_NAME).removeAll(keyValuesMap.keySet()));

        checkResult("multiNodeBatchWriteReadRemoveTest", instanceToPut, instanceToGet);
    }

    /**
     * Test single node batch write-read-remove. The batch consists of the current primary key and the next key
     * for which the same node is primary.
     *
     * @param instanceToPut Ignite instance to put test data.
     * @param instanceToGet Ignite instance to get and remove test data.
     */
    private void singleNodeBatchWriteReadRemoveTest(Ignite instanceToPut, Ignite instanceToGet) {
        IgnitePair<Integer> nextKeys = evaluatePrimaryAndBackupKeys(primaryKey + 1, backupKey + 1);

        Map<Integer, Integer> keyValuesMap = new HashMap<>();

        keyValuesMap.put(primaryKey, 1);
        keyValuesMap.put(nextKeys.get1(), 2);

        runTransactionally(instanceToPut, ign -> ign.cache(CACHE_NAME).putAll(keyValuesMap));

        runTransactionally(instanceToGet, ign -> ign.cache(CACHE_NAME).getAll(keyValuesMap.keySet()));

        runTransactionally(instanceToGet, ign -> ign.cache(CACHE_NAME).removeAll(keyValuesMap.keySet()));

        checkResult("singleNodeBatchWriteReadRemoveTest", instanceToPut, instanceToGet);
    }

    /**
     * Test put-invoke-remove. Note that here the put and the remove are started from {@code instanceToGet},
     * while the invoke calls are started from {@code instanceToPut}.
     *
     * @param instanceToPut Ignite instance to invoke the entry processor.
     * @param instanceToGet Ignite instance to put and remove test data.
     */
    private void writeInvokeRemoveTest(Ignite instanceToPut, Ignite instanceToGet) {
        runTransactionally(instanceToGet, ign -> ign.cache(CACHE_NAME).put(primaryKey, 3));

        runTransactionally(instanceToPut, ign -> {
            ign.cache(CACHE_NAME).invoke(primaryKey, ENTRY_PROCESSOR);
            ign.cache(CACHE_NAME).invoke(backupKey, ENTRY_PROCESSOR);
        });

        runTransactionally(instanceToGet, ign -> ign.cache(CACHE_NAME).remove(primaryKey));

        checkResult("writeInvokeRemoveTest", instanceToPut, instanceToGet);
    }

    /**
     * Test putAll-invokeAll-removeAll using {@link #BATCH_SIZE} sequential keys. The putAll and the removeAll
     * are started from {@code instanceToGet}, the invokeAll is started from {@code instanceToPut}.
     *
     * @param instanceToPut Ignite instance to invoke the entry processor.
     * @param instanceToGet Ignite instance to put and remove test data.
     */
    private void writeInvokeAllRemoveTest(Ignite instanceToPut, Ignite instanceToGet) {
        Map<Integer, Integer> keyValuesMap = sequentialKeyValues(BATCH_SIZE);

        runTransactionally(instanceToGet, ign -> ign.cache(CACHE_NAME).putAll(keyValuesMap));

        runTransactionally(instanceToPut, ign -> ign.cache(CACHE_NAME).invokeAll(keyValuesMap.keySet(), ENTRY_PROCESSOR));

        runTransactionally(instanceToGet, ign -> ign.cache(CACHE_NAME).removeAll(keyValuesMap.keySet()));

        checkResult("writeInvokeAllRemoveTest", instanceToPut, instanceToGet);
    }

    /**
     * Build a map where every key in {@code [0, cnt)} is mapped to itself.
     *
     * @param cnt Number of entries.
     * @return Map of sequential keys to equal values.
     */
    private static Map<Integer, Integer> sequentialKeyValues(int cnt) {
        return IntStream.range(0, cnt).boxed()
            .collect(Collectors.toMap(Function.identity(), Function.identity()));
    }

    /**
     * Run command in a transaction labelled with {@link #TX_LABEL}, using the current concurrency and isolation
     * levels, and commit it.
     *
     * @param startNode Ignite node to start transaction and run passed command.
     * @param cmdInTx Command which should be done in transaction.
     */
    private void runTransactionally(Ignite startNode, Consumer<Ignite> cmdInTx) {
        try (Transaction tx = startNode.transactions().withLabel(TX_LABEL).txStart(concurrency, isolation)) {
            cmdInTx.accept(startNode);

            tx.commit();
        }
    }

    /**
     * Add event listener to passed Ignite instances for cache event types. The listener records every event
     * whose transaction label differs from {@link #TX_LABEL}.
     *
     * @param igns Ignite instances.
     */
    private void registerEventListeners(Ignite... igns) {
        if (igns == null)
            return;

        for (Ignite ign : igns) {
            ign.events().enableLocal(CACHE_EVENT_TYPES);

            ign.events().localListen((IgnitePredicate<Event>)event -> {
                // Only cache event types are subscribed below, hence the cast.
                CacheEvent cacheEvt = (CacheEvent)event;

                if (!TX_LABEL.equals(cacheEvt.txLabel())) {
                    log.error("Has been received event with incorrect label " + cacheEvt.txLabel() + " ," +
                        " expected " + TX_LABEL + " label");

                    wrongEvts.add(cacheEvt);
                }

                // Returning true keeps the listener registered.
                return true;
            }, CACHE_EVENT_TYPES);
        }
    }

    /**
     * Create cache with passed number of backups and determine primary and backup keys. If cache was created
     * before, it will be destroyed before creating a new one.
     *
     * @param cacheBackups Number of backups for cache.
     * @throws InterruptedException In case of fail.
     */
    private void prepareCache(int cacheBackups) throws InterruptedException {
        IgniteCache<Object, Object> oldCache = client().cache(CACHE_NAME);

        if (oldCache != null)
            oldCache.destroy();

        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>()
            .setName(CACHE_NAME)
            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
            .setCacheMode(CacheMode.PARTITIONED)
            .setBackups(cacheBackups);

        client().createCache(ccfg);

        awaitPartitionMapExchange();

        IgnitePair<Integer> keys = evaluatePrimaryAndBackupKeys(0, 0);

        primaryKey = keys.get1();
        backupKey = keys.get2();
    }

    /**
     * Evaluate primary and backup keys relative to the node returned by {@link #primary()}.
     * <p>
     * The primary key search is unbounded. The backup key search inspects at most
     * {@link #BACKUP_KEY_SEARCH_LIMIT} candidates; if none of them is a backup key for that node, the returned
     * value is {@code backupKeyStart + BACKUP_KEY_SEARCH_LIMIT}, which is not checked to be a backup key.
     *
     * @param primaryKeyStart Value from which to start looking for a primary key.
     * @param backupKeyStart Value from which to start looking for a backup key.
     * @return Pair of result. The first result is found primary key. The second is found backup key.
     */
    private IgnitePair<Integer> evaluatePrimaryAndBackupKeys(final int primaryKeyStart, final int backupKeyStart) {
        int primaryCandidate = primaryKeyStart;

        while (!client().affinity(CACHE_NAME).isPrimary(((IgniteKernal)primary()).localNode(), primaryCandidate))
            primaryCandidate++;

        int backupCandidate = backupKeyStart;
        int backupSearchEnd = backupKeyStart + BACKUP_KEY_SEARCH_LIMIT;

        while (backupCandidate < backupSearchEnd
            && !client().affinity(CACHE_NAME).isBackup(((IgniteKernal)primary()).localNode(), backupCandidate))
            backupCandidate++;

        return new IgnitePair<>(primaryCandidate, backupCandidate);
    }

    /**
     * Return primary node.
     *
     * @return Primary node (grid with index 0).
     */
    private Ignite primary() {
        return ignite(0);
    }

    /**
     * Return first backup node.
     *
     * @return First backup node (grid with index 1).
     */
    private Ignite backup1() {
        return ignite(1);
    }

    /**
     * Return second backup node.
     *
     * @return Second backup node (grid with index 2).
     */
    private Ignite backup2() {
        return ignite(2);
    }

    /**
     * Return client node.
     *
     * @return Client node (grid with index 3).
     */
    private Ignite client() {
        return ignite(3);
    }
}