/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
/**
* Common base class for the reader and writer parts of the multi-threaded HBase load
* test (see LoadTestTool).
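* <p>A minimal lifecycle sketch ({@code dataGen} and {@code conf} assumed in scope, table
* name hypothetical; a concrete subclass such as {@code MultiThreadedWriter} supplies the
* actual workload):
* <pre>{@code
* MultiThreadedAction action = new MultiThreadedWriter(dataGen, conf, TableName.valueOf("loadtest"));
* action.start(0, 1000000, 8); // keys [0, 1000000), 8 worker threads
* action.waitForFinish();      // blocks until workers are done, then closes the connection
* }</pre>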
*/
public abstract class MultiThreadedAction {
private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedAction.class);
protected final TableName tableName;
protected final Configuration conf;
protected final Connection connection; // all reader / writer threads will share this connection
protected int numThreads = 1;
/** The start key of the key range, inclusive */
protected long startKey = 0;
/** The end key of the key range, exclusive */
protected long endKey = 1;
protected AtomicInteger numThreadsWorking = new AtomicInteger();
protected AtomicLong numKeys = new AtomicLong();
protected AtomicLong numCols = new AtomicLong();
protected AtomicLong totalOpTimeMs = new AtomicLong();
protected boolean verbose = false;
protected LoadTestDataGenerator dataGenerator = null;
/**
* Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, a fixed
* set of column families, and a random number of columns per key in a given range. The
* table for it can be created manually or, for example, via
* {@link org.apache.hadoop.hbase.HBaseTestingUtil#createPreSplitLoadTestTable(Configuration,
* TableName, byte[], org.apache.hadoop.hbase.io.compress.Compression.Algorithm,
* org.apache.hadoop.hbase.io.encoding.DataBlockEncoding)}
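* <p>A minimal construction sketch (hypothetical family name):
* <pre>{@code
* // 256-1024 byte values, 1-10 columns per key, a single family "cf"
* LoadTestDataGenerator gen = new DefaultDataGenerator(256, 1024, 1, 10, Bytes.toBytes("cf"));
* }</pre>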
*/
public static class DefaultDataGenerator extends LoadTestDataGenerator {
private byte[][] columnFamilies = null;
private int minColumnsPerKey;
private int maxColumnsPerKey;
private final Random random = new Random();
public DefaultDataGenerator(int minValueSize, int maxValueSize,
int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
super(minValueSize, maxValueSize);
this.columnFamilies = columnFamilies;
this.minColumnsPerKey = minColumnsPerKey;
this.maxColumnsPerKey = maxColumnsPerKey;
}
public DefaultDataGenerator(byte[]... columnFamilies) {
// Default values for tests that didn't care to provide theirs.
this(256, 1024, 1, 10, columnFamilies);
}
@Override
public byte[] getDeterministicUniqueKey(long keyBase) {
return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
}
@Override
public byte[][] getColumnFamilies() {
return columnFamilies;
}
@Override
public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
byte[][] columns = new byte[numColumns][];
for (int i = 0; i < numColumns; ++i) {
columns[i] = Bytes.toBytes(Integer.toString(i));
}
return columns;
}
@Override
public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
}
@Override
public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
return LoadTestKVGenerator.verify(value, rowKey, cf, column);
}
@Override
public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
}
}
/** "R" or "W" */
private String actionLetter;
/** Whether we need to print out Hadoop Streaming-style counters */
private boolean streamingCounters;
public static final int REPORTING_INTERVAL_MS = 5000;
public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
String actionLetter) throws IOException {
this.conf = conf;
this.dataGenerator = dataGen;
this.tableName = tableName;
this.actionLetter = actionLetter;
this.connection = ConnectionFactory.createConnection(conf);
}
public void start(long startKey, long endKey, int numThreads) throws IOException {
this.startKey = startKey;
this.endKey = endKey;
this.numThreads = numThreads;
(new Thread(new ProgressReporter(actionLetter),
"MultiThreadedAction-ProgressReporter-" + EnvironmentEdgeManager.currentTime())).start();
}
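/** Formats an elapsed time in milliseconds as HH:MM:SS; e.g. 3725000 ms renders as "01:02:05". */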
private static String formatTime(long elapsedTime) {
String format = "%02d"; // zero-padded two-digit fields
elapsedTime = elapsedTime / 1000;
String seconds = String.format(format, elapsedTime % 60);
String minutes = String.format(format, (elapsedTime % 3600) / 60);
String hours = String.format(format, elapsedTime / 3600);
String time = hours + ":" + minutes + ":" + seconds;
return time;
}
/** Asynchronously reports progress */
private class ProgressReporter implements Runnable {
private String reporterId = "";
public ProgressReporter(String id) {
this.reporterId = id;
}
@Override
public void run() {
long startTime = EnvironmentEdgeManager.currentTime();
long priorNumKeys = 0;
long priorCumulativeOpTime = 0;
int priorAverageKeysPerSecond = 0;
// Give other threads time to start.
Threads.sleep(REPORTING_INTERVAL_MS);
while (numThreadsWorking.get() != 0) {
String threadsLeft =
"[" + reporterId + ":" + numThreadsWorking.get() + "] ";
if (numKeys.get() == 0) {
LOG.info(threadsLeft + "Number of keys = 0");
} else {
long numKeys = MultiThreadedAction.this.numKeys.get();
long time = EnvironmentEdgeManager.currentTime() - startTime;
long totalOpTime = totalOpTimeMs.get();
long numKeysDelta = numKeys - priorNumKeys;
long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;
double averageKeysPerSecond =
(time > 0) ? (numKeys * 1000.0 / time) : 0;
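// An emitted line looks like the following (illustrative values; a task-specific
// progressInfo() suffix follows):
// [W:8] Keys=215000, cols=1.2m, time=00:01:25 Overall: [keys/s=2529, latency=12.34 ms] Current: [keys/s=3400, latency=10.01 ms]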
LOG.info(threadsLeft
+ "Keys="
+ numKeys
+ ", cols="
+ StringUtils.humanReadableInt(numCols.get())
+ ", time="
+ formatTime(time)
+ ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s="
+ numKeys * 1000 / time + ", latency="
+ String.format("%.2f", (double)totalOpTime / (double)numKeys)
+ " ms]") : "")
+ ((numKeysDelta > 0) ? (" Current: [" + "keys/s="
+ numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency="
+ String.format("%.2f", (double)totalOpTimeDelta / (double)numKeysDelta)
+ " ms]") : "")
+ progressInfo());
if (streamingCounters) {
printStreamingCounters(numKeysDelta,
averageKeysPerSecond - priorAverageKeysPerSecond);
}
priorNumKeys = numKeys;
priorCumulativeOpTime = totalOpTime;
priorAverageKeysPerSecond = (int) averageKeysPerSecond;
}
Threads.sleep(REPORTING_INTERVAL_MS);
}
}
private void printStreamingCounters(long numKeysDelta,
double avgKeysPerSecondDelta) {
// Write stats in a format that can be interpreted as counters by
// streaming map-reduce jobs.
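// e.g. "reporter:counter:numKeys,W,12345"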
System.err.println("reporter:counter:numKeys," + reporterId + ","
+ numKeysDelta);
System.err.println("reporter:counter:numCols," + reporterId + ","
+ numCols.get());
System.err.println("reporter:counter:avgKeysPerSecond," + reporterId
+ "," + (long) (avgKeysPerSecondDelta));
}
}
public void close() {
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
LOG.warn("Could not close the connection: " + ex);
}
}
}
public void waitForFinish() {
while (numThreadsWorking.get() != 0) {
Threads.sleepWithoutInterrupt(1000);
}
close();
}
public boolean isDone() {
return (numThreadsWorking.get() == 0);
}
protected void startThreads(Collection<? extends Thread> threads) {
numThreadsWorking.addAndGet(threads.size());
for (Thread thread : threads) {
thread.start();
}
}
/** @return the end key of the key range, exclusive */
public long getEndKey() {
return endKey;
}
/** Returns a task-specific progress string */
protected abstract String progressInfo();
protected static void appendToStatus(StringBuilder sb, String desc,
long v) {
if (v == 0) {
return;
}
sb.append(", ");
sb.append(desc);
sb.append("=");
sb.append(v);
}
protected static void appendToStatus(StringBuilder sb, String desc,
String v) {
sb.append(", ");
sb.append(desc);
sb.append("=");
sb.append(v);
}
/**
* See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}.
* Does not verify cf/column integrity.
*/
public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
return verifyResultAgainstDataGenerator(result, verifyValues, false);
}
/**
* Verifies the result from get or scan using the dataGenerator (that was presumably
* also used to generate said result).
* @param verifyValues verify that the values in the result make sense for the row/cf/column
* combination
* @param verifyCfAndColumnIntegrity verify that the cf/column set in the result is complete. Note
* that for this to work, multi-puts should be used, or
* verification has to happen after writes; otherwise there can
* be races.
* @return true if the values in the result make sense for the row/cf/column combination and,
* when requested, the cf/column set in the result is complete; false otherwise.
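* <p>A minimal sketch of typical use (hypothetical {@code table} and {@code rowKey}):
* <pre>{@code
* Result r = table.get(new Get(rowKey));
* boolean ok = verifyResultAgainstDataGenerator(r, true, true);
* }</pre>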
*/
public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
boolean verifyCfAndColumnIntegrity) {
String rowKeyStr = Bytes.toString(result.getRow());
// See if we have any data at all.
if (result.isEmpty()) {
LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
printLocations(result);
return false;
}
if (!verifyValues && !verifyCfAndColumnIntegrity) {
return true; // as long as we have something, we are good.
}
// See if we have all the CFs.
byte[][] expectedCfs = dataGenerator.getColumnFamilies();
if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
LOG.error("Error checking data for key [" + rowKeyStr
+ "], bad family count: " + result.getMap().size());
printLocations(result);
return false;
}
// Verify each column family from get in the result.
for (byte[] cf : result.getMap().keySet()) {
String cfStr = Bytes.toString(cf);
Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
if (columnValues == null) {
LOG.error("Error checking data for key [" + rowKeyStr
+ "], no data for family [" + cfStr + "]]");
printLocations(result);
return false;
}
Map<String, MutationType> mutateInfo = null;
if (verifyCfAndColumnIntegrity || verifyValues) {
if (!columnValues.containsKey(MUTATE_INFO)) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
printLocations(result);
return false;
}
long cfHash = Arrays.hashCode(cf);
// Verify deleted columns; for each expected deletion (even hash), re-insert a placeholder
// value so the column-set check further below still accounts for the deleted column.
byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
mutateInfo = parseMutateInfo(mutateInfoValue);
for (Map.Entry<String, MutationType> mutate: mutateInfo.entrySet()) {
if (mutate.getValue() == MutationType.DELETE) {
byte[] column = Bytes.toBytes(mutate.getKey());
long columnHash = Arrays.hashCode(column);
long hashCode = cfHash + columnHash;
if (hashCode % 2 == 0) {
if (columnValues.containsKey(column)) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [" + mutate.getKey() + "]; should be deleted");
printLocations(result);
return false;
}
byte[] hashCodeBytes = Bytes.toBytes(hashCode);
columnValues.put(column, hashCodeBytes);
}
}
}
// Verify increment
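// The checks below assume the increment column started as Arrays.hashCode(row) and that
// each increment added cfHash, so any delta must be a whole multiple of that amount.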
if (!columnValues.containsKey(INCREMENT)) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
printLocations(result);
return false;
}
long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
if (verifyValues) {
long amount = mutateInfo.isEmpty() ? 0 : cfHash;
long originalValue = Arrays.hashCode(result.getRow());
long extra = currentValue - originalValue;
if (extra != 0 && (amount == 0 || extra % amount != 0)) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
printLocations(result);
return false;
}
if (amount != 0 && extra != amount) {
LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [increment], incremented [" + (extra / amount) + "] times");
}
}
// See if we have correct columns.
if (verifyCfAndColumnIntegrity
&& !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
StringBuilder colsStr = new StringBuilder();
for (byte[] col : columnValues.keySet()) {
if (colsStr.length() > 0) {
colsStr.append(", ");
}
colsStr.append("[").append(Bytes.toString(col)).append("]");
}
LOG.error("Error checking data for key [" + rowKeyStr
+ "], bad columns for family [" + cfStr + "]: " + colsStr);
printLocations(result);
return false;
}
// See if values check out.
if (verifyValues) {
for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
String column = Bytes.toString(kv.getKey());
MutationType mutation = mutateInfo.get(column);
boolean verificationNeeded = true;
byte[] bytes = kv.getValue();
if (mutation != null) {
boolean mutationVerified = true;
long columnHash = Arrays.hashCode(kv.getKey());
long hashCode = cfHash + columnHash;
byte[] hashCodeBytes = Bytes.toBytes(hashCode);
if (mutation == MutationType.APPEND) {
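// Appends tack hashCodeBytes onto the end of the base value, possibly several times;
// verify at least one trailing copy, then strip all copies so the base value can still
// be checked by the data generator below.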
int offset = bytes.length - hashCodeBytes.length;
mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes,
0, hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
if (mutationVerified) {
int n = 1;
while (true) {
int newOffset = offset - hashCodeBytes.length;
if (newOffset < 0 || !Bytes.equals(hashCodeBytes, 0,
hashCodeBytes.length, bytes, newOffset, hashCodeBytes.length)) {
break;
}
offset = newOffset;
n++;
}
if (n > 1) {
LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [" + column + "], appended [" + n + "] times");
}
byte[] dest = new byte[offset];
System.arraycopy(bytes, 0, dest, 0, offset);
bytes = dest;
}
} else if (hashCode % 2 == 0) { // checkAndPut
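// An even hash means the value was overwritten wholesale (the checkAndPut case), so
// compare directly against hashCodeBytes and skip generator-based verification below.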
mutationVerified = Bytes.equals(bytes, hashCodeBytes);
verificationNeeded = false;
}
if (!mutationVerified) {
LOG.error("Error checking data for key [" + rowKeyStr
+ "], mutation checking failed for column family [" + cfStr + "], column ["
+ column + "]; mutation [" + mutation + "], hashCode ["
+ hashCode + "], verificationNeeded ["
+ verificationNeeded + "]");
printLocations(result);
return false;
}
} // end of mutation checking
if (verificationNeeded &&
!dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [" + column + "], mutation [" + mutation
+ "]; value of length " + bytes.length);
printLocations(result);
return false;
}
}
}
}
}
return true;
}
private void printLocations(Result r) {
if (r == null) {
LOG.info("FAILED FOR null Result");
return;
}
LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
if (r.getRow() == null) {
return;
}
try (RegionLocator locator = connection.getRegionLocator(tableName)) {
List<HRegionLocation> locs = locator.getRegionLocations(r.getRow());
for (HRegionLocation h : locs) {
LOG.info("LOCATION " + h);
}
} catch (IOException e) {
LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()));
}
}
private String resultToString(Result result) {
StringBuilder sb = new StringBuilder();
sb.append("cells=");
if (result.isEmpty()) {
sb.append("NONE");
return sb.toString();
}
sb.append("{");
boolean moreThanOne = false;
for (Cell cell : result.listCells()) {
if (moreThanOne) {
sb.append(", ");
} else {
moreThanOne = true;
}
sb.append(CellUtil.toString(cell, true));
}
sb.append("}");
return sb.toString();
}
// Parse mutate info into a map of <column name> => <update action>
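// The serialized form is '#'-separated "column:typeNumber" entries, where typeNumber is
// the MutationProto.MutationType proto number; e.g. "0:3#2:0".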
private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
Map<String, MutationType> mi = new HashMap<>();
if (mutateInfo != null) {
String mutateInfoStr = Bytes.toString(mutateInfo);
String[] mutations = mutateInfoStr.split("#");
for (String mutation: mutations) {
if (mutation.isEmpty()) {
continue;
}
Preconditions.checkArgument(mutation.contains(":"), "Invalid mutation info %s", mutation);
int p = mutation.indexOf(":");
String column = mutation.substring(0, p);
MutationType type = MutationType.valueOf(
Integer.parseInt(mutation.substring(p+1)));
mi.put(column, type);
}
}
return mi;
}
}