blob: faca4daf1671de18be0f12ce25a71b708b4bb9e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.yardstick.cache.load;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.CacheException;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
import org.apache.ignite.yardstick.IgniteBenchmarkUtils;
import org.apache.ignite.yardstick.IgniteNode;
import org.apache.ignite.yardstick.cache.load.model.ModelUtil;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.BeansException;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.yardstickframework.BenchmarkConfiguration;
import org.yardstickframework.BenchmarkUtils;
/**
* Ignite cache random operation benchmark.
*/
public class IgniteCacheRandomOperationBenchmark extends IgniteAbstractBenchmark {
/** */
public static final int operations = Operation.values().length;
/***/
public static final int CONTINUOUS_QUERY_PER_CACHE = 3;
/** Scan query predicate. */
private static BenchmarkIgniteBiPredicate igniteBiPred = new BenchmarkIgniteBiPredicate();
/** Amount partitions. */
private static final int SCAN_QUERY_PARTITION_AMOUNT = 10;
/** List off all available cache. */
private List<IgniteCache<Object, Object>> availableCaches;
/** List of available transactional cache. */
private List<IgniteCache<Object, Object>> txCaches;
/** List of affinity cache. */
private List<IgniteCache<Object, Object>> affCaches;
/** Map cache name on key classes. */
private Map<String, Class[]> keysCacheClasses;
/** Map cache name on value classes. */
private Map<String, Class[]> valuesCacheClasses;
/** List of query descriptors by cache names. */
private Map<String, List<SqlCacheDescriptor>> cacheSqlDescriptors;
/** List of SQL queries. */
private List<TestQuery> queries;
/** List of allowed cache operations which will be executed. */
private List<Operation> allowedLoadTestOps;
/**
* Replace value entry processor.
*/
private BenchmarkReplaceValueEntryProcessor replaceEntryProc;
/**
* Remove entry processor.
*/
private BenchmarkRemoveEntryProcessor rmvEntryProc;
/**
* Map of statistic information.
*/
private Map<String, AtomicLong> operationStatistics;
/** {@inheritDoc} */
@Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
super.setUp(cfg);
if (args.additionalCachesNumber() > 0)
createAdditionalCaches();
searchCache();
preLoading();
}
/**
* Creates additional caches.
*
* @throws Exception If failed.
*/
private void createAdditionalCaches() throws Exception {
Map<String, CacheConfiguration> cfgMap;
try {
// Loading spring application context and getting all of the caches configurations in the map.
cfgMap = IgniteNode.loadConfiguration(args.configuration()).get2().getBeansOfType(CacheConfiguration.class);
}
catch (BeansException e) {
throw new Exception("Failed to instantiate bean [type=" + CacheConfiguration.class + ", err=" +
e.getMessage() + ']', e);
}
if (cfgMap == null || cfgMap.isEmpty())
throw new Exception("Failed to find cache configurations in: " + args.configuration());
// Getting cache configuration from the map using name specified in property file.
CacheConfiguration<Object, Object> ccfg = cfgMap.get(args.additionalCachesName());
if (ccfg == null)
throw new Exception("Failed to find cache configuration [cache=" + args.additionalCachesName() +
", cfg=" + args.configuration() + ']');
for (int i = 0; i < args.additionalCachesNumber(); i++) {
CacheConfiguration<Object, Object> newCfg = new CacheConfiguration<>(ccfg);
newCfg.setName("additional_" + args.additionalCachesName() + "_cache_" + i);
try {
ignite().createCache(newCfg);
}
catch (CacheException e) {
BenchmarkUtils.error("Failed to create additional cache [ name = " + args.additionalCachesName() +
", err" + e.getMessage() + ']', e);
}
}
}
/** {@inheritDoc} */
@Override public void onException(Throwable e) {
BenchmarkUtils.error("The benchmark of random operation failed.", e);
super.onException(e);
}
/** {@inheritDoc} */
@Override public boolean test(Map<Object, Object> map) throws Exception {
if (nextBoolean()) {
executeInTransaction(map);
executeOutOfTx(map, true);
}
else
executeOutOfTx(map, false);
return true;
}
/** {@inheritDoc} */
@Override public void tearDown() throws Exception {
try {
BenchmarkUtils.println("Benchmark statistics");
for (String cacheName : ignite().cacheNames()) {
BenchmarkUtils.println(String.format("Operations over cache '%s'", cacheName));
for (Operation op : Operation.values()) {
BenchmarkUtils.println(cfg, String.format("%s: %s", op,
operationStatistics.get(op + "_" + cacheName)));
}
}
}
finally {
super.tearDown();
}
}
/**
* @throws Exception If failed.
*/
private void searchCache() throws Exception {
availableCaches = new ArrayList<>(ignite().cacheNames().size());
txCaches = new ArrayList<>();
affCaches = new ArrayList<>();
keysCacheClasses = new HashMap<>();
valuesCacheClasses = new HashMap<>();
replaceEntryProc = new BenchmarkReplaceValueEntryProcessor(null);
rmvEntryProc = new BenchmarkRemoveEntryProcessor();
cacheSqlDescriptors = new HashMap<>();
operationStatistics = new HashMap<>();
loadQueries();
loadAllowedOperations();
for (String cacheName : ignite().cacheNames()) {
IgniteCache<Object, Object> cache = ignite().cache(cacheName);
for (Operation op : Operation.values())
operationStatistics.put(op + "_" + cacheName, new AtomicLong(0));
CacheConfiguration configuration = cache.getConfiguration(CacheConfiguration.class);
if (isClassDefinedInConfig(configuration)) {
ArrayList<Class> keys = new ArrayList<>();
ArrayList<Class> values = new ArrayList<>();
if (configuration.getQueryEntities() != null) {
Collection<QueryEntity> entries = configuration.getQueryEntities();
for (QueryEntity queryEntity : entries) {
try {
if (queryEntity.getKeyType() != null) {
Class keyCls = Class.forName(queryEntity.getKeyType());
if (ModelUtil.canCreateInstance(keyCls))
keys.add(keyCls);
else
throw new IgniteException("Class is unknown for the load test. Make sure you " +
"specified its full name [cache=" + cacheName + ", clsName=" + queryEntity.getKeyType() + ']');
}
if (queryEntity.getValueType() != null) {
Class valCls = Class.forName(queryEntity.getValueType());
if (ModelUtil.canCreateInstance(valCls))
values.add(valCls);
else
throw new IgniteException("Class is unknown for the load test. Make sure you " +
"specified its full name [cache=" + cacheName + ", clsName=" + queryEntity.getValueType() + ']');
configureCacheSqlDescriptor(cacheName, queryEntity, valCls);
}
}
catch (ClassNotFoundException e) {
BenchmarkUtils.println(e.getMessage());
BenchmarkUtils.println("This can be a BinaryObject. Ignoring exception.");
if (!cacheSqlDescriptors.containsKey(cacheName))
cacheSqlDescriptors.put(cacheName, new ArrayList<SqlCacheDescriptor>());
}
}
}
if (configuration.getQueryEntities() != null) {
Collection<QueryEntity> entities = configuration.getQueryEntities();
for (QueryEntity entity : entities) {
try {
if (entity.getKeyType() != null) {
Class keyCls = Class.forName(entity.getKeyType());
if (ModelUtil.canCreateInstance(keyCls))
keys.add(keyCls);
else
throw new IgniteException("Class is unknown for the load test. Make sure you " +
"specified its full name [clsName=" + entity.getKeyType() + ']');
}
if (entity.getValueType() != null) {
Class valCls = Class.forName(entity.getValueType());
if (ModelUtil.canCreateInstance(valCls))
values.add(valCls);
else
throw new IgniteException("Class is unknown for the load test. Make sure you " +
"specified its full name [clsName=" + entity.getKeyType() + ']');
}
}
catch (ClassNotFoundException e) {
BenchmarkUtils.println(e.getMessage());
BenchmarkUtils.println("This can be a BinaryObject. Ignoring exception.");
if (!cacheSqlDescriptors.containsKey(cacheName))
cacheSqlDescriptors.put(cacheName, new ArrayList<SqlCacheDescriptor>());
}
}
}
keysCacheClasses.put(cacheName, keys.toArray(new Class[] {}));
valuesCacheClasses.put(cacheName, values.toArray(new Class[] {}));
}
else
keysCacheClasses.put(cacheName, new Class[] {randomKeyClass(cacheName)});
valuesCacheClasses.put(cacheName, determineValueClasses(cacheName));
affCaches.add(cache);
if (configuration.getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL)
txCaches.add(cache);
availableCaches.add(cache);
}
}
/**
* Load allowed operation from parameters.
*/
private void loadAllowedOperations() {
allowedLoadTestOps = new ArrayList<>();
if (args.allowedLoadTestOps().isEmpty())
Collections.addAll(allowedLoadTestOps, Operation.values());
else {
for (String opName : args.allowedLoadTestOps())
allowedLoadTestOps.add(Operation.valueOf(opName.toUpperCase()));
}
}
/**
* Load query from file.
*
* @throws Exception If fail.
*/
private void loadQueries() throws Exception {
queries = new ArrayList<>();
if (args.loadTestQueriesFile() != null) {
try (FileReader fr = new FileReader(args.loadTestQueriesFile())) {
try (BufferedReader br = new BufferedReader(fr)) {
String line;
while ((line = br.readLine()) != null) {
if (line.trim().isEmpty())
continue;
boolean distributedJoins = false;
int commentIdx = line.lastIndexOf('#');
if (commentIdx >= 0) {
if (line.toUpperCase().indexOf("DISTRIBUTEDJOINS", commentIdx) > 0)
distributedJoins = true;
line = line.substring(0, commentIdx);
}
line = line.trim();
TestQuery qry = new TestQuery(line, distributedJoins);
queries.add(qry);
BenchmarkUtils.println("Loaded query: " + qry);
}
}
}
}
}
/**
* @param cacheName Ignite cache name.
* @param qryEntity Query entry.
* @param valCls Class of value.
* @throws ClassNotFoundException If fail.
*/
private void configureCacheSqlDescriptor(String cacheName, QueryEntity qryEntity, Class valCls)
throws ClassNotFoundException {
List<SqlCacheDescriptor> descs = cacheSqlDescriptors.get(cacheName);
if (descs == null) {
descs = new ArrayList<>();
cacheSqlDescriptors.put(cacheName, descs);
}
Map<String, Class> indexedFields = new HashMap<>();
for (QueryIndex index : qryEntity.getIndexes()) {
for (String iField : index.getFieldNames()) {
indexedFields.put(iField,
Class.forName(qryEntity.getFields().get(iField)));
}
}
descs.add(new SqlCacheDescriptor(valCls, qryEntity.getFields().keySet(),
indexedFields));
}
/**
* @param configuration Ignite cache configuration.
* @return True if defined.
*/
private boolean isClassDefinedInConfig(CacheConfiguration configuration) {
return (configuration.getIndexedTypes() != null && configuration.getIndexedTypes().length > 0)
|| !CollectionUtils.isEmpty(configuration.getQueryEntities());
}
/**
* @throws Exception If fail.
*/
private void preLoading() throws Exception {
if (args.preloadAmount() > args.range())
throw new IllegalArgumentException("Preloading amount (\"-pa\", \"--preloadAmount\") " +
"must by less then the range (\"-r\", \"--range\").");
startPreloadLogging(args.preloadLogsInterval());
ExecutorService executor = Executors.newFixedThreadPool(10);
try {
List<Future<?>> futs = new ArrayList<>();
final Thread thread = Thread.currentThread();
for (int i = 0; i < availableCaches.size(); i++) {
final String cacheName = availableCaches.get(i).getName();
futs.add(executor.submit(new Runnable() {
@Override public void run() {
try (IgniteDataStreamer<Object, Object> dataLdr = ignite().dataStreamer(cacheName)) {
for (int i = 0; i < args.preloadAmount(); i++) {
if (i % 100 == 0 && thread.isInterrupted())
break;
dataLdr.addData(createRandomKey(i, cacheName), createRandomValue(i, cacheName));
}
}
}
}));
}
for (Future<?> fut : futs)
fut.get();
}
finally {
executor.shutdown();
}
stopPreloadLogging();
}
/**
* Building a map that contains mapping of node ID to a list of partitions stored on the node.
*
* @param cacheName Name of Ignite cache.
* @return Node to partitions map.
*/
private Map<UUID, List<Integer>> personCachePartitions(String cacheName) {
// Getting affinity for person cache.
Affinity<Object> affinity = ignite().affinity(cacheName);
// Building a list of all partitions numbers.
List<Integer> rndParts = new ArrayList<>(10);
if (affinity.partitions() <= SCAN_QUERY_PARTITION_AMOUNT)
for (int i = 0; i < affinity.partitions(); i++)
rndParts.add(i);
else {
for (int i = 0; i < SCAN_QUERY_PARTITION_AMOUNT; i++) {
int partNum;
do
partNum = nextRandom(affinity.partitions());
while (rndParts.contains(partNum));
rndParts.add(partNum);
}
}
Collections.sort(rndParts);
// Getting partition to node mapping.
Map<Integer, ClusterNode> partPerNodes = affinity.mapPartitionsToNodes(rndParts);
// Building node to partitions mapping.
Map<UUID, List<Integer>> nodesToPart = new HashMap<>();
for (Map.Entry<Integer, ClusterNode> entry : partPerNodes.entrySet()) {
List<Integer> nodeParts = nodesToPart.get(entry.getValue().id());
if (nodeParts == null) {
nodeParts = new ArrayList<>();
nodesToPart.put(entry.getValue().id(), nodeParts);
}
nodeParts.add(entry.getKey());
}
return nodesToPart;
}
/**
* @param id Object identifier.
* @param cacheName Name of Ignite cache.
* @return Random object.
*/
private Object createRandomKey(int id, String cacheName) {
Class clazz = randomKeyClass(cacheName);
return ModelUtil.create(clazz, id);
}
/**
* @param cacheName Ignite cache name.
* @return Random key class.
*/
private Class randomKeyClass(String cacheName) {
Class[] keys = keysCacheClasses.containsKey(cacheName)
? keysCacheClasses.get(cacheName) : ModelUtil.keyClasses();
return keys[nextRandom(keys.length)];
}
/**
* @param cacheName Cache name.
* @return Set classes for cache.
*/
private Class[] determineValueClasses(@NotNull String cacheName) {
return cacheName.toLowerCase().contains("fat-values") ? ModelUtil.fatValueClasses() :
ModelUtil.simpleValueClasses();
}
/**
* @param id Object identifier.
* @param cacheName Name of Ignite cache.
* @return Random object.
*/
private Object createRandomValue(int id, String cacheName) {
Class clazz = randomValueClass(cacheName);
return ModelUtil.create(clazz, id);
}
/**
* @param cacheName Ignite cache name.
* @return Random value class.
*/
private Class randomValueClass(String cacheName) {
Class[] values = valuesCacheClasses.containsKey(cacheName)
? valuesCacheClasses.get(cacheName) : ModelUtil.valueClasses();
return values[nextRandom(values.length)];
}
/**
* @param map Parameters map.
* @param withoutTransactionCache Without transaction cache.
* @throws Exception If fail.
*/
private void executeOutOfTx(Map<Object, Object> map, boolean withoutTransactionCache) throws Exception {
for (IgniteCache<Object, Object> cache : availableCaches) {
if (withoutTransactionCache && txCaches.contains(cache))
continue;
executeRandomOperation(map, cache);
}
}
/**
* @param map Parameters map.
* @param cache Ignite cache.
* @throws Exception If fail.
*/
private void executeRandomOperation(Map<Object, Object> map, IgniteCache<Object, Object> cache) throws Exception {
Operation op = nextRandomOperation();
try {
switch (op) {
case PUT:
doPut(cache);
break;
case PUT_ALL:
doPutAll(cache);
break;
case GET:
doGet(cache);
break;
case GET_ALL:
doGetAll(cache);
break;
case INVOKE:
doInvoke(cache);
break;
case INVOKE_ALL:
doInvokeAll(cache);
break;
case REMOVE:
doRemove(cache);
break;
case REMOVE_ALL:
doRemoveAll(cache);
break;
case PUT_IF_ABSENT:
doPutIfAbsent(cache);
break;
case REPLACE:
doReplace(cache);
break;
case SCAN_QUERY:
doScanQuery(cache);
break;
case SQL_QUERY:
doSqlQuery(cache);
break;
case CONTINUOUS_QUERY:
doContinuousQuery(cache, map);
}
storeStatistics(cache.getName(), map, op);
}
catch (Exception e) {
BenchmarkUtils.error(String.format("Failed to perform operation %s.", op), e);
}
}
/**
* @return Operation.
*/
@NotNull private Operation nextRandomOperation() {
return allowedLoadTestOps.get(nextRandom(allowedLoadTestOps.size()));
}
/**
* @param cacheName Ignite cache name.
* @param map Parameters map.
* @param op Operation.
*/
private void storeStatistics(String cacheName, Map<Object, Object> map, Operation op) {
String opCacheKey = op + "_" + cacheName;
Long opAmount = (Long)map.get("amount");
opAmount = opAmount == null ? 1L : opAmount + 1;
Integer opCacheCnt = (Integer)map.get(opCacheKey);
opCacheCnt = opCacheCnt == null ? 1 : opCacheCnt + 1;
if (opAmount % 100 == 0)
updateStat(map);
else
map.put(opCacheKey, opCacheCnt);
map.put("amount", opAmount);
}
/**
* @param map Parameters map.
*/
private void updateStat(Map<Object, Object> map) {
for (Operation op : Operation.values())
for (String cacheName : ignite().cacheNames()) {
String opCacheKey = op + "_" + cacheName;
Integer val = (Integer)map.get(opCacheKey);
if (val != null) {
operationStatistics.get(opCacheKey).addAndGet(val.longValue());
map.put(opCacheKey, 0);
}
}
}
/**
* Execute operations in transaction.
*
* @param map Parameters map.
* @throws Exception if fail.
*/
private void executeInTransaction(final Map<Object, Object> map) throws Exception {
IgniteBenchmarkUtils.doInTransaction(ignite().transactions(),
TransactionConcurrency.fromOrdinal(nextRandom(TransactionConcurrency.values().length)),
TransactionIsolation.fromOrdinal(nextRandom(TransactionIsolation.values().length)),
new Callable<Object>() {
@Override public Object call() throws Exception {
for (IgniteCache<Object, Object> cache : txCaches)
if (nextBoolean())
executeRandomOperation(map, cache);
return null;
}
});
}
/**
* @param cache Ignite cache.
* @throws Exception If failed.
*/
private void doPut(IgniteCache<Object, Object> cache) throws Exception {
int i = nextRandom(args.range());
cache.put(createRandomKey(i, cache.getName()), createRandomValue(i, cache.getName()));
}
/**
* @param cache Ignite cache.
* @throws Exception If failed.
*/
private void doPutAll(IgniteCache<Object, Object> cache) throws Exception {
Map<Object, Object> putMap = new TreeMap<>();
Class keyCass = randomKeyClass(cache.getName());
for (int cnt = 0; cnt < args.batch(); cnt++) {
int i = nextRandom(args.range());
putMap.put(ModelUtil.create(keyCass, i), createRandomValue(i, cache.getName()));
}
cache.putAll(putMap);
}
/**
* @param cache Ignite cache.
* @throws Exception If failed.
*/
private void doGet(IgniteCache<Object, Object> cache) throws Exception {
int i = nextRandom(args.range());
cache.get(createRandomKey(i, cache.getName()));
}
/**
* @param cache Ignite cache.
* @throws Exception If failed.
*/
private void doGetAll(IgniteCache<Object, Object> cache) throws Exception {
Set<Object> keys = new TreeSet<>();
Class keyCls = randomKeyClass(cache.getName());
for (int cnt = 0; cnt < args.batch(); cnt++) {
int i = nextRandom(args.range());
keys.add(ModelUtil.create(keyCls, i));
}
cache.getAll(keys);
}
/**
* @param cache Ignite cache.
* @throws Exception If failed.
*/
private void doInvoke(final IgniteCache<Object, Object> cache) throws Exception {
final int i = nextRandom(args.range());
if (nextBoolean())
cache.invoke(createRandomKey(i, cache.getName()), replaceEntryProc,
createRandomValue(i + 1, cache.getName()));
else
cache.invoke(createRandomKey(i, cache.getName()), rmvEntryProc);
}
/**
* Entry processor for local benchmark replace value task.
*/
private static class BenchmarkReplaceValueEntryProcessor implements EntryProcessor<Object, Object, Object>, Serializable {
/**
* New value for updateCache during process by default.
*/
private Object newVal;
/**
* @param newVal default new value
*/
private BenchmarkReplaceValueEntryProcessor(Object newVal) {
this.newVal = newVal;
}
/** {@inheritDoc} */
@Override public Object process(MutableEntry<Object, Object> entry, Object... arguments) {
Object newVal = arguments == null || arguments[0] == null ? this.newVal : arguments[0];
Object oldVal = entry.getValue();
entry.setValue(newVal);
return oldVal;
}
}
/**
* Entry processor for local benchmark remove entry task.
*/
private static class BenchmarkRemoveEntryProcessor implements EntryProcessor<Object, Object, Object>, Serializable {
/** {@inheritDoc} */
@Override public Object process(MutableEntry<Object, Object> entry, Object... arguments) {
Object oldVal = entry.getValue();
entry.remove();
return oldVal;
}
}
/**
* @param cache Ignite cache.
* @throws Exception If failed.
*/
private void doInvokeAll(IgniteCache<Object, Object> cache) throws Exception {
Map<Object, EntryProcessor<Object, Object, Object>> map = new TreeMap<>();
Class keyCls = randomKeyClass(cache.getName());
for (int cnt = 0; cnt < args.batch(); cnt++) {
int i = nextRandom(args.range());
Object key = ModelUtil.create(keyCls, i);
if (nextBoolean())
map.put(key,
new BenchmarkReplaceValueEntryProcessor(createRandomValue(i + 1, cache.getName())));
else
map.put(key, rmvEntryProc);
}
cache.invokeAll(map);
}
/**
* @param cache Ignite cache.
* @throws Exception If failed.
*/
private void doRemove(IgniteCache<Object, Object> cache) throws Exception {
int i = nextRandom(args.range());
cache.remove(createRandomKey(i, cache.getName()));
}
/**
* @param cache Ignite cache.
* @throws Exception If failed.
*/
private void doRemoveAll(IgniteCache<Object, Object> cache) throws Exception {
Set<Object> keys = new TreeSet<>();
Class keyCls = randomKeyClass(cache.getName());
for (int cnt = 0; cnt < args.batch(); cnt++) {
int i = nextRandom(args.range());
keys.add(ModelUtil.create(keyCls, i));
}
cache.removeAll(keys);
}
/**
* @param cache Ignite cache.
* @throws Exception If failed.
*/
private void doPutIfAbsent(IgniteCache<Object, Object> cache) throws Exception {
int i = nextRandom(args.range());
cache.putIfAbsent(createRandomKey(i, cache.getName()), createRandomValue(i, cache.getName()));
}
/**
* @param cache Ignite cache.
* @throws Exception If failed.
*/
private void doReplace(IgniteCache<Object, Object> cache) throws Exception {
int i = nextRandom(args.range());
cache.replace(createRandomKey(i, cache.getName()),
createRandomValue(i, cache.getName()),
createRandomValue(i + 1, cache.getName()));
}
/**
* @param cache Ignite cache.
* @throws Exception If failed.
*/
private void doScanQuery(IgniteCache<Object, Object> cache) throws Exception {
if (!affCaches.contains(cache))
return;
Map<UUID, List<Integer>> partitionsMap = personCachePartitions(cache.getName());
ScanQueryBroadcastClosure c = new ScanQueryBroadcastClosure(cache.getName(), partitionsMap);
ClusterGroup clusterGrp = ignite().cluster().forNodeIds(partitionsMap.keySet());
IgniteCompute compute = ignite().compute(clusterGrp);
compute.broadcast(c);
}
/**
* @param cache Ignite cache.
* @throws Exception If failed.
*/
private void doSqlQuery(IgniteCache<Object, Object> cache) {
List<SqlCacheDescriptor> descriptors = cacheSqlDescriptors.get(cache.getName());
if (descriptors != null) {
Query sq = null;
if (queries.isEmpty()) {
if (!descriptors.isEmpty()) {
SqlCacheDescriptor randomDesc = descriptors.get(nextRandom(descriptors.size()));
int id = nextRandom(args.range());
sq = nextBoolean() ? randomDesc.getSqlQuery(id) : randomDesc.getSqlFieldsQuery(id);
}
}
else {
TestQuery qry = queries.get(nextRandom(queries.size()));
String sql = randomizeSql(qry.sql);
sq = new SqlFieldsQuery(sql);
((SqlFieldsQuery)sq).setDistributedJoins(qry.distributedJoin);
}
if (sq != null)
try (QueryCursor cursor = cache.query(sq)) {
for (Object obj : cursor) {
// No-op.
}
}
}
}
/**
* @param sql Base SQL.
* @return Randomized SQL string.
*/
private String randomizeSql(String sql) {
int cnt = StringUtils.countOccurrencesOf(sql, "%s");
Integer[] sub = new Integer[cnt];
for (int i = 0; i < cnt; i++)
sub[i] = nextRandom(args.range());
sql = String.format(sql, sub);
return sql;
}
/**
* @param cache Ignite cache.
* @param map Parameters map.
* @throws Exception If failed.
*/
private void doContinuousQuery(IgniteCache<Object, Object> cache, Map<Object, Object> map) throws Exception {
List<QueryCursor> cursors = (ArrayList<QueryCursor>)map.get(cache.getName());
if (cursors == null) {
cursors = new ArrayList<>(CONTINUOUS_QUERY_PER_CACHE);
map.put(cache.getName(), cursors);
}
if (cursors.size() == CONTINUOUS_QUERY_PER_CACHE) {
QueryCursor cursor = cursors.get(nextRandom(cursors.size()));
cursor.close();
cursors.remove(cursor);
}
ContinuousQuery qry = new ContinuousQuery();
qry.setLocalListener(new ContinuousQueryUpdater());
qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new ContinuousQueryFilter()));
cursors.add(cache.query(qry));
}
/**
* @return Nex random boolean value.
*/
private boolean nextBoolean() {
return ThreadLocalRandom.current().nextBoolean();
}
/**
* Continuous query updater class.
*/
private static class ContinuousQueryUpdater implements CacheEntryUpdatedListener, Serializable {
/** {@inheritDoc} */
@Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
for (Object o : iterable) {
// No-op.
}
}
}
/**
* Continuous query filter class.
*/
private static class ContinuousQueryFilter implements CacheEntryEventSerializableFilter, Serializable {
/** */
private boolean flag = true;
/** {@inheritDoc} */
@Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
return flag = !flag;
}
}
/**
* Closure for scan query executing.
*/
private static class ScanQueryBroadcastClosure implements IgniteRunnable {
/**
* Ignite node.
*/
@IgniteInstanceResource
private Ignite node;
/**
* Information about partition.
*/
private Map<UUID, List<Integer>> cachePart;
/**
* Name of Ignite cache.
*/
private String cacheName;
/**
* @param cacheName Name of Ignite cache.
* @param cachePart Partition by node for Ignite cache.
*/
private ScanQueryBroadcastClosure(String cacheName, Map<UUID, List<Integer>> cachePart) {
this.cachePart = cachePart;
this.cacheName = cacheName;
}
/** {@inheritDoc} */
@Override public void run() {
IgniteCache cache = node.cache(cacheName);
// Getting a list of the partitions owned by this node.
List<Integer> myPartitions = cachePart.get(node.cluster().localNode().id());
for (Integer part : myPartitions) {
ScanQuery scanQry = new ScanQuery();
scanQry.setPartition(part);
scanQry.setFilter(igniteBiPred);
try (QueryCursor cursor = cache.query(scanQry)) {
for (Object obj : cursor) {
// No-op.
}
}
}
}
}
/**
* Scan query predicate class.
*/
private static class BenchmarkIgniteBiPredicate implements IgniteBiPredicate {
/**
* @param key Cache key.
* @param val Cache value.
* @return true If is hit.
*/
@Override public boolean apply(Object key, Object val) {
return true;
}
}
/**
* Query descriptor.
*/
private static class SqlCacheDescriptor {
/**
* Class of value.
*/
private Class valCls;
/**
* Select fields.
*/
private Set<String> fields;
/**
* Indexed fields.
*/
private Map<String, Class> indexedFieldsByCls;
/**
* @param valCls Class of value.
* @param fields All select fields.
* @param indexedFieldsByCls Indexed fields.
*/
SqlCacheDescriptor(Class valCls, Set<String> fields,
Map<String, Class> indexedFieldsByCls) {
this.valCls = valCls;
this.fields = fields;
this.indexedFieldsByCls = indexedFieldsByCls;
}
/**
* @param id Query id.
* @return Condition string.
*/
private String makeQuerySelect(int id) {
return StringUtils.collectionToDelimitedString(fields, ", ");
}
/**
* @param id Query id.
* @return Condition string.
*/
private String makeQueryCondition(int id) {
StringBuilder sb = new StringBuilder();
for (String iField : indexedFieldsByCls.keySet()) {
Class cl = indexedFieldsByCls.get(iField);
if (!Number.class.isAssignableFrom(cl) && !String.class.equals(cl))
continue;
if (sb.length() != 0) {
switch (id % 3 % 2) {
case 0:
sb.append(" OR ");
break;
case 1:
sb.append(" AND ");
break;
}
}
if (Number.class.isAssignableFrom(cl)) {
sb.append(iField);
switch (id % 2) {
case 0:
sb.append(" > ");
break;
case 1:
sb.append(" < ");
break;
}
sb.append(id);
}
else if (String.class.equals(cl))
sb.append("lower(").append(iField).append(") LIKE lower('%").append(id).append("%')");
}
return sb.toString();
}
/**
* @param id Query id.
* @return SQL query object.
*/
SqlQuery getSqlQuery(int id) {
return new SqlQuery(valCls, makeQueryCondition(id));
}
/**
* @param id Query id.
* @return SQL filed query object.
*/
SqlFieldsQuery getSqlFieldsQuery(int id) {
return new SqlFieldsQuery(String.format("SELECT %s FROM %s WHERE %s",
makeQuerySelect(id), valCls.getSimpleName(), makeQueryCondition(id)));
}
}
/**
* Cache operation enum.
*/
private enum Operation {
/** Put operation. */
PUT,
/** Put all operation. */
PUT_ALL,
/** Get operation. */
GET,
/** Get all operation. */
GET_ALL,
/** Invoke operation. */
INVOKE,
/** Invoke all operation. */
INVOKE_ALL,
/** Remove operation. */
REMOVE,
/** Remove all operation. */
REMOVE_ALL,
/** Put if absent operation. */
PUT_IF_ABSENT,
/** Replace operation. */
REPLACE,
/** Scan query operation. */
SCAN_QUERY,
/** SQL query operation. */
SQL_QUERY,
/** Continuous Query. */
CONTINUOUS_QUERY;
/**
* @param num Number of operation.
* @return Operation.
*/
public static Operation valueOf(int num) {
return values()[num];
}
}
/**
*
*/
private static class TestQuery {
/** */
private final String sql;
/** */
private final boolean distributedJoin;
/**
* @param sql SQL.
* @param distributedJoin Distributed join flag.
*/
public TestQuery(String sql, boolean distributedJoin) {
this.sql = sql;
this.distributedJoin = distributedJoin;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TestQuery.class, this);
}
}
}