// blob: 9a69eb8c91dc28b9ab4178439f798758b54e368a
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.tests.load;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.cassandra.common.SystemHelper;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.tests.utils.TestsHelper;
import org.apache.log4j.Logger;
/**
 * Worker thread abstraction to be inherited by specific load test implementation.
 * <p>
 * A worker operates either directly on a {@link CacheStore} or on an {@link IgniteCache} obtained from
 * an {@link Ignite} instance, depending on which constructor was used. It keeps generating keys/values for
 * positions in {@code [startPosition, endPosition]} (wrapping around) and hands them to one of the
 * {@code process(...)} methods until the configured warm up period plus execution time elapses.
 * <p>
 * Subclasses choose single or batch processing through {@link #batchMode()} and override the matching
 * {@code process(...)} overload.
 */
public abstract class Worker extends Thread {
    /** Minimal interval between two periodic statistics reports, in milliseconds. */
    private static final long STAT_REPORT_INTERVAL = 30_000;

    /** Time when the test (warm up included) was started, in milliseconds since epoch. */
    private long testStartTime;

    /** {@code true} while the warm up period is in progress. */
    boolean warmup = TestsHelper.getLoadTestsWarmupPeriod() != 0;

    /** Warm up start time, {@code 0} if there is no warm up period. */
    private volatile long warmupStartTime;

    /** Warm up finish time, {@code 0} until the warm up is over or the execution is finished. */
    private volatile long warmupFinishTime;

    /** Start time of the measured (post warm up) phase, {@code 0} until that phase begins. */
    private volatile long startTime;

    /** Finish time of the test, {@code 0} while the test is still running. */
    private volatile long finishTime;

    /** Number of messages successfully processed during warm up. */
    private volatile long warmupMsgProcessed;

    /** Number of latency sleeps performed during warm up. */
    private volatile long warmupSleepCnt;

    /** Number of messages successfully processed after warm up. */
    private volatile long msgProcessed;

    /** Number of messages which failed to be processed after warm up. */
    private volatile long msgFailed;

    /** Number of latency sleeps performed after warm up. */
    private volatile long sleepCnt;

    /** Error which terminated the execution, {@code null} if there was none. */
    private Throwable executionError;

    /** Time of the last periodic statistics report. */
    private long statReportedTime;

    /** Cache store to test, {@code null} when the worker operates on an Ignite cache. */
    private final CacheStore cacheStore;

    /** Ignite instance to get the cache from, {@code null} when the worker operates on a cache store. */
    private final Ignite ignite;

    /** Ignite cache to test, initialized in {@link #run()} when {@link #ignite} is set. */
    private IgniteCache igniteCache;

    /** Logger, named by {@link #loggerName()}. */
    private final Logger log;

    /** First position (inclusive) of the key range handled by this worker. */
    private final long startPosition;

    /** Last position (inclusive) of the key range handled by this worker. */
    private final long endPosition;

    /**
     * Creates a worker operating directly on a cache store.
     * <p>
     * Note that {@link #loggerName()} is invoked from this constructor, i.e. before subclass fields
     * are initialized.
     *
     * @param cacheStore Cache store to test.
     * @param startPosition First position of the key range.
     * @param endPosition Last position of the key range.
     */
    public Worker(CacheStore cacheStore, long startPosition, long endPosition) {
        this.cacheStore = cacheStore;
        this.ignite = null;
        this.log = Logger.getLogger(loggerName());
        this.startPosition = startPosition;
        this.endPosition = endPosition;
    }

    /**
     * Creates a worker operating on the load tests cache of the given Ignite instance.
     * <p>
     * Note that {@link #loggerName()} is invoked from this constructor, i.e. before subclass fields
     * are initialized.
     *
     * @param ignite Ignite instance to get the cache from.
     * @param startPosition First position of the key range.
     * @param endPosition Last position of the key range.
     */
    public Worker(Ignite ignite, long startPosition, long endPosition) {
        this.cacheStore = null;
        this.ignite = ignite;
        this.log = Logger.getLogger(loggerName());
        this.startPosition = startPosition;
        this.endPosition = endPosition;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("unchecked")
    @Override public void run() {
        try {
            if (ignite != null)
                igniteCache = ignite.getOrCreateCache(new CacheConfiguration(TestsHelper.getLoadTestsCacheName()));

            execute();
        }
        catch (Throwable e) {
            // Remember the error so that isFailed() and the completion report can see it.
            executionError = e;

            throw new RuntimeException("Test execution abnormally terminated", e);
        }
        finally {
            reportTestCompletion();
        }
    }

    /**
     * @return {@code true} if the execution was terminated by an error.
     */
    public boolean isFailed() {
        return executionError != null;
    }

    /**
     * @return Processing speed of the measured (post warm up) phase in messages per second,
     *      {@code 0} if nothing was processed yet.
     */
    public long getSpeed() {
        return calcSpeed(msgProcessed, startTime, finishTime, sleepCnt);
    }

    /**
     * @return Number of messages which failed to be processed after warm up.
     */
    public long getErrorsCount() {
        return msgFailed;
    }

    /**
     * @return Percentage of failed messages among all messages handled after warm up.
     */
    public float getErrorsPercent() {
        long failed = msgFailed;

        if (failed == 0)
            return 0;

        long total = msgProcessed + failed;

        return total == 0 ? 0 : (float)(failed * 100) / (float)total;
    }

    /**
     * @return Total number of successfully processed messages, warm up included.
     */
    public long getMsgCountTotal() {
        return warmupMsgProcessed + msgProcessed;
    }

    /**
     * @return Number of messages successfully processed during warm up.
     */
    public long getWarmupMsgProcessed() {
        return warmupMsgProcessed;
    }

    /**
     * @return Number of messages successfully processed after warm up.
     */
    public long getMsgProcessed() {
        return msgProcessed;
    }

    /**
     * @return Name of the logger to be used by this worker.
     */
    protected abstract String loggerName();

    /**
     * @return {@code true} if entries should be processed in batches, {@code false} to process them one by one.
     */
    protected abstract boolean batchMode();

    /**
     * Processes a single entry using a cache store. Throws {@link UnsupportedOperationException} unless overridden.
     *
     * @param cacheStore Cache store.
     * @param entry Entry to process.
     */
    protected void process(CacheStore cacheStore, CacheEntryImpl entry) {
        throw new UnsupportedOperationException("Single message processing is not supported");
    }

    /**
     * Processes a single key/value pair using an Ignite cache. Throws {@link UnsupportedOperationException}
     * unless overridden.
     *
     * @param cache Ignite cache.
     * @param key Key.
     * @param val Value.
     */
    protected void process(IgniteCache cache, Object key, Object val) {
        throw new UnsupportedOperationException("Single message processing is not supported");
    }

    /**
     * Processes a batch of entries using a cache store. Throws {@link UnsupportedOperationException}
     * unless overridden.
     *
     * @param cacheStore Cache store.
     * @param entries Entries to process.
     */
    protected void process(CacheStore cacheStore, Collection<CacheEntryImpl> entries) {
        throw new UnsupportedOperationException("Batch processing is not supported");
    }

    /**
     * Processes a batch of key/value pairs using an Ignite cache. Throws {@link UnsupportedOperationException}
     * unless overridden.
     *
     * @param cache Ignite cache.
     * @param map Key/value pairs to process.
     */
    protected void process(IgniteCache cache, Map map) {
        throw new UnsupportedOperationException("Batch processing is not supported");
    }

    /**
     * Main loop of the worker: generates entries and processes them until the warm up period plus
     * the execution time is over.
     *
     * @throws InterruptedException If the worker thread was interrupted.
     */
    @SuppressWarnings("unchecked")
    private void execute() throws InterruptedException {
        testStartTime = System.currentTimeMillis();

        log.info("Test execution started");

        if (warmup)
            log.info("Warm up period started");

        warmupStartTime = warmup ? testStartTime : 0;
        startTime = !warmup ? testStartTime : 0;
        statReportedTime = testStartTime;

        long cntr = startPosition;

        Object key = TestsHelper.generateLoadTestsKey(cntr);
        Object val = TestsHelper.generateLoadTestsValue(cntr);

        List<CacheEntryImpl> batchList = new ArrayList<>(TestsHelper.getBulkOperationSize());
        Map batchMap = new HashMap(TestsHelper.getBulkOperationSize());

        // The entry for the start position has to be put into the batch explicitly,
        // otherwise it would be skipped on the first pass over the key range.
        if (batchMode())
            addToBatch(batchList, batchMap, key, val);

        // Summed up as long to avoid int overflow for very long test configurations.
        long execTime = (long)TestsHelper.getLoadTestsWarmupPeriod() + TestsHelper.getLoadTestsExecutionTime();

        try {
            while (true) {
                // Honor interruption (the flag may have been restored by updateMetrics()).
                if (Thread.interrupted())
                    throw new InterruptedException("Load test worker was interrupted");

                if (System.currentTimeMillis() - testStartTime > execTime)
                    break;

                if (warmup && System.currentTimeMillis() - testStartTime > TestsHelper.getLoadTestsWarmupPeriod()) {
                    warmupFinishTime = System.currentTimeMillis();
                    startTime = warmupFinishTime;
                    statReportedTime = warmupFinishTime;
                    warmup = false;

                    log.info("Warm up period completed");
                }

                if (!batchMode()) {
                    if (cacheStore != null)
                        doWork(new CacheEntryImpl(key, val));
                    else
                        doWork(key, val);
                }
                else if (batchList.size() == TestsHelper.getBulkOperationSize() ||
                    batchMap.size() == TestsHelper.getBulkOperationSize()) {
                    if (cacheStore != null)
                        doWork(batchList);
                    else
                        doWork(batchMap);

                    batchMap.clear();
                    batchList.clear();
                }

                // Move to the next position, wrapping around at the end of the range.
                if (cntr == endPosition)
                    cntr = startPosition;
                else
                    cntr++;

                key = TestsHelper.generateLoadTestsKey(cntr);
                val = TestsHelper.generateLoadTestsValue(cntr);

                if (batchMode())
                    addToBatch(batchList, batchMap, key, val);

                reportStatistics();
            }
        }
        finally {
            warmupFinishTime = warmupFinishTime != 0 ? warmupFinishTime : System.currentTimeMillis();
            finishTime = System.currentTimeMillis();
        }
    }

    /**
     * Adds key/value pair to the batch collection matching the worker mode: the list when a cache store
     * is tested, the map otherwise.
     *
     * @param batchList Batch of cache store entries.
     * @param batchMap Batch of Ignite cache key/value pairs.
     * @param key Key.
     * @param val Value.
     */
    @SuppressWarnings("unchecked")
    private void addToBatch(List<CacheEntryImpl> batchList, Map batchMap, Object key, Object val) {
        if (cacheStore != null)
            batchList.add(new CacheEntryImpl(key, val));
        else
            batchMap.put(key, val);
    }

    /**
     * Processes a single entry with the cache store, counting it as processed or failed.
     *
     * @param entry Entry to process.
     */
    private void doWork(CacheEntryImpl entry) {
        try {
            process(cacheStore, entry);

            updateMetrics(1);
        }
        catch (Throwable e) {
            log.error("Failed to perform single operation", e);

            updateErrorMetrics(1);
        }
    }

    /**
     * Processes a single key/value pair with the Ignite cache, counting it as processed or failed.
     *
     * @param key Key.
     * @param val Value.
     */
    private void doWork(Object key, Object val) {
        try {
            process(igniteCache, key, val);

            updateMetrics(1);
        }
        catch (Throwable e) {
            log.error("Failed to perform single operation", e);

            updateErrorMetrics(1);
        }
    }

    /**
     * Processes a batch of entries with the cache store, counting all of them as processed or failed.
     *
     * @param entries Entries to process.
     */
    private void doWork(Collection<CacheEntryImpl> entries) {
        try {
            process(cacheStore, entries);

            updateMetrics(entries.size());
        }
        catch (Throwable e) {
            log.error("Failed to perform batch operation", e);

            updateErrorMetrics(entries.size());
        }
    }

    /**
     * Processes a batch of key/value pairs with the Ignite cache, counting all of them as processed or failed.
     *
     * @param entries Key/value pairs to process.
     */
    private void doWork(Map entries) {
        try {
            process(igniteCache, entries);

            updateMetrics(entries.size());
        }
        catch (Throwable e) {
            log.error("Failed to perform batch operation", e);

            updateErrorMetrics(entries.size());
        }
    }

    /**
     * @return Processing speed of the warm up phase in messages per second, {@code 0} if nothing
     *      was processed during warm up.
     */
    private long getWarmUpSpeed() {
        return calcSpeed(warmupMsgProcessed, warmupStartTime, warmupFinishTime, warmupSleepCnt);
    }

    /**
     * Calculates processing speed for a phase, excluding the time spent in latency sleeps.
     *
     * @param processed Number of processed messages.
     * @param start Phase start time.
     * @param finish Phase finish time, {@code 0} if the phase is still in progress.
     * @param sleeps Number of latency sleeps performed during the phase.
     * @return Messages per second; the plain message count if the phase lasted less than a second.
     */
    private static long calcSpeed(long processed, long start, long finish, long sleeps) {
        if (processed == 0)
            return 0;

        long end = finish != 0 ? finish : System.currentTimeMillis();

        long duration = (end - start - sleeps * TestsHelper.getLoadTestsRequestsLatency()) / 1000;

        return duration == 0 ? processed : processed / duration;
    }

    /**
     * Registers successfully processed items and, if requests latency is configured, sleeps for that latency.
     *
     * @param itemsProcessed Number of processed items.
     */
    private void updateMetrics(int itemsProcessed) {
        if (warmup)
            warmupMsgProcessed += itemsProcessed;
        else
            msgProcessed += itemsProcessed;

        long latency = TestsHelper.getLoadTestsRequestsLatency();

        if (latency <= 0)
            return;

        try {
            Thread.sleep(latency);

            if (warmup)
                warmupSleepCnt++;
            else
                sleepCnt++;
        }
        catch (InterruptedException ignored) {
            // Do not lose the interruption: restore the flag so that execute() can stop the loop.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers failed items. Failures which happen during warm up are not counted.
     *
     * @param itemsFailed Number of failed items.
     */
    private void updateErrorMetrics(int itemsFailed) {
        if (!warmup)
            msgFailed += itemsFailed;
    }

    /**
     * Logs intermediate statistics, at most once per {@link #STAT_REPORT_INTERVAL}.
     */
    private void reportStatistics() {
        long now = System.currentTimeMillis();

        if (now - statReportedTime < STAT_REPORT_INTERVAL)
            return;

        statReportedTime = now;

        long completed = warmup ?
            completedPercent(now - warmupStartTime, TestsHelper.getLoadTestsWarmupPeriod()) :
            completedPercent(now - startTime, TestsHelper.getLoadTestsExecutionTime());

        if (warmup) {
            log.info("Warm up messages processed " + warmupMsgProcessed + ", " +
                "speed " + getWarmUpSpeed() + " msg/sec, " + completed + "% completed");
        }
        else {
            log.info("Messages processed " + msgProcessed + ", " +
                "speed " + getSpeed() + " msg/sec, " + completed + "% completed, " +
                "errors " + msgFailed + " / " + formatErrorsPercent() + "%");
        }
    }

    /**
     * Calculates completion percentage of a phase. The calculation is done in {@code long} arithmetic
     * so that it does not overflow for long running tests.
     *
     * @param elapsed Time elapsed since the phase start, in milliseconds.
     * @param period Configured phase duration, in milliseconds.
     * @return Percentage in range {@code [0, 100]}; {@code 100} if the period is not positive.
     */
    private static long completedPercent(long elapsed, long period) {
        if (period <= 0)
            return 100;

        return Math.max(0L, Math.min(100L, elapsed * 100 / period));
    }

    /**
     * @return Errors percentage formatted with two fraction digits and {@code '.'} instead of {@code ','}.
     */
    private String formatErrorsPercent() {
        return String.format("%.2f", getErrorsPercent()).replace(",", ".");
    }

    /**
     * Logs the final test report: as an error together with the execution error if the test failed,
     * as an info message otherwise.
     */
    private void reportTestCompletion() {
        StringBuilder builder = new StringBuilder();

        if (executionError != null)
            builder.append("Test execution abnormally terminated. ");
        else
            builder.append("Test execution successfully completed. ");

        builder.append("Statistics: ").append(SystemHelper.LINE_SEPARATOR);

        builder.append("Start time: ")
            .append(IgniteUtils.SHORT_DATE_FMT.format(Instant.ofEpochMilli(testStartTime)))
            .append(SystemHelper.LINE_SEPARATOR);

        builder.append("Finish time: ")
            .append(IgniteUtils.SHORT_DATE_FMT.format(Instant.ofEpochMilli(finishTime)))
            .append(SystemHelper.LINE_SEPARATOR);

        builder.append("Duration: ").append((finishTime - testStartTime) / 1000).append(" sec")
            .append(SystemHelper.LINE_SEPARATOR);

        if (TestsHelper.getLoadTestsWarmupPeriod() > 0) {
            builder.append("Warm up period: ").append(TestsHelper.getLoadTestsWarmupPeriod() / 1000)
                .append(" sec").append(SystemHelper.LINE_SEPARATOR);

            builder.append("Warm up processed messages: ").append(warmupMsgProcessed)
                .append(SystemHelper.LINE_SEPARATOR);

            builder.append("Warm up processing speed: ").append(getWarmUpSpeed())
                .append(" msg/sec").append(SystemHelper.LINE_SEPARATOR);
        }

        builder.append("Processed messages: ").append(msgProcessed).append(SystemHelper.LINE_SEPARATOR);

        builder.append("Processing speed: ").append(getSpeed()).append(" msg/sec")
            .append(SystemHelper.LINE_SEPARATOR);

        builder.append("Errors: ").append(msgFailed).append(" / ")
            .append(formatErrorsPercent()).append("%");

        if (executionError != null)
            log.error(builder.toString(), executionError);
        else
            log.info(builder.toString());
    }
}