/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Creates multiple threads that read and verify previously written data.
*
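* <p>A minimal usage sketch, not part of the class itself: {@code dataGen} and
* {@code writer} below are illustrative placeholders, and it assumes the target
* table already exists.
*
* <pre>
* MultiThreadedReader reader =
*     new MultiThreadedReader(dataGen, conf, TableName.valueOf("usertable"), 10);
* reader.linkToWriter(writer); // optional: trail a concurrent writer
* reader.start(0, 100000, 8);  // read keys [0, 100000) with 8 threads
* reader.waitForFinish();      // inherited from MultiThreadedAction
* </pre>
*/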
public class MultiThreadedReader extends MultiThreadedAction {
private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class);
protected Set<HBaseReaderThread> readers = new HashSet<>();
private final double verifyPercent;
protected volatile boolean aborted;
protected MultiThreadedWriterBase writer = null;
/**
* The number of unique keys verified in sequence. This will never be larger
* than the total number of keys in the range. Random re-reads done when the
* reader catches up with the writer are not counted here.
*/
private final AtomicLong numUniqueKeysVerified = new AtomicLong();
/**
* Default maximum number of read errors to tolerate before shutting down all
* readers.
*/
public static final int DEFAULT_MAX_ERRORS = 10;
/**
* Default "window" size between the last key written by the writer and the
* key that we attempt to read. The lower this number, the stricter our
* testing is. If this is zero, we always attempt to read the highest key
* in the contiguous sequence of keys written by the writers.
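* For example, with a key window of 100 and the writer done writing up to key
* 5000, readers will not attempt any key beyond 4900.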
*/
public static final int DEFAULT_KEY_WINDOW = 0;
/**
* Default batch size for multigets
*/
public static final int DEFAULT_BATCH_SIZE = 1; // translates to a simple GET (no multi-GET)
protected AtomicLong numKeysVerified = new AtomicLong(0);
protected AtomicLong numReadErrors = new AtomicLong(0);
protected AtomicLong numReadFailures = new AtomicLong(0);
protected AtomicLong nullResult = new AtomicLong(0);
private int maxErrors = DEFAULT_MAX_ERRORS;
private int keyWindow = DEFAULT_KEY_WINDOW;
private int batchSize = DEFAULT_BATCH_SIZE;
private int regionReplicaId = -1; // particular region replica id to do reads against if set
public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, double verifyPercent) throws IOException {
super(dataGen, conf, tableName, "R");
this.verifyPercent = verifyPercent;
}
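/**
* Links this reader to a writer so that reads trail the writer's progress, and
* makes the writer track which keys it has written.
*/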
public void linkToWriter(MultiThreadedWriterBase writer) {
this.writer = writer;
writer.setTrackWroteKeys(true);
}
public void setMaxErrors(int maxErrors) {
this.maxErrors = maxErrors;
}
public void setKeyWindow(int keyWindow) {
this.keyWindow = keyWindow;
}
public void setMultiGetBatchSize(int batchSize) {
this.batchSize = batchSize;
}
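/**
* Pins reads to the given region replica. Values greater than zero also switch
* gets to {@link Consistency#TIMELINE} (see {@code createGet}).
*/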
public void setRegionReplicaId(int regionReplicaId) {
this.regionReplicaId = regionReplicaId;
}
@Override
public void start(long startKey, long endKey, int numThreads) throws IOException {
super.start(startKey, endKey, numThreads);
if (verbose) {
LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
}
addReaderThreads(numThreads);
startThreads(readers);
}
protected void addReaderThreads(int numThreads) throws IOException {
for (int i = 0; i < numThreads; ++i) {
HBaseReaderThread reader = createReaderThread(i);
readers.add(reader);
}
}
protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
HBaseReaderThread reader = new HBaseReaderThread(readerId);
Threads.setLoggingUncaughtExceptionHandler(reader);
return reader;
}
public class HBaseReaderThread extends Thread {
protected final int readerId;
protected final Table table;
/** The "current" key being read. Increases from startKey to endKey. */
private long curKey;
/** Time when the thread started */
protected long startTimeMs;
/** If we are ahead of the writer and reading a random key. */
private boolean readingRandomKey;
private boolean printExceptionTrace = true;
/**
* @param readerId only keys whose remainder after division by
* {@link #numThreads} equals this value will be read by this thread
*/
public HBaseReaderThread(int readerId) throws IOException {
this.readerId = readerId;
table = createTable();
setName(getClass().getSimpleName() + "_" + readerId);
}
protected Table createTable() throws IOException {
return connection.getTable(tableName);
}
@Override
public void run() {
try {
runReader();
} finally {
closeTable();
numThreadsWorking.decrementAndGet();
}
}
protected void closeTable() {
try {
if (table != null) {
table.close();
}
} catch (IOException e) {
LOG.error("Error closing table", e);
}
}
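/**
* Main read loop: repeatedly collects up to {@code batchSize} keys owned by
* this thread, then reads and (for a configured percentage) verifies them,
* until the key range is exhausted or the readers are aborted.
*/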
private void runReader() {
if (verbose) {
LOG.info("Started thread #" + readerId + " for reads...");
}
startTimeMs = System.currentTimeMillis();
curKey = startKey;
long[] keysForThisReader = new long[batchSize];
while (curKey < endKey && !aborted) {
int readingRandomKeyStartIndex = -1;
int numKeys = 0;
// for multi-gets, loop until we have collected batchSize keys
do {
long k = getNextKeyToRead();
if (k < startKey || k >= endKey) {
numReadErrors.incrementAndGet();
throw new AssertionError("Load tester logic error: proposed key " +
"to read " + k + " is out of range (startKey=" + startKey +
", endKey=" + endKey + ")");
}
if (k % numThreads != readerId || (writer != null && writer.failedToWriteKey(k))) {
// Skip keys that this thread should not read, as well as the keys
// that we know the writer failed to write.
continue;
}
keysForThisReader[numKeys] = k;
if (readingRandomKey && readingRandomKeyStartIndex == -1) {
// store the first index of a random read
readingRandomKeyStartIndex = numKeys;
}
numKeys++;
} while (numKeys < batchSize && curKey < endKey && !aborted);
if (numKeys > 0) { // there is at least one key to read
readKey(keysForThisReader);
// Count only the sequentially read keys as unique; keys from the first
// random re-read onward are excluded.
numUniqueKeysVerified.getAndAdd(readingRandomKeyStartIndex == -1 ?
numKeys : readingRandomKeyStartIndex);
}
}
}
/**
* The maximum key we are allowed to read, subject to the "key window"
* constraint. Should only be used for the concurrent writer/reader workload.
*/
private long maxKeyWeCanRead() {
long insertedUpToKey = writer.wroteUpToKey();
if (insertedUpToKey >= endKey - 1) {
// The writer has finished writing our range, so we can read any
// key in the range.
return endKey - 1;
}
return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
}
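/**
* Returns the next key to read: the next sequential key while we are behind
* the writer, or a random previously written key once we have caught up (in
* which case {@code readingRandomKey} is set).
*/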
protected long getNextKeyToRead() {
readingRandomKey = false;
if (writer == null || curKey <= maxKeyWeCanRead()) {
return curKey++;
}
// We caught up with the writer. See if we can read any keys at all.
long maxKeyToRead;
while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
// The writer has not written sufficient keys for us to be able to read
// anything at all. Sleep a bit. This should only happen in the
// beginning of a load test run.
Threads.sleepWithoutInterrupt(50);
}
if (curKey <= maxKeyToRead) {
// The writer wrote some keys, and we are now allowed to read our
// current key.
return curKey++;
}
// startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
// Don't increment the current key -- we still have to try reading it
// later. Set a flag to make sure that we don't count this key towards
// the set of unique keys we have verified.
readingRandomKey = true;
return startKey + Math.abs(RandomUtils.nextLong())
% (maxKeyToRead - startKey + 1);
}
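/**
* Builds a Get per key and issues the reads: a single key is read with a
* simple get as it is created, while multiple keys are sent as one multi-get
* batch.
*/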
private Get[] readKey(long[] keysToRead) {
Get[] gets = new Get[keysToRead.length];
int i = 0;
for (long keyToRead : keysToRead) {
try {
gets[i] = createGet(keyToRead);
if (keysToRead.length == 1) {
queryKey(gets[i], RandomUtils.nextInt(0, 100) < verifyPercent, keyToRead);
}
i++;
} catch (IOException e) {
numReadFailures.addAndGet(1);
LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
+ ", time from start: "
+ (System.currentTimeMillis() - startTimeMs) + " ms");
if (printExceptionTrace) {
LOG.warn(e.toString(), e);
printExceptionTrace = false;
}
}
}
if (keysToRead.length > 1) {
try {
queryKey(gets, RandomUtils.nextInt(0, 100) < verifyPercent, keysToRead);
} catch (IOException e) {
numReadFailures.addAndGet(gets.length);
for (long keyToRead : keysToRead) {
LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
+ ", time from start: "
+ (System.currentTimeMillis() - startTimeMs) + " ms");
}
if (printExceptionTrace) {
LOG.warn(e.toString(), e);
printExceptionTrace = false;
}
}
}
return gets;
}
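/**
* Creates a Get for the given key covering every column family provided by the
* data generator, optionally redirected to a region replica with TIMELINE
* consistency (see {@code regionReplicaId}).
*/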
protected Get createGet(long keyToRead) throws IOException {
Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
String cfsString = "";
byte[][] columnFamilies = dataGenerator.getColumnFamilies();
for (byte[] cf : columnFamilies) {
get.addFamily(cf);
if (verbose) {
if (cfsString.length() > 0) {
cfsString += ", ";
}
cfsString += "[" + Bytes.toStringBinary(cf) + "]";
}
}
get = dataGenerator.beforeGet(keyToRead, get);
if (regionReplicaId > 0) {
get.setReplicaId(regionReplicaId);
get.setConsistency(Consistency.TIMELINE);
}
if (verbose) {
LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
}
return get;
}
public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
// read the data
long start = System.nanoTime();
// Uses multi/batch gets
Result[] results = table.get(Arrays.asList(gets));
long end = System.nanoTime();
verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
}
public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
// read the data
long start = System.nanoTime();
// Uses simple get
Result result = table.get(get);
long end = System.nanoTime();
verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
}
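/**
* Records latency and key-count metrics for a batch of gets, then verifies
* each result individually.
*/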
protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
Result[] results, Table table, boolean isNullExpected)
throws IOException {
totalOpTimeMs.addAndGet(elapsedNano / 1000000);
numKeys.addAndGet(gets.length);
int i = 0;
for (Result result : results) {
verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
isNullExpected);
}
}
protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
Result result, Table table, boolean isNullExpected)
throws IOException {
verifyResultsAndUpdateMetrics(verify, new Get[]{get}, elapsedNano,
new Result[]{result}, table, isNullExpected);
}
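/**
* Verifies a single result against the data generator and updates the
* verification, column, null-result, and error counters; aborts all readers
* once more than {@code maxErrors} read errors have accumulated.
*/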
private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get,
Result result, Table table, boolean isNullExpected) throws IOException {
if (!result.isEmpty()) {
if (verify) {
numKeysVerified.incrementAndGet();
}
} else {
HRegionLocation hloc;
try (RegionLocator locator = connection.getRegionLocator(tableName)) {
hloc = locator.getRegionLocation(get.getRow());
}
String rowKey = Bytes.toString(get.getRow());
LOG.info("Key = " + rowKey + ", Region location: " + hloc);
if (isNullExpected) {
nullResult.incrementAndGet();
LOG.debug("Null result obtained for the key = " + rowKey);
return;
}
}
boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
long numErrorsAfterThis = 0;
if (isOk) {
long cols = 0;
// Count the columns for reporting purposes.
for (byte[] cf : result.getMap().keySet()) {
cols += result.getFamilyMap(cf).size();
}
numCols.addAndGet(cols);
} else {
if (writer != null) {
LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
}
numErrorsAfterThis = numReadErrors.incrementAndGet();
}
if (numErrorsAfterThis > maxErrors) {
LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
aborted = true;
}
}
}
public long getNumReadFailures() {
return numReadFailures.get();
}
public long getNumReadErrors() {
return numReadErrors.get();
}
public long getNumKeysVerified() {
return numKeysVerified.get();
}
public long getNumUniqueKeysVerified() {
return numUniqueKeysVerified.get();
}
public long getNullResultsCount() {
return nullResult.get();
}
@Override
protected String progressInfo() {
StringBuilder sb = new StringBuilder();
appendToStatus(sb, "verified", numKeysVerified.get());
appendToStatus(sb, "READ FAILURES", numReadFailures.get());
appendToStatus(sb, "READ ERRORS", numReadErrors.get());
appendToStatus(sb, "NULL RESULT", nullResult.get());
return sb.toString();
}
}