blob: b312cca1d8426001facebbbc73d9150eb1f13200 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
/**
 * Common base class for the reader and writer parts of the multi-threaded
 * HBase load test ({@link LoadTestTool}). Maintains shared progress counters
 * updated by worker threads and periodically logs throughput/latency
 * statistics while any worker is still running.
 */
public abstract class MultiThreadedAction {
  private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);

  protected final byte[] tableName;
  protected final byte[] columnFamily;
  protected final Configuration conf;

  protected int numThreads = 1;

  /** The start key of the key range, inclusive */
  protected long startKey = 0;

  /** The end key of the key range, exclusive */
  protected long endKey = 1;

  /** Number of worker threads that are still running. */
  protected AtomicInteger numThreadsWorking = new AtomicInteger();

  /** Total keys processed so far, across all worker threads. */
  protected AtomicLong numKeys = new AtomicLong();

  /** Total columns processed so far, across all worker threads. */
  protected AtomicLong numCols = new AtomicLong();

  /** Cumulative time (ms) spent inside individual operations. */
  protected AtomicLong totalOpTimeMs = new AtomicLong();

  protected boolean verbose = false;

  protected int minDataSize = 256;
  protected int maxDataSize = 1024;

  /** "R" or "W" */
  private String actionLetter;

  /**
   * Whether we need to print out Hadoop Streaming-style counters.
   * NOTE(review): never set within this file — presumably toggled by a
   * subclass or a future setter; verify against callers.
   */
  private boolean streamingCounters;

  /** How often the progress reporter logs statistics, in milliseconds. */
  public static final int REPORTING_INTERVAL_MS = 5000;

  /**
   * @param conf         cluster configuration
   * @param tableName    table the workers operate on
   * @param columnFamily column family the workers operate on
   * @param actionLetter short tag ("R" or "W") identifying this action in logs
   */
  public MultiThreadedAction(Configuration conf, byte[] tableName,
      byte[] columnFamily, String actionLetter) {
    this.conf = conf;
    this.tableName = tableName;
    this.columnFamily = columnFamily;
    this.actionLetter = actionLetter;
  }

  /**
   * Records the key range and thread count, then launches the background
   * progress reporter. Subclasses are expected to start their worker threads
   * (via {@link #startThreads(Collection)}) shortly after calling this —
   * the reporter waits one reporting interval before its first check.
   *
   * @param startKey   start of the key range, inclusive
   * @param endKey     end of the key range, exclusive
   * @param numThreads number of worker threads the subclass will start
   */
  public void start(long startKey, long endKey, int numThreads)
      throws IOException {
    this.startKey = startKey;
    this.endKey = endKey;
    this.numThreads = numThreads;
    // Named daemon thread: the reporter must never keep the JVM alive on
    // its own, and the name makes it identifiable in thread dumps.
    Thread reporterThread = new Thread(new ProgressReporter(actionLetter),
        "ProgressReporter-" + actionLetter);
    reporterThread.setDaemon(true);
    reporterThread.start();
  }

  /** Formats a millisecond duration as zero-padded HH:MM:SS. */
  private static String formatTime(long elapsedTime) {
    long seconds = elapsedTime / 1000;
    return String.format("%02d:%02d:%02d",
        seconds / 3600, (seconds % 3600) / 60, seconds % 60);
  }

  /** Asynchronously reports progress while worker threads are running. */
  private class ProgressReporter implements Runnable {
    private String reporterId = "";

    public ProgressReporter(String id) {
      this.reporterId = id;
    }

    @Override
    public void run() {
      long startTime = System.currentTimeMillis();
      long priorNumKeys = 0;
      long priorCumulativeOpTime = 0;
      int priorAverageKeysPerSecond = 0;

      // Give other threads time to start.
      Threads.sleep(REPORTING_INTERVAL_MS);
      while (numThreadsWorking.get() != 0) {
        String threadsLeft =
            "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
        if (numKeys.get() == 0) {
          LOG.info(threadsLeft + "Number of keys = 0");
        } else {
          long numKeys = MultiThreadedAction.this.numKeys.get();
          long time = System.currentTimeMillis() - startTime;
          long totalOpTime = totalOpTimeMs.get();
          long numKeysDelta = numKeys - priorNumKeys;
          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;
          // Use floating-point division; the original integer division
          // truncated before widening to double.
          double averageKeysPerSecond =
              (time > 0) ? (numKeys * 1000.0 / time) : 0;
          LOG.info(threadsLeft
              + "Keys="
              + numKeys
              + ", cols="
              + StringUtils.humanReadableInt(numCols.get())
              + ", time="
              + formatTime(time)
              // Cumulative rate/latency since the reporter started.
              + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= "
                  + numKeys * 1000 / time + ", latency=" + totalOpTime
                  / numKeys + " ms]") : "")
              // Rate/latency over the last reporting interval only.
              + ((numKeysDelta > 0) ? (" Current: [" + "keys/s="
                  + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency="
                  + totalOpTimeDelta / numKeysDelta + " ms]") : "")
              + progressInfo());
          if (streamingCounters) {
            printStreamingCounters(numKeysDelta,
                averageKeysPerSecond - priorAverageKeysPerSecond);
          }
          priorNumKeys = numKeys;
          priorCumulativeOpTime = totalOpTime;
          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
        }
        Threads.sleep(REPORTING_INTERVAL_MS);
      }
    }

    private void printStreamingCounters(long numKeysDelta,
        double avgKeysPerSecondDelta) {
      // Write stats in a format that can be interpreted as counters by
      // streaming map-reduce jobs.
      System.err.println("reporter:counter:numKeys," + reporterId + ","
          + numKeysDelta);
      System.err.println("reporter:counter:numCols," + reporterId + ","
          + numCols.get());
      System.err.println("reporter:counter:avgKeysPerSecond," + reporterId
          + "," + (long) (avgKeysPerSecondDelta));
    }
  }

  /**
   * Sets the inclusive bounds of the per-value payload size, in bytes.
   *
   * @param minDataSize minimum value size
   * @param maxDataSize maximum value size
   */
  public void setDataSize(int minDataSize, int maxDataSize) {
    this.minDataSize = minDataSize;
    this.maxDataSize = maxDataSize;
  }

  /** Blocks until all worker threads registered via startThreads finish. */
  public void waitForFinish() {
    while (numThreadsWorking.get() != 0) {
      Threads.sleepWithoutInterrupt(1000);
    }
  }

  /**
   * Registers and starts the given worker threads. The working-thread count
   * is bumped before any thread starts so the progress reporter and
   * {@link #waitForFinish()} see a non-zero count; each worker is expected
   * to decrement the count when it exits.
   */
  protected void startThreads(Collection<? extends Thread> threads) {
    numThreadsWorking.addAndGet(threads.size());
    for (Thread thread : threads) {
      thread.start();
    }
  }

  /** @return the end key of the key range, exclusive */
  public long getEndKey() {
    return endKey;
  }

  /** Returns a task-specific progress string */
  protected abstract String progressInfo();

  /**
   * Appends {@code ", desc=v"} to {@code sb}, skipping zero values to keep
   * status lines compact.
   */
  protected static void appendToStatus(StringBuilder sb, String desc,
      long v) {
    if (v == 0) {
      return;
    }
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }
}